本文を読み飛ばす

Azure Data Lake Storage Gen2にデータを保存する

私てっしー、もう数年前からセンサーのリアルタイムデータの保管先に苦労しておりまして。

色々な要件を鑑みると、現状Azure Data Lake Storage Gen2(ADLS Gen2)が非常に調子が良さそうと、Previewの段階から思ってたわけです。 ただ、ADLS Gen2にぶっこんだデータを分析する方法などは数多あれど、肝心のデータのぶっこみ方法がわからない●

(厳密に言うと、AzCopy、Dictcp、Azure Data Factoryに関しては書いてある)

けどさぁ、センサーデータそのままストリームで書き込みたいじゃん?とか厨二なこと考えて試行錯誤したらえらい面倒だった後失敗したのでその記録。。。

ADLS Gen2 Interface List

ADLS Gen2にファイルを保存するには以下の方法がある。(と、書かれてる。)

API セット Data Lake Storage Gen1 Data Lake Storage Gen2 での使用可否 - 共有キー認証を使用 Data Lake Storage Gen2 での使用可否 - OAuth 認証を使用
.NET SDK - 管理 リンク サポートされていません 既に使用可能 - リンク
.NET SDK - ファイル システム リンク まだ使用不可能 まだ使用不可能
Java SDK - 管理 リンク サポートされていません 既に使用可能 - リンク
Java SDK - ファイル システム リンク まだ使用不可能 まだ使用不可能
Node.js - 管理 リンク サポートされていません 既に使用可能 - リンク
Node.js - ファイル システム リンク まだ使用不可能 まだ使用不可能
Python - 管理 リンク サポートされていません 既に使用可能 - リンク
Python - ファイル システム リンク まだ使用不可能 まだ使用不可能
REST API - 管理 リンク サポートされていません 既に使用可能 -
REST API - ファイル システム リンク 既に使用可能 既に使用可能 - リンク

https://docs.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-upgrade より転記及び編集)

要するに、RESTでしか使えない、と言うわけである●(2019-07-12現在)

ADLS Gen2 with REST and Shared Key

これ、若干語弊があって、階層型記憶構造(https://docs.microsoft.com/ja-jp/azure/storage/blobs/data-lake-storage-namespace) をONにしてなければ、Azure-Storage SDKのBlob Driverで書けるっぽい。けど、階層型記憶構造使いたいんじゃん?と言うわけでRESTでガリガリ書いた。

この時、OAuth2使った場合はRefreshの処理とか面倒なのでShared Keyでやればいいやと思ったのが運の尽き・・・

In [2]:
import requests
import datetime
import hmac
import hashlib
import base64
import pprint
import json
import uuid
import re
import time
from dateutil.parser import parse

storage_account_name = <STORAGE_ACCOUNT_NAME>
storage_account_key = <STORAGE_ACCUNT_KEY>
api_version = '2018-11-09'
request_id = str(uuid.uuid1())
request_time = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')

#the file path on adls gen2
FILE_SYSTEM_NAME='<FILE_PATH>\nresource:file'

data = "<TEST></TEST>"
content_len = str(len(data))

string_params = {
    'verb': 'PUT',
    'Content-Encoding': '',
    'Content-Language': '',
    'Content-Length': content_len,
    'Content-MD5': '',
    'Content-Type': '', #application/octet-stream',
    'Date': '',
    'If-Modified-Since': '',
    'If-Match': '',
    'If-None-Match': '',
    'If-Unmodified-Since': '',
    'Range': '',
    'CanonicalizedHeaders': f"x-ms-date:{request_time}\nx-ms-version:{api_version}\nx-ms-client-request-id:{request_id}",
    'CanonicalizedResource': f"/{storage_account_name}/{FILE_SYSTEM_NAME}"
    }

string_to_sign = (string_params['verb'] + '\n'
                  + string_params['Content-Encoding'] + '\n'
                  + string_params['Content-Language'] + '\n'
                  + string_params['Content-Length'] + '\n'
                  + string_params['Content-MD5'] + '\n'
                  + string_params['Content-Type'] + '\n'
                  + string_params['Date'] + '\n'
                  + string_params['If-Modified-Since'] + '\n'
                  + string_params['If-Match'] + '\n'
                  + string_params['If-None-Match'] + '\n'
                  + string_params['If-Unmodified-Since'] + '\n'
                  + string_params['Range'] + '\n'
                  + string_params['CanonicalizedHeaders'] + '\n'
                  + string_params['CanonicalizedResource'])

pprint.pprint(string_to_sign)

signed_string = base64.b64encode(
    hmac.new(
        base64.b64decode(storage_account_key), 
        msg=string_to_sign.encode('utf-8'), 
        digestmod=hashlib.sha256).digest()).decode()

params = {
    'resource': 'file'
}

headers = {
    'x-ms-date' : request_time,
    'x-ms-version' : api_version,
    'x-ms-client-request-id': request_id,
    # 'Content-Type': 'application/octet-stream',
    # 'Content-Length': content_len,
    'Authorization' : f'SharedKey {storage_account_name}:{signed_string}'
}

pprint.pprint(string_params)
print(headers)
url = ('https://' + storage_account_name + '.dfs.core.windows.net/'+'<FILE_PATH>')
r = requests.put(url, headers=headers, params=params, data=data)

#print out the file content
print(r.content)
  File "<ipython-input-2-2d37819005c7>", line 13
    storage_account_name = <STORAGE_ACCOUNT_NAME>
                           ^
SyntaxError: invalid syntax

SharedKey Authの何が面倒ってこのAuthorization HeaderにくっつけるSigned Stringの生成。ドキュメント見ながらしこしこ書いたんだが、どーにもうまくいかない・・・。 というのも、どうも Content-Lengthに空文字以外を指定するとAuthが通らない、すなわちBodyが入ったRequestができない●(GETでファイルのリスト取るとかはできた)

詰んだ・・・orz(これ理解するのに丸一日かかった・・・)

ADLS Gen2 with AzCopy

んが、どうも色々調べてみたらAzCopyでできるらしい、そしていつの間にかAzCopyがLinux, macOSでも動くようになったと。 (AzCopyって文字列見ただけで撥ねてた、Windowsでしか動かないと思ってたから・・)

と言うわけで、AzCopyでADLS Gen2にファイルをコピーするシェルスクリプトを書いてみた。

storageAccount=<STORAGE_ACCOUNT_NAME>
tenantID=<TENANT_ID>
applicationID=<Application_ID>
clientSecret=<CLIENT_SECRET>
filePath=<FILE_PATH>

export AZCOPY_SPA_CLIENT_SECRET=$clientSecret
./azcopy login --service-principal --application-id $applicationID --tenant-id $tenantID
./azcopy cp "./targetFile.json" "https://$storageAccount.dfs.core.windows.net/$filePath"

前提ですが、サービスプリンシパル作ってClientSecret生成してくださいな。やり方は以下のページにまとまってます。

AzCopy on Docker

さてさて、これで終わらないのがキモ。これ、Dockerで動かそうとすると動かないのです。。。 理由はKeyutils使うため。何も考えずにDockerImageにすると、azcopy loginが失敗します。

なので、Dockerのセキュリティーポリシーをオーバーライドしないといかんのです。 以下が作ったDocker FIle

FROM alpine:3.9.4
RUN apk --no-cache add curl jq libc6-compat
WORKDIR /root
RUN curl -L https://aka.ms/downloadazcopy-v10-linux | tar zx && mv ./azcopy_linux*/azcopy . && chown root:root ./azcopy && rm -rf ./azcopy_linux*
ADD azcopy.sh /root
ENTRYPOINT ["/bin/sh", "./azcopy.sh"]

azcopy.sh は、先に書いたシェルスクリプトです。これをビルドして実行するには以下のコマンド。

docker build -t azcopy_test:0.1.0 .
docker run -it --rm --security-opt seccomp=unconfined azcopy_test:0.1.0

--security-opt seccomp=unconfinedがキモ。これを指定するとseccompがunconfinedに設定されてkeyutilsが使えるようになります。ただし、セキュリティーリスクは上がるのでご利用は計画的に。。。

AzCopy on Kubernetes

さらに、Docker Imageできたんだからk8sで動かしたくなるわけですよ。ここでもseccompの設定を行う必要あり(ただし、どうも現状だとk8sのdockerはseccompがunconfinedみたいな記述もあり、もしかしたら不要かも。ただし今後効いてくる可能性高し)

kml
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: azcopy_test
  labels:
    kind: CronJob
  annotations:
    # https://kubernetes.io/docs/concepts/policy/pod-security-policy/
    seccomp.security.alpha.kubernetes.io/pod: unconfined
spec:
  schedule: "* * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: azcopy_test
              image: azcopy_test:0.1.0
          restartPolicy: Never

こんな風に、annotationとしてpodのseccompをunconfinedにすれば動きます。めでたしめでたし。

注意 元あったソースからコアの部分抜き出してコードを書いてるので間違ってたらごめんなさい●

Share