最近東京でも利用できるようになったRedshift spectrumを使ってみた. やりたいこととしては以下の通り.
- 適当なnginxのログがS3に溜まっているとする
- 形式は改行区切りのjson
- それをRedshift spectrumから触れるようにしたい
Redshift spectrumとAthena
これは最初に知りたかったことなのだが,Redshift spectrumで利用するRedshiftの外部データテーブルはデフォルトではAthena内に作成されている. http://docs.aws.amazon.com/ja_jp/redshift/latest/dg/c-spectrum-external-schemas.html
つまり,
- AthenaでS3のデータを参照するテーブルを作ってそれをRedshiftの外部データテーブルに指定すること
- Redshift内で
CREATE EXTERNAL TABLE
して外部テーブルを作成・管理すること
の2つは,ほとんど同じことをしているといえる.
テーブルをRedshift内で作成・管理したければRedshift内でCREATE EXTERNAL TABLE
すればいいし,Athenaの画面からポチポチしたい場合はAthena側で作れば良い.そこに大きな差はなかった.
最初ここがわからずに,迷子になっていた.
jsonのデータを読み込む
Athenaでjsonのデータを読み込む
というわけで,まずはAthenaでデータを読めるようにしたい.
jsonはAthenaで扱う上ではそこまで効率の良いフォーマットではないが,とりあえず読めるところまで行きたいので気にせずにjsonのまま読ませる(もっと高速なColumn-orientedなフォーマットについては後述). https://qiita.com/moaikids/items/e91b1bcb17458d865beb
重要なこととしてパーティションがある.パーティションを切っておかないとAthenaは対象S3ディレクトリの下をフルスキャンすることになり,データ量によってはかなりの時間がかかってしまう.
というわけでパーティションを切るわけだが,Athenaは裏側でHiveが動いており,Hiveのフォーマットに則ったディレクトリ構成にしておくと非常にパーティションを作りやすい. https://dev.classmethod.jp/cloud/aws/athena-partition-reinvent/
s3://akira-playground/nginx/dt=2017-11-21-14
というようなディレクトリ構成にしておくと,parition keyをdtにするだけでパーティションを作ってくれる.
なので,できるだけこの形式でログを保存しておこう.
もう一点補足がある.
timestampを保存するときにdatetime型を指定すると思うのだが,timestampの形式はHiveのdatetime形式である必要がある.
yyyy-mm-dd hh:mm:ss[.fffffffff]
というような形式に収めて置くと,datetime型として検索できる.
パーティションを設定した場合は,先にLoad partitionしてMSCK REPAIR TABLE
を流す必要がある.
Redshift spectrumで触りたい
と思ってRedshiftからAthenaのテーブルを参照してみようと思った.
https://dev.classmethod.jp/cloud/aws/amazon-redshift-spectrum-with-amazon-athena/
ERROR: External Catalog Error: Unsupported file format. org.apache.hadoop.mapred.TextInputFormat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat org.openx.data.jsonserde.JsonSerDe
おかしい,エラーが出る.
たしかにRedshift spectrumはjsonに対応していない.
https://dev.classmethod.jp/cloud/aws/amazon-redshift-spectrum-is-released/ https://www.gixo.jp/blog/10094/
結局jsonについての明確な言及記事は今のところ見つからなかった. ただ,エラーから察するに,やっぱりまだspectrumではjsonを読むことは出来ないらしい,たとえAthenaのテーブルを参照していたとしても.
諦めてparquetにする
ちなみに何度調べてもすぐに読み方を忘れるのだが,「パーケイ」と読むらしい. parquetの詳細についてはこちらを参照してもらうとして. http://labotech.dmm.com/entry/2015/09/08/1642
変換する
json -> parquetを行う. embulk等を使っても良いのだが,Redshift spectrumを使うようなケースを考えると,日常的に大量のログ変換が必要になる気がしていて,Glueを使ってみた. https://qiita.com/hideji2/items/85747e3d66026045614d
ちなみに,まだ東京には来ていないので,us-eastのGlueを使うしかなかった.
大体上記の記事と同じ作り方で変換できた.
import sys from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from pyspark.sql import SQLContext from datetime import datetime, timedelta, tzinfo sc = SparkContext() glueContext = GlueContext(sc) sqlContext = SQLContext(sc) # datetime.nowで取ってきた日時をJSTにしたいがために用意しておく class JST(tzinfo): def utcoffset(self, dt): return timedelta(hours=9) def dst(self, dt): return timedelta(0) def tzname(self, dt): return 'JST' now = datetime.now(tz=JST()) target_date = now - timedelta(hours=1) date_str = target_date.strftime("%Y-%m-%d-%H") ## @params: [IN_PATH, IN_PATH] args = getResolvedOptions(sys.argv, ['IN_PATH', 'OUT_PATH']) # 引数でもらったディレクトリ内は日時でパーティションされたディレクトリが並んでいる in_path = args['IN_PATH'] + "dt=" + date_str + "/*.json" # 出力先も日時パーティションで出力する out_path = args['OUT_PATH'] + "dt=" + date_str + "/" print(in_path) print(out_path) # http://qiita.com/ajis_ka/items/e2e5b759e77933b08687 sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2") # http://tech-blog.tsukaby.com/archives/1162 sc._jsc.hadoopConfiguration().set("spark.speculation", "false") sqlContext = SQLContext(sc) jsonDataFrame = sqlContext.read.json(in_path) jsonDataFrame.write.mode("overwrite").format("parquet").option("compression", "snappy").mode("overwrite").save(out_path)
こいつをETLのTriggersで毎時実行にしてやると,毎時のパーティションでparquetのログをS3に保存できる.
Athenaでparquetのデータを読み込む
Glueで変換したparquetのデータは,jsonの時と同じくHiveのパーティションに沿ったディレクトリ構成にしてあった. そのため,だいたいjsonのときと同じ手順でいけた.形式がparquetになるだけ.
Redshift spectrumでparquetのデータを読む
Schemaを作る.
sample=# create external schema spectrum sample-# from data catalog sample-# database 'akira_playground' sample-# iam_role 'arn:aws:iam::123456789012:role/mySpectrumRole'; CREATE SCHEMA
テーブルは先程Athenaにnginx_parquet
として用意してあるので,ここで作成する必要はない.
sample=# select count(status) from spectrum.nginx_parquet where dt='2017-11-21-11'; count ------- 360 (1 行)
無事こんな感じでspectrumすることができた.
まとめ
Redshift spectrumとAthena
https://qiita.com/moaikids/items/e91b1bcb17458d865beb#%E5%80%8B%E4%BA%BA%E7%9A%84%E3%81%AA%E3%81%BE%E3%81%A8%E3%82%81 こちらでも言われているとおり,spectrumを使うこと前提に立つと,Athenaはそれ単体による解析ツールというより,S3のデータをRedshift内のデータと結びつけ,spectrumしやするする補助ツールっぽい位置づけになる.
もちろん,計算リソースとしてどちらを使うか,どちらのほうが速いかは,Redshiftに普段からどのくらい課金しているかによるとは思うが.
Glueべんり
2017年11月21日現在,jsonはそのままではRedshift spectrumから触ることはできなかった. jsonを触るなら今まで通りのAthenaを使うしかない.
かといってnginx等のアプリケーションから吐き出すログは今まで通りjsonにするしかないし,たとえparquetに変換するにしても1時間とか1日分とか,まとまった単位で変換しておかないとパフォーマンスは出ないだろう.
だからこそ,Glueはとても便利だった.
一度jsonをparquetに変換してしまえばRedshift spectrumするのもかなり速くなって良いので,長期間溜めたログはparquetに変換すると良いかもしれない.
Glue早く東京に来て欲しい.