scalaでgRPCする

この記事はCrowdWorks Advent Calendar 2017 の3日目です.

CrowdWorksもマイクロサービスっぽい世界を目指してサービスを分けようという中で,うっかりgRPCが使いたくなったので,どんな感じで使っているのかを紹介しようと思う.

scalaでもgRPCはできる

scalaでgRPCを使うには,こちらの記事がまとまっていて大変参考になる. http://xuwei-k.github.io/scala-protobuf-docs/grpc.html

正直,普通にscalaでgRPCするだけなら,この記事で十分詳しく書いてあるかと思う.

今回はこんなprotocolファイルを用意した.

syntax = "proto3";

package proto;

service CustomerService {
  rpc ListPerson(RequestType) returns (stream Person) {};
  rpc AddPerson(Person) returns (ResponseType) {};
}

message ResponseType {
}

message RequestType {
}

message Person {
  string name = 1;
  int32 age = 2;
}

とりあえずprotocol bufferをコンパイルできる状態にしよう.

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.12")

libraryDependencies += "com.trueaccord.scalapb" %% "compilerplugin" % "0.6.6"
import com.trueaccord.scalapb.compiler.Version.{grpcJavaVersion, scalapbVersion, protobufVersion}

/* 中略 */

libraryDependencies ++= Seq(
/* 中略 */
  "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.trueaccord.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  "com.trueaccord.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.grpc" % "grpc-all" % grpcJavaVersion
)

PB.targets in Compile := Seq(scalapb.gen() -> ((sourceManaged in Compile).value / "protobuf-scala"))
PB.protoSources in Compile += (baseDirectory in LocalRootProject).value / "protocol" // .protoファイルが置いてあるディレクトリ

これで, $ sbt compile すると,target/scala-2.12/src_managed/main/protobuf-scala あたりに .proto ファイルから自動生成されたコード群が配置される.

そうしたら,これらを使ったgRPCサーバを実装する.

package mygrpc

import io.grpc.{Server, ServerBuilder}

// ProtocolBufferから自動生成されたライブラリたち
import users.users.{RequestType, UsersGrpc}

import scala.concurrent.ExecutionContext


object GrpcServer {
  private val logger = Logger.getLogger(classOf[GrpcServer].getName)

  def main(args: Array[String]): Unit = {
    val server = new GrpcServer(ExecutionContext.global)
    server.start()
    server.blockUnitShutdown()
  }

  private val port = sys.env.getOrElse("SERVER_PORT", "50051").asInstanceOf[String].toInt
}

class GrpcServer(executionContext: ExecutionContext) { self =>
  private val port = sys.env.getOrElse("SERVER_PORT", "50051").asInstanceOf[String].toInt
  private[this] var server: Server = null

  def start(): Unit = {
    server = ServerBuilder.forPort(port).addService(
      UsersGrpc.bindService(new UsersImpl, executionContext)
    ).build.start
    Logger.info("gRPC server started, listening on " + port)
    sys.addShutdownHook {
      Logger.info("*** shutting down gRPC server since JVM is shutting down")
      self.stop()
    }
  }

  def stop(): Unit = {
    if (server != null) {
      Logger.info("*** gRPC server shutdown")
      server.shutdown()
    }
  }

  def blockUnitShutdown(): Unit = {
    if (server != null) {
      server.awaitTermination()
    }
  }

  private class UsersImpl extends UsersGrpc.Users {
  /* 中略 */
  }
}

で, $ sbt run するとこのgRPCサーバが起動する.

PlayFrameworkしながらgRPCしたい

gRPCをするとして,そもそもWebアプリケーションフレームワークのようなものは提供されていない.

また,「AというサービスとはgRPCしたいけど,BというサービスからはRESTでアクセスしたい」というような要求がある場合がある(やめてほしいけど).

というわけでPlayFrameworkと共存させてみようと思う.

ちなみに,今回PlayFrameworks 2.6で試している.2.5だとgRPCとの共存ができないという話もあるので注意. https://stackoverflow.com/questions/35827782/conflict-between-play-framework-2-5-and-grpc-0-13

playのプロセスで動かす

playを使う場合,PlayScalaというpluginを使っていると思う.こいつのお陰で,PlayFrameworkではapp配下に置いたソース群をコンパイルしてくれている. 逆に,先程のように,src/main にソースを置いておいてもコンパイルはしてくれない.

というわけで,server.scala の場所を移動しつつちょっと書き換える.

package mygrpc

import io.grpc.{Server, ServerBuilder}

// ProtocolBufferから自動生成されたライブラリたち
import users.users.{RequestType, UsersGrpc}

import scala.concurrent.ExecutionContext

trait Runner {
  def start(): Unit
}

class RunnerImpl @Inject() (actorSystem: ActorSystem, lifecycle: ApplicationLifecycle)(implicit exec: ExecutionContext) extends Runner {
  val server = new GrpcServer(exec)

  def start(): Unit = {
    server.start()
    server.blockUnitShutdown()
  }
  // playが終了するときに呼ばれる
  lifecycle.addStopHook { () =>
    Future.successful(server.stop())
  }
  actorSystem.scheduler.scheduleOnce(1.seconds) {
    start()
  }
}

class GrpcServer(executionContext: ExecutionContext) { self =>
  private val port = sys.env.getOrElse("SERVER_PORT", "50051").asInstanceOf[String].toInt
  private[this] var server: Server = null

  def start(): Unit = {
    server = ServerBuilder.forPort(port).addService(
      UsersGrpc.bindService(new UsersImpl, executionContext)
    ).build.start
    Logger.info("gRPC server started, listening on " + port)
    sys.addShutdownHook {
      Logger.info("*** shutting down gRPC server since JVM is shutting down")
      self.stop()
    }
  }

  def stop(): Unit = {
    if (server != null) {
      Logger.info("*** gRPC server shutdown")
      server.shutdown()
    }
  }

  def blockUnitShutdown(): Unit = {
    if (server != null) {
      server.awaitTermination()
    }
  }

  private class UsersImpl extends UsersGrpc.Users {
  /* 中略 */
  }
}

わざわざ起動をtraitに包んでいる. そうすることで,

class Module extends AbstractModule {
  override def configure(): Unit = {
    bind(classOf[Runner]).to(classOf[RunnerImpl]).asEagerSingleton
  }
}

こうやってplay起動時にInjectしてgRPCサーバを起動できるようにしている.

嬉しいこと

  // playが終了するときに呼ばれる
  lifecycle.addStopHook { () =>
    Future.successful(server.stop())
  }

こうすることで,Ctrl +D したときでもgRPCのプロセスが止まるようになっている. Ctrl+C したときは

    sys.addShutdownHook {
      Logger.info("*** shutting down gRPC server since JVM is shutting down")
      self.stop()
    }

こっちが呼ばれる.

悲しいこと

ただし,普通のPlayFrameworkのようにコードを変更したときに再コンパイルが走ることがない.コードを変更するたびに,Ctrl+D してプロセスを止める必要がある.

また,初回起動時には http://localhost:9000 にアクセスしてplayを起動しないとコンパイルが走らず,gRPCも起動しない.

playとは別プロセスで動かす

わりかし不便だなーと思ったので,最近はplayとは別プロセスとしている.

package mygrpc

trait Runner extends Runnable {
  run()

  final def run() = {
    val app = application()
    try {
      task(app)
    } finally {
      Play.stop(app)
    }
  }

  def task(app: Application)

  def environment(): Mode = {
    System.getProperty("play.mode") match {
      case "Prod" => Mode.Prod
      case _ => Mode.Dev
    }
  }

  def application(): Application = {
    val env = Environment(new java.io.File("."), this.getClass.getClassLoader, environment)
    val context = ApplicationLoader.createContext(env)
    val loader = ApplicationLoader(context)
    loader.load(context)
  }
}

object RunnerImpl extends App with Runner {
  def task(app: Application) {
    val exec = ExecutionContext.global
    val server = new GrpcServer(exec)
    server.start()
    server.blockUnitShutdown()
  }
}

class GrpcServer(executionContext: ExecutionContext) { self =>
  // 中身は同じ
}

Module.scala を廃止して,こんなのを作っておく. すると $ sbt runMain mygrpc.RunnerImpl とかで起動できる. これは,中でPlayのアプリケーションをロードしてはいるけど,httpを受け付けることはない.

なので,もし普通にplayを起動してリクエストを受け付けたい場合には,別のプロセスとして $ sbt run するといいかもしれない.

嬉しいこと

そもそも別のサーバなんだし別プロセスで動くって理に適っているよね.

悲しいこと

結局コードを変更して再コンパイルしたい場合は,一度gRPCのプロセスを止めるしかない.そこはあまり変わってない.

アクセスログを吐きたい

さて,ここからはPlayに関係なく,scalaでgRPCするとき全体のお話.

Webアプリケーションフレームワークだと,よくデフォルトの状態でアクセスログを吐いてくれる. もちろん自分が「ここ!」と思ったタイミングでログを吐くこともできる.

gRPCを使う場合も,RESTではないにしろちゃんとメソッドの呼出が行われるわけだし,アクセスログを吐きたいと思う. それも,出来る限り自動でログを吐くようにしてもらいたい.

というわけでここからはInterceptorを使う.

Interceptor

interceptorとはその名の通り. リクエストの前後に任意の処理を挟み込むためのものだ.イメージとしては,railsで言うところのrack middlwareや,playで言うところのfilter,goのechoでいうところのmiddlewareに近い何かだ.

class Logging ServerInterceptor {
//...
}

こんなInterceptorを作ったら,

class GrpcServer(executionContext: ExecutionContext) { self =>
  private[this] var server: Server = null

  def start(): Unit = {
    server = ServerBuilder.forPort(55051).addService(
      ServerInterceptors.intercept(
        UsersGrpc.bindService(new UsersImpl, executionContext),
        new Logging
      )
    ).build.start
  }
  //...
}

こうやって差し込むことができる.

ログを吐く

package mygrpc.interceptors

class Logging() extends ServerInterceptor {

  override def interceptCall[ReqT, RespT](
    serverCall: ServerCall[ReqT, RespT],
    headers: Metadata,
    next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {

    val wrapperCall: ServerCall[ReqT, RespT] = new ForwardingServerCall.SimpleForwardingServerCall[ReqT, RespT](serverCall) {
      override def request(numMessages: Int) {
        Logger.info("Request: " + headers.toString() + ", " + serverCall.getAttributes().toString() + ", " + serverCall.getMethodDescriptor().getFullMethodName())
        super.request(numMessages)
      }

      override def sendMessage(message: RespT) {
        Logger.info("Response: " + message.toString())
        super.sendMessage(message)
      }

      override def close(status: Status, trailers: Metadata) {
        Logger.info("Response: " + status.toString())
        super.close(status, trailers)
      }
    }
    return next.startCall(wrapperCall, headers)
  }
}

こんなInterceptorを挟み込んでやると,各イベントごとにログを吐いてくれる.

リクエストIDを吐きたい!

マイクロサービスの運用をしようと思うと,サービスのログも分散しているのが当然の世界になる.そうすると,1つのアクセスが,結果的にどんなメソッドを呼んでいたのかを知る術がほしくなる.

そういうことを考え始めると,最初にリクエストIDを発行して,複数サービスでそれを持ち回してログに出したいと思うようになる.

というわけで,先程のログに「クライアント側から渡されたリクエストID」を吐き出す機能をつけよう.

MDCを使う

リクエストを受け付けた時にリクエストIDをheadersあたりから抜き取るとして,それをどこかに保持しておかないと各所のログで使うことができない.

というわけで,MDCを使おう. https://qiita.com/namutaka/items/c35c437b7246c5e4d729

まず,リクエストIDを埋めるMDCContextを作る.

package mygrpc.util

import io.grpc.{Metadata}
import java.util.UUID
import collection.JavaConverters._
import collection.mutable._

object MDCContext {
  object Request {
    val key: String = "request_id"
    
    // grpcクライアント側でrequest_idとして一意なIDをheadersに埋めている前提
    def id(headers: Metadata): String =  {
      val k: Metadata.Key[String] = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
      headers.get(k)
    }
  }

  def buildMdcContext(headers: Metadata): java.util.Map[String, String] = {
    HashMap(Request.key -> Request.id(headers)).asJava
  }
}

これに合わせて,logback.xmlにrequest_idを吐き出すように記述してやる必要がある.

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%coloredLevel %logger{15} - [%X{request_id}] %message%n%xException{10}</pattern>
    </encoder>
  </appender>

Interceptorを作る

先程のLoggingを以下のように書き換える.

package mygrpc.interceptors

class Logging() extends ServerInterceptor {

  override def interceptCall[ReqT, RespT](
    serverCall: ServerCall[ReqT, RespT],
    headers: Metadata,
    next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {

    // リクエストごとに固有のrequestIdを作る
    val mdcContext: java.util.Map[String, String] = MDCContext.buildMdcContext(headers)
    MDC.setContextMap(mdcContext)

    val wrapperCall: ServerCall[ReqT, RespT] = new ForwardingServerCall.SimpleForwardingServerCall[ReqT, RespT](serverCall) {
      override def request(numMessages: Int) {
        MDC.setContextMap(mdcContext)
        Logger.info("Request: " + headers.toString() + ", " + serverCall.getAttributes().toString() + ", " + serverCall.getMethodDescriptor().getFullMethodName())
        super.request(numMessages)
      }

      override def sendMessage(message: RespT) {
        MDC.setContextMap(mdcContext)
        Logger.info("Response: " + message.toString())
        super.sendMessage(message)
      }

      override def close(status: Status, trailers: Metadata) {
        MDC.setContextMap(mdcContext)
        Logger.info("Response: " + status.toString())
        super.close(status, trailers)
        MDC.clear()
      }
    }
    val listener = next.startCall(wrapperCall, headers)
    return new ForwardingServerCallListener.SimpleForwardingServerCallListener[ReqT](listener) {

      override def onMessage(message: ReqT) {
        MDC.setContextMap(mdcContext)
        super.onMessage(message)
      }

      override def onHalfClose() {
        MDC.setContextMap(mdcContext)
        super.onHalfClose()
      }

      override def onCancel() {
        MDC.setContextMap(mdcContext)
        super.onCancel()
        MDC.clear()
      }

      override def onComplete() {
        MDC.setContextMap(mdcContext)
        super.onComplete()
        MDC.clear()
      }

      override def onReady() {
        MDC.setContextMap(mdcContext)
        super.onReady()
      }
    }
  }
}

こうすることで,各所でリクエストIDがログに出力されるようになる.

また,gRPCで呼び出されたメソッド内で,自分でログを出力した場合でもちゃんとリクエストID付きのログが出る. もちろん,例外になったときにもリクエストIDがついているので,ログを追いかけるのが非常に楽になる.

例外ハンドリングをしたい

例外を捕まえる

ログと同じく,例外も共通してどこかでハンドリングをしたくなる. また,今使っているgrpc-javaでは,grpcサーバ内で例外がthrowされても,どこにも拾ってくれる場所がなく,grpcクライアント側は待たされ続けるという悲惨な状況になっている.

というわけで同じくInterceptorを作る.

class ErrorHandler extends ServerInterceptor {
  override def interceptCall[ReqT, RespT](
    serverCall: ServerCall[ReqT, RespT],
    headers: Metadata,
    next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    val listener = next.startCall(serverCall, headers)
    return new ForwardingServerCallListener.SimpleForwardingServerCallListener[ReqT](listener) {
      override def onMessage(message: ReqT) {
        try {
          super.onMessage(message)
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }

      override def onHalfClose() {
        try {
          super.onHalfClose()
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }
      override def onCancel() {
        try {
          super.onCancel()
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }

      override def onComplete() {
        try {
          super.onComplete()
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }

      override def onReady() {
        try {
          super.onReady()
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }

      private def closeWithException(t: Exception, requestHeader: Metadata) {
        // 例外を拾ったのでログを出すなりする
      }
    }
  }
}

こうしておくと,grpcサーバ内で例外が起こった際には,closeWithException で拾うことができた.

実はgRPCにはステータスが用意されている. https://github.com/grpc/grpc-java/blob/166108a9438c22d06eb3b371b5ad34a75e14787c/core/src/main/java/io/grpc/Status.java なので,起こった例外種別によってステータスを変更したい場合には,このメソッド内で例外とステータスのマッピングをしてやればよい.

例外とステータスのマッピング

このステータスとのマッピングだが, https://github.com/grpc/grpc-java/blob/2b1eee90e5bd7f5ad905e34f73f2040d6c9a3568/core/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java

既にこういったものが用意されている.つまり,StatusRuntimeException をはけば,それを StatusマッピングしてgRPCのレスポンスを返してくれる.

これを自分で実装するのは無駄な気がするので,あるものは使おう.

というわけで,先程例外を捕まえたメソッドでは以下のようにして,再び例外を発生させる.ただしこのとき,StatusRuntimeException を発生させる.

      private def closeWithException(t: Exception, requestHeader: Metadata) {
        var status: Status = null

        t match {
          case e: ValidationError => status = Status.Code.INVALID_ARGUMENT.toStatus().withDescription(t.getMessage()).withCause(t)
          case _ => status = Status.Code.INTERNAL.toStatus().withDescription(t.getMessage()).withCause(t)
        }
        Logger.error(status.toString())
        throw new StatusRuntimeException(status)
      }

そしたら,こいつを拾うために TransmitStatusRuntimeExceptionInterceptor を呼ぶように,server.scala を書き換える.

// 中略

class GrpcServer(executionContext: ExecutionContext) { self =>
  private val port = sys.env.getOrElse("SERVER_PORT", "50051").asInstanceOf[String].toInt
  private[this] var server: Server = null
  private var users: Array[User] = Array.empty

  def start(): Unit = {
    server = ServerBuilder.forPort(port).addService(
      ServerInterceptors.intercept(
        UsersGrpc.bindService(new UsersImpl, executionContext),
        new Logging,
        new ErrorHandler,
        TransmitStatusRuntimeExceptionInterceptor.instance()
      )
    ).build.start
    Logger.info("gRPC server started, listening on " + port)

    // JVM自体がshutdownされた際に呼ばれる
    sys.addShutdownHook {
      Logger.info("*** shutting down gRPC server since JVM is shutting down")
      self.stop()
    }
  }

  def stop(): Unit = {
    if (server != null) {
      Logger.info("*** gRPC server shutdown")
      server.shutdown()
    }
  }

  def blockUnitShutdown(): Unit = {
    if (server != null) {
      server.awaitTermination()
    }
  }

  private class UsersImpl extends UsersGrpc.Users {
    override def create(request: User): scala.concurrent.Future[User] = {
      throw new ValidationError("hoge")
      Future.successful(request)
    }
  }
}

こうしておくと,ちゃんとcreate メソッド内の例外で発生したValidationErrorGRPC::INVALID_ARGUMENT としてgrpcクライアント側で受け取ることができる.

まとめ

だいたいこの記事で触れたような実装は全てこのリポジトリに詰め込んだ. https://github.com/h3poteto/play-grpc-example

上のリポジトリは,今回開発する上で調べたり試したりしたことを詰め込んだだけなのだが,社内ではこれを参考にして,本当にscala+gRPCで新しいマイクロサービスを作っているので,そのうちリリースされると思う.

javaとかkotlin羨ましい

https://github.com/LogNet/grpc-spring-boot-starter いや,有能すぎでしょ.これがあればそりゃー楽に開発できるだろうなーって感じがする.

なんでscalaにこういうのが生まれないのだろうか.俺もkotlinとかやったほうが良いのかもしれない.

やっぱgoだよgo

ここまでscalaの話してきてアレだけど,たぶんgRPCサーバ作るならgoで作るのが一番楽だし楽しいと思う. そもそもgRPC周りで用意されているものが豊富なので嬉しい.「これほしいなー」って思ったらだいたいgoで実装されている. っていう話を Go2 Advent Calendar 2017 でしようと思うのでよろしくお願いします.