Docker for macに乗り換えようかと迷った

もともとmacでのdockerはdinghyをずっと使っている. docker-syncはなんか違うなーと思っているし,dockerを使いはじめる段階ではそもそもDocker for macがまだ公開されていなかった.

で,Docker for macが公開されたのはいいんだけど,ずっとファイル共有が遅いという問題が解決していなかったように思う.

最近は,cachedとかdelegatedというのがでてきて

docs.docker.com

速くなってきたらしいというので,いよいよDocker for macに乗り換えられるかなーと思った.

続きを読む

goのgRPCで便利ツールを使う

この記事は Go2 Advent Calendar 2017の24日目です.

クリスマスイブにこんばんは.

今日はgRPCの話を書きます.

gRPC

gRPCについてはいろいろな記事で既に説明されているのでここではあまり詳しく説明しないが,googleが公開しているRPCフレームワークである. このRPCの定義をProtocol Bufferで定義でき,そこからサーバ&クライアントのライブラリを生成できる. ちなみにHTTP/2で通信するらしい.

で,仕事では最近scalaでgRPCしていたりしたのだけれど,やっぱりgoで書くのが一番いろんなことができて嬉しいなーと改めて思ったので,goでgRPCサーバを書くことを前提にして,ちょっと便利ライブラリを使ってみたいと思う. なお,gRPCサーバをgoで書くだけであり,クライアントはrubyを使ったりjsを使ったりしている.

goでgRPCする

gRPCサーバ

まずは基本となるgRPCサーバを作ろう.

とてもいいサンプルは公式にあるので,ここで載せるコードは目安程度に考えてほしい.

今回はこんなprotoファイルを作った.

syntax = "proto3";

package protocol;

service CustomerService {
  rpc ListPerson(RequestType) returns (stream Person) {}
  rpc AddPerson(Person) returns (ResponseType) {}
}

message ResponseType {
}

message RequestType {
}

message Person {
  string name = 1;
  int32 age = 2;
}

そうしたら,ここからgrpcサーバ用のライブラリを生成する.

$ protoc --proto_path=. --go_out=plugins=grpc:./ protocol/*.proto

するとprotocol/配下にcustomer_service.pb.goが生成されると思う.これがライブラリとなる.

で,これをimportしてgrpcサーバを書こう.

package main

import (
    "log"
    "net"
    "os"
    "os/signal"
    "sync"

    pb "github.com/h3poteto/go-grpc-example/protocol"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

type customerService struct {
    customers []*pb.Person
    m         sync.Mutex
}

func (cs *customerService) ListPerson(p *pb.RequestType, stream pb.CustomerService_ListPersonServer) error {
    cs.m.Lock()
    defer cs.m.Unlock()
    for _, p := range cs.customers {
        if err := stream.Send(p); err != nil {
            return err
        }
    }
    return nil

}

func (cs *customerService) AddPerson(c context.Context, p *pb.Person) (*pb.ResponseType, error) {
    cs.m.Lock()
    defer cs.m.Unlock()
    cs.customers = append(cs.customers, p)
    return new(pb.ResponseType), nil
}

func main() {
    port := os.Getenv("SERVER_PORT")
    lis, err := net.Listen("tcp", ":"+port)
    if err != nil {
        log.Fatalf("faild to listen: %v", err)
    }
    server := grpc.NewServer()
    pb.RegisterCustomerServiceServer(server, new(customerService))

    go func() {
        log.Printf("start grpc server port: %s", port)
        server.Serve(lis)
    }()

    quit := make(chan os.Signal)
    signal.Notify(quit, os.Interrupt)
    <-quit
    log.Println("stopping grpc server...")
    server.GracefulStop()
}

GracefulStopまでやっているのは完全に趣味だ.

あとはこいつを走らせるとgRPCサーバがスタートする.

$ go run server/server.go
2017/12/23 16:56:11 starting grpc server port: 9090

gRPCクライアント

さて,サーバをスタートさせてもこいつはcurlもできないしブラウザからアクセスすることもできない.なぜならRPCだから. だから,こいつはなにか別のプログラムから呼び出してやらなきゃならない.

今回はrubyで呼び出してみよう.

まず,rubyのクライアントを生成してみよう.

source 'https://rubygems.org'

gem 'grpc', '~> 1.8'
gem 'grpc-tools', '~> 1.8'

こんなGemfileを用意して,

$ cd client/ruby && bundle install

する.

そしたら,

$ cd client/ruby && protoc -I ../../protocol --proto_path=. --ruby_out=lib --grpc_out=lib --plugin=protoc-gen-grpc=$(bundle exec which grpc_tools_ruby_protoc_plugin) ../../protocol/*.proto

というコマンドによりruby用のライブラリを生成しよう.client/ruby/libcustomer_service_pb.rbcustomer_service_services_pb.rbが出来上がる.

これを使ったクライアントプログラムを書こう.

#!/usr/bin/env ruby

$LOAD_PATH.push('./lib')

require 'grpc'
require 'customer_service_services_pb'

def main
  stub = Protocol::CustomerService::Stub.new("#{ENV["SERVER_IP"]}:#{ENV["SERVER_PORT"]}", :this_channel_is_insecure)
  if ARGV.size == 2
    stub.add_person(Protocol::Person.new(name: ARGV[0], age: ARGV[1].to_i))
  else
    stub.list_person(Protocol::RequestType.new).each do |x|
      puts "name=#{x.name}, age=#{x.age}"
    end
  end
end

main

とりあえずテストなので,引数がなかったらListPersonを,引数が2つあればAddPersonを呼び出すことにした.

やってみる.

$ bundle exec ruby client.rb
$ bundle exec ruby client.rb h3poteto 28
$ bundle exec ruby client.rb
name=h3poteto, age=28

こんな感じでgo側で実装したメソッドを呼び出すことができた.

バリデーションをかけたい

バリデーションをどこでかけるか問題

gRPCは基本的にはRPCだ.なのでRESTのAPIとは少し違った感覚でメソッドを実装すると思う. そうなると,APIではよく作っているバリデーションというは一体どこにどう実装したらいいんだろう?

gRPCの定義はProtocol Bufferで行っており,これ自体にはちゃんと型を定義している.

しかし,型よりももう少し詳しく,たとえば先の例であれば,「ageは0以上,200以下くらいの数値に収めたい」と思うだろう. そう思った時,順当に考えたら,呼ばれるメソッド側でバリデーションを作るしかない.バリデーションライブラリについては各言語で様々な実装があるので,それを使うとして.

それにしても,せっかくProtocol Bufferでリクエスト,レスポンスの形を定義できるのに,バリデーションはサーバ側の実装を読まないとわからないってのは,なかなかイケてないよね.

あれ,この情報,Protocol Bufferに書けたら最高じゃね?

はい,あります.

https://github.com/grpc-ecosystem/go-grpc-middleware/tree/master/validator

go-grpc-middlewareという,goでgrpcするときの便利ライブラリの中に,validatorがある.

こいつは,https://github.com/mwitkow/go-proto-validators を使ってProtocol Bufferの定義からバリデーションを作り出している.

validatorを使ってみる

では,次はこれを使ってバリデーションをかけてみよう.

protoファイルを以下のように書き換える.

syntax = "proto3";

package protocol;
import "github.com/mwitkow/go-proto-validators/validator.proto";

service CustomerService {
  rpc ListPerson(RequestType) returns (stream Person) {}
  rpc AddPerson(Person) returns (ResponseType) {}
}

message ResponseType {
}

message RequestType {
}

message Person {
  string name = 1;
  int32 age = 2 [(validator.field) = {int_gt: 0, int_lt: 200}];
}

ここからライブラリを生成しよう.

$ protoc --proto_path=. --go_out=plugins=grpc:./ --govalidators_out=./ protocol/*.proto

これでprotocol配下には,customer_service.pb.gocustomer_service.validator.pb.goが生成される.

先ほどのgRPCサーバに少し手を加える.

package main

import (
    "log"
    "net"
    "os"
    "os/signal"
    "sync"

    "github.com/grpc-ecosystem/go-grpc-middleware"
    "github.com/grpc-ecosystem/go-grpc-middleware/validator"
    pb "github.com/h3poteto/go-grpc-example/protocol"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

type customerService struct {
    customers []*pb.Person
    m         sync.Mutex
}

func (cs *customerService) ListPerson(p *pb.RequestType, stream pb.CustomerService_ListPersonServer) error {
    cs.m.Lock()
    defer cs.m.Unlock()
    for _, p := range cs.customers {
        if err := stream.Send(p); err != nil {
            return err
        }
    }
    return nil

}

func (cs *customerService) AddPerson(c context.Context, p *pb.Person) (*pb.ResponseType, error) {
    cs.m.Lock()
    defer cs.m.Unlock()
    cs.customers = append(cs.customers, p)
    return new(pb.ResponseType), nil
}

func main() {
    port := os.Getenv("SERVER_PORT")
    lis, err := net.Listen("tcp", ":"+port)
    if err != nil {
        log.Fatalf("faild to listen: %v", err)
    }
    server := grpc.NewServer(
        grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
            grpc_validator.StreamServerInterceptor(),
        )),
        grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
            grpc_validator.UnaryServerInterceptor(),
        )),
    )
    pb.RegisterCustomerServiceServer(server, new(customerService))

    go func() {
        log.Printf("start grpc server port: %s", port)
        server.Serve(lis)
    }()

    quit := make(chan os.Signal)
    signal.Notify(quit, os.Interrupt)
    <-quit
    log.Println("stopping grpc server...")
    server.GracefulStop()
}

grpc.NewServerするときに,grpc.StreamInterceptorgrpc.UnaryInterceptorを追加している.

これでサーバを起動してみよう.

$ go run server/server.go
2017/12/23 17:46:22 start grpc server port: 50051

そして,rubyクライアントから叩いてみよう.

$ bundle exec ruby client.rb
$ bundle exec ruby client.rb h3poteto 28
$ bundle exec ruby client.rb
name=h3poteto, age=28
$ bundle exec ruby client.rb akira 255
/home/akira/src/github.com/h3poteto/go-grpc-example/client/ruby/vendor/bundle/ruby/2.3.0/gems/grpc-1.8.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:31:in `check_status': 3:invalid field Age: value '255' must be less than '100' (GRPC::InvalidArgument)
    from /home/akira/src/github.com/h3poteto/go-grpc-example/client/ruby/vendor/bundle/ruby/2.3.0/gems/grpc-1.8.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:180:in `attach_status_results_and_complete_call'
    from /home/akira/src/github.com/h3poteto/go-grpc-example/client/ruby/vendor/bundle/ruby/2.3.0/gems/grpc-1.8.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:372:in `request_response'
    from /home/akira/src/github.com/h3poteto/go-grpc-example/client/ruby/vendor/bundle/ruby/2.3.0/gems/grpc-1.8.0-x86_64-linux/src/ruby/lib/grpc/generic/client_stub.rb:178:in `block in request_response'
    from /home/akira/src/github.com/h3poteto/go-grpc-example/client/ruby/vendor/bundle/ruby/2.3.0/gems/grpc-1.8.0-x86_64-linux/src/ruby/lib/grpc/generic/interceptors.rb:170:in `intercept!'
    from /home/akira/src/github.com/h3poteto/go-grpc-example/client/ruby/vendor/bundle/ruby/2.3.0/gems/grpc-1.8.0-x86_64-linux/src/ruby/lib/grpc/generic/client_stub.rb:177:in `request_response'
    from /home/akira/src/github.com/h3poteto/go-grpc-example/client/ruby/vendor/bundle/ruby/2.3.0/gems/grpc-1.8.0-x86_64-linux/src/ruby/lib/grpc/generic/service.rb:170:in `block (3 levels) in rpc_stub_class'
    from client.rb:11:in `main'
    from client.rb:19:in `<main>'

ちゃんとGRPC::InvalidArgumentの例外を返してくれた!

RESTのアクセスも受け付けたい

まったくおかしな話だが,gRPCしつつも,「このメソッドRESTで呼びたい,json返したい」という要求が存在する. 例えば,「マクロサービスをgRPCで連携させつつ,裏側に管理画面を作りたい」みたいなことがたまにある.

そういうちょっと変な要求に答えるものも用意されている.

大きくは,grpc-gatewayとgrpc-webというのがある.

grpc-gateway

https://github.com/grpc-ecosystem/grpc-gateway

公式の図が大変よくできているので拝借. grpc-gateway.png

こちらは,RESTの要求とgRPCを上手いこと繋ぎ込んでくれるgatewayサーバを作ろうという思想である.

そのため裏側に一旦gRPCサーバを作っておく必要がある.

そしてProtocol Bufferにも手を入れる必要がある.

syntax = "proto3";

package protocol;
import "google/api/annotations.proto";

service CustomerService {
  rpc ListPerson(RequestType) returns (stream Person) {
    option (google.api.http) = {
      get: "/v1/customer_service/list_person"
    };
  };
  rpc AddPerson(Person) returns (ResponseType) {
    option (google.api.http) = {
      post: "/v1/customer_service/add_person"
      body: "*"
    };
  };
}

message ResponseType {
}

message RequestType {
}

message Person {
  string name = 1;
  int32 age = 2;
}

また生成コマンドが少し面倒になる.

$ protoc -I. -I$(GOPATH)/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --proto_path=$(GOPATH)/src --proto_path=$(GOPATH)/src/github.com/gogo/protobuf/protobuf --proto_path=. --go_out=plugins=grpc:./ --grpc-gateway_out=logtostderr=true:./ protocol/*.proto

proto_pathgithub.com/gogo/protobufを使っている.これは,github.com/google/protobufを使っても構わない. これに関しては,grpc-gatewayのREADMEを参照して欲しい.

こうすることで,protocol配下にcustomer_service.pb.gocustomer_service.pb.gw.goが生成されるようになる.

今回,grpcサーバ側のコード変更は必要ない.なぜなら,gatewayサーバを立ててそいつがREST<->gRPCを中継してくれるので,grpc側は通常のgrpcサーバを提供してくれれば十分である.

ただ,gatewayサーバが必要になるのでそれを作ろう.

package main

import (
    "log"
    "net/http"
    "os"

    "github.com/grpc-ecosystem/grpc-gateway/runtime"
    gw "github.com/h3poteto/go-grpc-example/protocol"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

func run() error {
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    mux := runtime.NewServeMux()
    opts := []grpc.DialOption{grpc.WithInsecure()}
    err := gw.RegisterCustomerServiceHandlerFromEndpoint(ctx, mux, "localhost:"+os.Getenv("SERVER_PORT"), opts)
    if err != nil {
        return err
    }

    log.Printf("gateway server start port: %s", os.Getenv("GATEWAY_PORT"))
    return http.ListenAndServe(":"+os.Getenv("GATEWAY_PORT"), mux)
}

func main() {
    if err := run(); err != nil {
        log.Fatal(err)
    }
}

そしたら,まずgrpcサーバを起動しておく.

$ go run server/server.go
2017/12/23 18:11:43 start grpc server port: 50051

次にgatewayサーバを起動する.

$ go run gateway/gateway.go
2017/12/23 18:12:33 gateway server start port: 9090

そしてcurlする.

$ curl http://localhost:9090/v1/customer_service/list_person
$ curl -X POST http://localhost:9090/v1/customer_service/add_person -H "Content-Type: text/plain" -d '{"name": "h3poteto", "age": 28}'
{}
$ curl http://localhost:9090/v1/customer_service/list_person
{"result":{"name":"h3poteto","age":28}}

見事curlで同じメソッドを叩くことが出来た.

grpc-web

https://github.com/improbable-eng/grpc-web

こちらはブラウザから叩けるようにHTTP/1.1の通信を提供し,jsのライブラリを提供してくれる. 公式でもjs(typescriptでもいい)はもちろんサポートしているのだが,それはnodejsとして使えるだけであり,ブラウザの,フロントエンドで動いているjsから叩けるという意味ではない.

grpc-webはそこを疎通させてくれる.

f:id:h3poteto:20200327215521p:plain

そのため,Protocol Bufferには一切手を入れないが,実装するgRPCサーバはgrpcwebによるラッパーをかませたhttpサーバを立ち上げる必要がある.

grpcサーバを以下のように変更しよう.

package main

import (
    "fmt"
    "log"
    "net/http"
    "os"
    "sync"

    pb "github.com/h3poteto/grpc-web-example/protocol"
    "github.com/improbable-eng/grpc-web/go/grpcweb"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

type customerService struct {
    customers []*pb.Person
    m         sync.Mutex
}

func (cs *customerService) ListPerson(p *pb.RequestType, stream pb.CustomerService_ListPersonServer) error {
    cs.m.Lock()
    defer cs.m.Unlock()
    for _, p := range cs.customers {
        if err := stream.Send(p); err != nil {
            return err
        }
    }
    return nil

}

func (cs *customerService) AddPerson(c context.Context, p *pb.Person) (*pb.ResponseType, error) {
    cs.m.Lock()
    defer cs.m.Unlock()
    cs.customers = append(cs.customers, p)
    return new(pb.ResponseType), nil
}

func main() {
    port := os.Getenv("SERVER_PORT")

    grpcServer := grpc.NewServer()
    pb.RegisterCustomerServiceServer(grpcServer, new(customerService))

    wrappedServer := grpcweb.WrapServer(grpcServer)
    handler := func(resp http.ResponseWriter, req *http.Request) {
        wrappedServer.ServeHttp(resp, req)
    }

    httpServer := http.Server{
        Addr:    fmt.Sprintf(":%s", port),
        Handler: http.HandlerFunc(handler),
    }
    log.Printf("starting http server port: %s", port)

    if err := httpServer.ListenAndServe(); err != nil {
        log.Fatalf("failed to start http server:%v", err)
    }
}

そうしたら,サーバを起動する.

$ go run server/server.go
2017/12/23 18:25:30 starting http server port: 9090

次にjsのクライアントライブラリを生成しよう.

必要なパッケージをインストールしておく.

$ npm install --save @types/google-protobuf google-protobuf grpc-web-client ts-protoc-gen

最終的にこんなpackage.jsonを生み出す.

{
  "name": "grpc-web-example",
  "version": "1.0.0",
  "description": "",
  "main": "client/js/index.js",
  "dependencies": {
    "@types/google-protobuf": "^3.2.7",
    "google-protobuf": "^3.5.0",
    "grpc-web-client": "^0.3.1"
  },
  "devDependencies": {
    "babel-core": "^6.26.0",
    "babel-preset-env": "^1.6.1",
    "babelify": "^8.0.0",
    "browserify": "^14.5.0",
    "ts-protoc-gen": "^0.4.0"
  },
  "scripts": {
    "build": "browserify client/js/index.js -t babelify --outfile client/js/bundle.js",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "repository": {
    "type": "git",
    "url": "git+https://github.com/h3poteto/grpc-web-example.git"
  },
  "author": "h3poteto",
  "license": "MIT",
  "bugs": {
    "url": "https://github.com/h3poteto/grpc-web-example/issues"
  },
  "homepage": "https://github.com/h3poteto/grpc-web-example#readme",
  "babel": {
    "presets": ["env"]
  }
}

あとは,ブラウザ上で動くjsを作ろう.

import {grpc} from "grpc-web-client";

import {CustomerService} from "../../protocol/customer_service_pb_service.js";
import {RequestType, Person} from "../../protocol/customer_service_pb.js";

function ListPersonCall() {
  // テストなのでまずpersonを追加しておく
  const person = new Person();
  person.setName("akira");
  person.setAge(28);
  grpc.invoke(CustomerService.AddPerson, {
    request: person,
    host: "http://localhost:9090",
    onMessage: (message) => {
      console.log("onMessage", message.toObject());
    },
    onEnd: (code, msg, trailers) => {
      console.log("onEnd", code, msg, trailers);
    }
  });

  // personが追加されているかを確認する
  const req = new RequestType();

  grpc.invoke(CustomerService.ListPerson, {
    request: req,
    host: "http://localhost:9090",
    onMessage: (message) => {
      console.log("onMessage", message.toObject());
      alert(message.getName());
    },
    onEnd: (code, msg, trailers) => {
      console.log("onEnd", code, msg, trailers);
    }
  });
}

ListPersonCall();

これを読み込むhtmlを作って,ブラウザを起動してみると......

f:id:h3poteto:20200327215549p:plain

呼べている!

これでフロント側をVue.jsで作ったりするときに,gRPCサーバ側のメソッドを呼び出すことができる.

ちなみに今回gRPCサーバ側もクライアント側も全部insecureを許可しているけど,本番に載せるときには大変よろしくないのでTLSを使いましょう.

まとめ

今回はgoでgRPCするときに使えるものを紹介した. 一部,grpc-gatewayあたりは裏側のgRPCサーバはどんな言語で実装しても問題ないだろう. が,やっぱりgo以外でgRPCサーバを実装しようと思うと,「validator使いたい」とかいろいろ思うことが多い.

やっぱりgrpc-ecosystemのリポジトリは素晴らしい.

サンプルとして実装したリポジトリを置いておく.

validatorとgrpc-gatewayを使っているパターン. https://github.com/h3poteto/go-grpc-example

grpc-webを使ったパターン. https://github.com/h3poteto/grpc-web-example

ただ,こういうライブラリを使えば使うほど,protocの生成コマンドがどんどん複雑になっていく. go用の生成コマンドはまだマシな方で,rubyなんかどんどん長くなりすぎなので,多分覚えるのは無理だと思う. そのため最近はMakefileを書くようにしている.

https://github.com/h3poteto/go-grpc-example/blob/master/Makefile

こんな感じ.

ああdocker swarm,俺はkubernetesに行くよ

今年10月,dockerはkubernetesとの統合を発表した. http://www.publickey1.jp/blog/17/dockerkubernetesdockercon_eu_2017.html

しかもローカルではdocker環境と同時にkubernetes環境が構築されるとか,わけわからんことも発表されている.

kubernetesは更に流れに乗って,先日のAWS re:InventではEKSが発表された. https://aws.amazon.com/jp/eks/

これにより,おそらく将来的にはほとんどのdocker利用者がkubernetesを利用することになるんじゃないだろうか.

続きを読む

Redshift spectrumでnginxのログ解析をする

最近東京でも利用できるようになったRedshift spectrumを使ってみた. やりたいこととしては以下の通り.

  • 適当なnginxのログがS3に溜まっているとする
  • 形式は改行区切りのjson
  • それをRedshift spectrumから触れるようにしたい

Redshift spectrumとAthena

これは最初に知りたかったことなのだが,Redshift spectrumで利用するRedshiftの外部データテーブルはデフォルトではAthena内に作成されているhttp://docs.aws.amazon.com/ja_jp/redshift/latest/dg/c-spectrum-external-schemas.html

つまり,

  • AthenaでS3のデータを参照するテーブルを作ってそれをRedshiftの外部データテーブルに指定すること
  • Redshift内でCREATE EXTERNAL TABLE して外部テーブルを作成・管理すること

の2つは,ほとんど同じことをしているといえる.

テーブルをRedshift内で作成・管理したければRedshift内でCREATE EXTERNAL TABLE すればいいし,Athenaの画面からポチポチしたい場合はAthena側で作れば良い.そこに大きな差はなかった.

最初ここがわからずに,迷子になっていた.

jsonのデータを読み込む

Athenaでjsonのデータを読み込む

というわけで,まずはAthenaでデータを読めるようにしたい.

jsonはAthenaで扱う上ではそこまで効率の良いフォーマットではないが,とりあえず読めるところまで行きたいので気にせずにjsonのまま読ませる(もっと高速なColumn-orientedなフォーマットについては後述). https://qiita.com/moaikids/items/e91b1bcb17458d865beb

f:id:h3poteto:20200327215029p:plain

重要なこととしてパーティションがある.パーティションを切っておかないとAthenaは対象S3ディレクトリの下をフルスキャンすることになり,データ量によってはかなりの時間がかかってしまう.

というわけでパーティションを切るわけだが,Athenaは裏側でHiveが動いており,Hiveのフォーマットに則ったディレクトリ構成にしておくと非常にパーティションを作りやすい. https://dev.classmethod.jp/cloud/aws/athena-partition-reinvent/

s3://akira-playground/nginx/dt=2017-11-21-14

というようなディレクトリ構成にしておくと,parition keyをdtにするだけでパーティションを作ってくれる.

f:id:h3poteto:20200327215042p:plain

なので,できるだけこの形式でログを保存しておこう.

もう一点補足がある. timestampを保存するときにdatetime型を指定すると思うのだが,timestampの形式はHiveのdatetime形式である必要がある. yyyy-mm-dd hh:mm:ss[.fffffffff] というような形式に収めて置くと,datetime型として検索できる.

f:id:h3poteto:20200327215054p:plain

パーティションを設定した場合は,先にLoad partitionしてMSCK REPAIR TABLE を流す必要がある.

Redshift spectrumで触りたい

と思ってRedshiftからAthenaのテーブルを参照してみようと思った.

https://dev.classmethod.jp/cloud/aws/amazon-redshift-spectrum-with-amazon-athena/

ERROR: External Catalog Error: Unsupported file format. org.apache.hadoop.mapred.TextInputFormat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat org.openx.data.jsonserde.JsonSerDe

おかしい,エラーが出る.

たしかにRedshift spectrumはjsonに対応していない.

https://dev.classmethod.jp/cloud/aws/amazon-redshift-spectrum-is-released/ https://www.gixo.jp/blog/10094/

結局jsonについての明確な言及記事は今のところ見つからなかった. ただ,エラーから察するに,やっぱりまだspectrumではjsonを読むことは出来ないらしい,たとえAthenaのテーブルを参照していたとしても

諦めてparquetにする

ちなみに何度調べてもすぐに読み方を忘れるのだが,「パーケイ」と読むらしい. parquetの詳細についてはこちらを参照してもらうとして. http://labotech.dmm.com/entry/2015/09/08/1642

変換する

json -> parquetを行う. embulk等を使っても良いのだが,Redshift spectrumを使うようなケースを考えると,日常的に大量のログ変換が必要になる気がしていて,Glueを使ってみた. https://qiita.com/hideji2/items/85747e3d66026045614d

ちなみに,まだ東京には来ていないので,us-eastのGlueを使うしかなかった.

大体上記の記事と同じ作り方で変換できた.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
from datetime import datetime, timedelta, tzinfo

sc = SparkContext()
glueContext = GlueContext(sc)
sqlContext = SQLContext(sc)

# datetime.nowで取ってきた日時をJSTにしたいがために用意しておく
class JST(tzinfo):
    def utcoffset(self, dt):
        return timedelta(hours=9)

    def dst(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return 'JST'
        
now = datetime.now(tz=JST())
target_date = now - timedelta(hours=1)
date_str = target_date.strftime("%Y-%m-%d-%H")

## @params: [IN_PATH, IN_PATH]
args = getResolvedOptions(sys.argv, ['IN_PATH', 'OUT_PATH'])


# 引数でもらったディレクトリ内は日時でパーティションされたディレクトリが並んでいる
in_path  = args['IN_PATH'] + "dt=" + date_str + "/*.json"
# 出力先も日時パーティションで出力する
out_path = args['OUT_PATH'] + "dt=" + date_str + "/"
print(in_path)
print(out_path)

# http://qiita.com/ajis_ka/items/e2e5b759e77933b08687
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
# http://tech-blog.tsukaby.com/archives/1162
sc._jsc.hadoopConfiguration().set("spark.speculation", "false")

sqlContext = SQLContext(sc)

jsonDataFrame = sqlContext.read.json(in_path)

jsonDataFrame.write.mode("overwrite").format("parquet").option("compression", "snappy").mode("overwrite").save(out_path)

こいつをETLのTriggersで毎時実行にしてやると,毎時のパーティションでparquetのログをS3に保存できる.

Athenaでparquetのデータを読み込む

Glueで変換したparquetのデータは,jsonの時と同じくHiveのパーティションに沿ったディレクトリ構成にしてあった. そのため,だいたいjsonのときと同じ手順でいけた.形式がparquetになるだけ.

f:id:h3poteto:20200327215111p:plain

Redshift spectrumでparquetのデータを読む

Schemaを作る.

sample=# create external schema spectrum
sample-# from data catalog
sample-# database 'akira_playground'
sample-# iam_role 'arn:aws:iam::123456789012:role/mySpectrumRole';
CREATE SCHEMA

テーブルは先程Athenaにnginx_parquetとして用意してあるので,ここで作成する必要はない.

sample=# select count(status) from spectrum.nginx_parquet where dt='2017-11-21-11';
 count
-------
   360
(1 行)

無事こんな感じでspectrumすることができた.

まとめ

Redshift spectrumとAthena

https://qiita.com/moaikids/items/e91b1bcb17458d865beb#%E5%80%8B%E4%BA%BA%E7%9A%84%E3%81%AA%E3%81%BE%E3%81%A8%E3%82%81 こちらでも言われているとおり,spectrumを使うこと前提に立つと,Athenaはそれ単体による解析ツールというより,S3のデータをRedshift内のデータと結びつけ,spectrumしやするする補助ツールっぽい位置づけになる.

もちろん,計算リソースとしてどちらを使うか,どちらのほうが速いかは,Redshiftに普段からどのくらい課金しているかによるとは思うが.

Glueべんり

2017年11月21日現在,jsonはそのままではRedshift spectrumから触ることはできなかった. jsonを触るなら今まで通りのAthenaを使うしかない.

かといってnginx等のアプリケーションから吐き出すログは今まで通りjsonにするしかないし,たとえparquetに変換するにしても1時間とか1日分とか,まとまった単位で変換しておかないとパフォーマンスは出ないだろう.

だからこそ,Glueはとても便利だった.

一度jsonをparquetに変換してしまえばRedshift spectrumするのもかなり速くなって良いので,長期間溜めたログはparquetに変換すると良いかもしれない.

Glue早く東京に来て欲しい.