Redshift spectrumでnginxのログ解析をする

最近東京でも利用できるようになったRedshift spectrumを使ってみた. やりたいこととしては以下の通り.

  • 適当なnginxのログがS3に溜まっているとする
  • 形式は改行区切りのjson
  • それをRedshift spectrumから触れるようにしたい

Redshift spectrumとAthena

これは最初に知りたかったことなのだが,Redshift spectrumで利用するRedshiftの外部データテーブルはデフォルトではAthena内に作成されているhttp://docs.aws.amazon.com/ja_jp/redshift/latest/dg/c-spectrum-external-schemas.html

つまり,

  • AthenaでS3のデータを参照するテーブルを作ってそれをRedshiftの外部データテーブルに指定すること
  • Redshift内でCREATE EXTERNAL TABLE して外部テーブルを作成・管理すること

の2つは,ほとんど同じことをしているといえる.

テーブルをRedshift内で作成・管理したければRedshift内でCREATE EXTERNAL TABLE すればいいし,Athenaの画面からポチポチしたい場合はAthena側で作れば良い.そこに大きな差はなかった.

最初ここがわからずに,迷子になっていた.

jsonのデータを読み込む

Athenaでjsonのデータを読み込む

というわけで,まずはAthenaでデータを読めるようにしたい.

jsonはAthenaで扱う上ではそこまで効率の良いフォーマットではないが,とりあえず読めるところまで行きたいので気にせずにjsonのまま読ませる(もっと高速なColumn-orientedなフォーマットについては後述). https://qiita.com/moaikids/items/e91b1bcb17458d865beb

f:id:h3poteto:20200327215029p:plain

重要なこととしてパーティションがある.パーティションを切っておかないとAthenaは対象S3ディレクトリの下をフルスキャンすることになり,データ量によってはかなりの時間がかかってしまう.

というわけでパーティションを切るわけだが,Athenaは裏側でHiveが動いており,Hiveのフォーマットに則ったディレクトリ構成にしておくと非常にパーティションを作りやすい. https://dev.classmethod.jp/cloud/aws/athena-partition-reinvent/

s3://akira-playground/nginx/dt=2017-11-21-14

というようなディレクトリ構成にしておくと,parition keyをdtにするだけでパーティションを作ってくれる.

f:id:h3poteto:20200327215042p:plain

なので,できるだけこの形式でログを保存しておこう.

もう一点補足がある. timestampを保存するときにdatetime型を指定すると思うのだが,timestampの形式はHiveのdatetime形式である必要がある. yyyy-mm-dd hh:mm:ss[.fffffffff] というような形式に収めて置くと,datetime型として検索できる.

f:id:h3poteto:20200327215054p:plain

パーティションを設定した場合は,先にLoad partitionしてMSCK REPAIR TABLE を流す必要がある.

Redshift spectrumで触りたい

と思ってRedshiftからAthenaのテーブルを参照してみようと思った.

https://dev.classmethod.jp/cloud/aws/amazon-redshift-spectrum-with-amazon-athena/

ERROR: External Catalog Error: Unsupported file format. org.apache.hadoop.mapred.TextInputFormat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat org.openx.data.jsonserde.JsonSerDe

おかしい,エラーが出る.

たしかにRedshift spectrumはjsonに対応していない.

https://dev.classmethod.jp/cloud/aws/amazon-redshift-spectrum-is-released/ https://www.gixo.jp/blog/10094/

結局jsonについての明確な言及記事は今のところ見つからなかった. ただ,エラーから察するに,やっぱりまだspectrumではjsonを読むことは出来ないらしい,たとえAthenaのテーブルを参照していたとしても

諦めてparquetにする

ちなみに何度調べてもすぐに読み方を忘れるのだが,「パーケイ」と読むらしい. parquetの詳細についてはこちらを参照してもらうとして. http://labotech.dmm.com/entry/2015/09/08/1642

変換する

json -> parquetを行う. embulk等を使っても良いのだが,Redshift spectrumを使うようなケースを考えると,日常的に大量のログ変換が必要になる気がしていて,Glueを使ってみた. https://qiita.com/hideji2/items/85747e3d66026045614d

ちなみに,まだ東京には来ていないので,us-eastのGlueを使うしかなかった.

大体上記の記事と同じ作り方で変換できた.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
from datetime import datetime, timedelta, tzinfo

sc = SparkContext()
glueContext = GlueContext(sc)
sqlContext = SQLContext(sc)

# datetime.nowで取ってきた日時をJSTにしたいがために用意しておく
class JST(tzinfo):
    def utcoffset(self, dt):
        return timedelta(hours=9)

    def dst(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return 'JST'
        
now = datetime.now(tz=JST())
target_date = now - timedelta(hours=1)
date_str = target_date.strftime("%Y-%m-%d-%H")

## @params: [IN_PATH, IN_PATH]
args = getResolvedOptions(sys.argv, ['IN_PATH', 'OUT_PATH'])


# 引数でもらったディレクトリ内は日時でパーティションされたディレクトリが並んでいる
in_path  = args['IN_PATH'] + "dt=" + date_str + "/*.json"
# 出力先も日時パーティションで出力する
out_path = args['OUT_PATH'] + "dt=" + date_str + "/"
print(in_path)
print(out_path)

# http://qiita.com/ajis_ka/items/e2e5b759e77933b08687
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
# http://tech-blog.tsukaby.com/archives/1162
sc._jsc.hadoopConfiguration().set("spark.speculation", "false")

sqlContext = SQLContext(sc)

jsonDataFrame = sqlContext.read.json(in_path)

jsonDataFrame.write.mode("overwrite").format("parquet").option("compression", "snappy").mode("overwrite").save(out_path)

こいつをETLのTriggersで毎時実行にしてやると,毎時のパーティションでparquetのログをS3に保存できる.

Athenaでparquetのデータを読み込む

Glueで変換したparquetのデータは,jsonの時と同じくHiveのパーティションに沿ったディレクトリ構成にしてあった. そのため,だいたいjsonのときと同じ手順でいけた.形式がparquetになるだけ.

f:id:h3poteto:20200327215111p:plain

Redshift spectrumでparquetのデータを読む

Schemaを作る.

sample=# create external schema spectrum
sample-# from data catalog
sample-# database 'akira_playground'
sample-# iam_role 'arn:aws:iam::123456789012:role/mySpectrumRole';
CREATE SCHEMA

テーブルは先程Athenaにnginx_parquetとして用意してあるので,ここで作成する必要はない.

sample=# select count(status) from spectrum.nginx_parquet where dt='2017-11-21-11';
 count
-------
   360
(1 行)

無事こんな感じでspectrumすることができた.

まとめ

Redshift spectrumとAthena

https://qiita.com/moaikids/items/e91b1bcb17458d865beb#%E5%80%8B%E4%BA%BA%E7%9A%84%E3%81%AA%E3%81%BE%E3%81%A8%E3%82%81 こちらでも言われているとおり,spectrumを使うこと前提に立つと,Athenaはそれ単体による解析ツールというより,S3のデータをRedshift内のデータと結びつけ,spectrumしやするする補助ツールっぽい位置づけになる.

もちろん,計算リソースとしてどちらを使うか,どちらのほうが速いかは,Redshiftに普段からどのくらい課金しているかによるとは思うが.

Glueべんり

2017年11月21日現在,jsonはそのままではRedshift spectrumから触ることはできなかった. jsonを触るなら今まで通りのAthenaを使うしかない.

かといってnginx等のアプリケーションから吐き出すログは今まで通りjsonにするしかないし,たとえparquetに変換するにしても1時間とか1日分とか,まとまった単位で変換しておかないとパフォーマンスは出ないだろう.

だからこそ,Glueはとても便利だった.

一度jsonをparquetに変換してしまえばRedshift spectrumするのもかなり速くなって良いので,長期間溜めたログはparquetに変換すると良いかもしれない.

Glue早く東京に来て欲しい.