この記事はCrowdWorks Advent Calendar 2017 の3日目です.
CrowdWorksもマイクロサービスっぽい世界を目指してサービスを分けようという中で,うっかりgRPCが使いたくなったので,どんな感じで使っているのかを紹介しようと思う.
scalaでgRPCを使うには,こちらの記事がまとまっていて大変参考になる.
http://xuwei-k.github.io/scala-protobuf-docs/grpc.html
正直,普通にscalaでgRPCするだけなら,この記事で十分詳しく書いてあるかと思う.
今回はこんなprotocolファイルを用意した.
syntax = "proto3";
package proto;
service CustomerService {
rpc ListPerson(RequestType) returns (stream Person) {};
rpc AddPerson(Person) returns (ResponseType) {};
}
message ResponseType {
}
message RequestType {
}
message Person {
string name = 1;
int32 age = 2;
}
とりあえずprotocol bufferをコンパイルできる状態にしよう.
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.12")
libraryDependencies += "com.trueaccord.scalapb" %% "compilerplugin" % "0.6.6"
import com.trueaccord.scalapb.compiler.Version.{grpcJavaVersion, scalapbVersion, protobufVersion}
/* 中略 */
libraryDependencies ++= Seq(
/* 中略 */
"io.grpc" % "grpc-netty" % grpcJavaVersion,
"com.trueaccord.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
"com.trueaccord.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
"io.grpc" % "grpc-all" % grpcJavaVersion
)
PB.targets in Compile := Seq(scalapb.gen() -> ((sourceManaged in Compile).value / "protobuf-scala"))
PB.protoSources in Compile += (baseDirectory in LocalRootProject).value / "protocol" // .protoファイルが置いてあるディレクトリ
これで, $ sbt compile
すると,target/scala-2.12/src_managed/main/protobuf-scala
あたりに .proto
ファイルから自動生成されたコード群が配置される.
そうしたら,これらを使ったgRPCサーバを実装する.
package mygrpc
import io.grpc.{Server, ServerBuilder}
// ProtocolBufferから自動生成されたライブラリたち
import users.users.{RequestType, UsersGrpc}
import scala.concurrent.ExecutionContext
object GrpcServer {
private val logger = Logger.getLogger(classOf[GrpcServer].getName)
def main(args: Array[String]): Unit = {
val server = new GrpcServer(ExecutionContext.global)
server.start()
server.blockUnitShutdown()
}
private val port = sys.env.getOrElse("SERVER_PORT", "50051").asInstanceOf[String].toInt
}
class GrpcServer(executionContext: ExecutionContext) { self =>
private val port = sys.env.getOrElse("SERVER_PORT", "50051").asInstanceOf[String].toInt
private[this] var server: Server = null
def start(): Unit = {
server = ServerBuilder.forPort(port).addService(
UsersGrpc.bindService(new UsersImpl, executionContext)
).build.start
Logger.info("gRPC server started, listening on " + port)
sys.addShutdownHook {
Logger.info("*** shutting down gRPC server since JVM is shutting down")
self.stop()
}
}
def stop(): Unit = {
if (server != null) {
Logger.info("*** gRPC server shutdown")
server.shutdown()
}
}
def blockUnitShutdown(): Unit = {
if (server != null) {
server.awaitTermination()
}
}
private class UsersImpl extends UsersGrpc.Users {
/* 中略 */
}
}
で, $ sbt run
するとこのgRPCサーバが起動する.
PlayFrameworkしながらgRPCしたい
gRPCをするとして,そもそもWebアプリケーションフレームワークのようなものは提供されていない.
また,「AというサービスとはgRPCしたいけど,BというサービスからはRESTでアクセスしたい」というような要求がある場合がある(やめてほしいけど).
というわけでPlayFrameworkと共存させてみようと思う.
ちなみに,今回PlayFrameworks 2.6で試している.2.5だとgRPCとの共存ができないという話もあるので注意.
https://stackoverflow.com/questions/35827782/conflict-between-play-framework-2-5-and-grpc-0-13
playのプロセスで動かす
playを使う場合,PlayScalaというpluginを使っていると思う.こいつのお陰で,PlayFrameworkではapp配下に置いたソース群をコンパイルしてくれている.
逆に,先程のように,src/main
にソースを置いておいてもコンパイルはしてくれない.
というわけで,server.scala
の場所を移動しつつちょっと書き換える.
package mygrpc
import io.grpc.{Server, ServerBuilder}
// ProtocolBufferから自動生成されたライブラリたち
import users.users.{RequestType, UsersGrpc}
import scala.concurrent.ExecutionContext
trait Runner {
def start(): Unit
}
class RunnerImpl @Inject() (actorSystem: ActorSystem, lifecycle: ApplicationLifecycle)(implicit exec: ExecutionContext) extends Runner {
val server = new GrpcServer(exec)
def start(): Unit = {
server.start()
server.blockUnitShutdown()
}
// playが終了するときに呼ばれる
lifecycle.addStopHook { () =>
Future.successful(server.stop())
}
actorSystem.scheduler.scheduleOnce(1.seconds) {
start()
}
}
class GrpcServer(executionContext: ExecutionContext) { self =>
private val port = sys.env.getOrElse("SERVER_PORT", "50051").asInstanceOf[String].toInt
private[this] var server: Server = null
def start(): Unit = {
server = ServerBuilder.forPort(port).addService(
UsersGrpc.bindService(new UsersImpl, executionContext)
).build.start
Logger.info("gRPC server started, listening on " + port)
sys.addShutdownHook {
Logger.info("*** shutting down gRPC server since JVM is shutting down")
self.stop()
}
}
def stop(): Unit = {
if (server != null) {
Logger.info("*** gRPC server shutdown")
server.shutdown()
}
}
def blockUnitShutdown(): Unit = {
if (server != null) {
server.awaitTermination()
}
}
private class UsersImpl extends UsersGrpc.Users {
/* 中略 */
}
}
わざわざ起動をtraitに包んでいる.
そうすることで,
class Module extends AbstractModule {
override def configure(): Unit = {
bind(classOf[Runner]).to(classOf[RunnerImpl]).asEagerSingleton
}
}
こうやってplay起動時にInjectしてgRPCサーバを起動できるようにしている.
嬉しいこと
lifecycle.addStopHook { () =>
Future.successful(server.stop())
}
こうすることで,Ctrl +D
したときでもgRPCのプロセスが止まるようになっている.
Ctrl+C
したときは
sys.addShutdownHook {
Logger.info("*** shutting down gRPC server since JVM is shutting down")
self.stop()
}
こっちが呼ばれる.
悲しいこと
ただし,普通のPlayFrameworkのようにコードを変更したときに再コンパイルが走ることがない.コードを変更するたびに,Ctrl+D
してプロセスを止める必要がある.
また,初回起動時には http://localhost:9000
にアクセスしてplayを起動しないとコンパイルが走らず,gRPCも起動しない.
playとは別プロセスで動かす
わりかし不便だなーと思ったので,最近はplayとは別プロセスとしている.
package mygrpc
trait Runner extends Runnable {
run()
final def run() = {
val app = application()
try {
task(app)
} finally {
Play.stop(app)
}
}
def task(app: Application)
def environment(): Mode = {
System.getProperty("play.mode") match {
case "Prod" => Mode.Prod
case _ => Mode.Dev
}
}
def application(): Application = {
val env = Environment(new java.io.File("."), this.getClass.getClassLoader, environment)
val context = ApplicationLoader.createContext(env)
val loader = ApplicationLoader(context)
loader.load(context)
}
}
object RunnerImpl extends App with Runner {
def task(app: Application) {
val exec = ExecutionContext.global
val server = new GrpcServer(exec)
server.start()
server.blockUnitShutdown()
}
}
class GrpcServer(executionContext: ExecutionContext) { self =>
// 中身は同じ
}
Module.scala
を廃止して,こんなのを作っておく.
すると $ sbt runMain mygrpc.RunnerImpl
とかで起動できる.
これは,中でPlayのアプリケーションをロードしてはいるけど,httpを受け付けることはない.
なので,もし普通にplayを起動してリクエストを受け付けたい場合には,別のプロセスとして $ sbt run
するといいかもしれない.
嬉しいこと
そもそも別のサーバなんだし別プロセスで動くって理に適っているよね.
悲しいこと
結局コードを変更して再コンパイルしたい場合は,一度gRPCのプロセスを止めるしかない.そこはあまり変わってない.
さて,ここからはPlayに関係なく,scalaでgRPCするとき全体のお話.
Webアプリケーションフレームワークだと,よくデフォルトの状態でアクセスログを吐いてくれる.
もちろん自分が「ここ!」と思ったタイミングでログを吐くこともできる.
gRPCを使う場合も,RESTではないにしろちゃんとメソッドの呼出が行われるわけだし,アクセスログを吐きたいと思う.
それも,出来る限り自動でログを吐くようにしてもらいたい.
というわけでここからはInterceptorを使う.
Interceptor
interceptorとはその名の通り. リクエストの前後に任意の処理を挟み込むためのものだ.イメージとしては,railsで言うところのrack middlwareや,playで言うところのfilter,goのechoでいうところのmiddlewareに近い何かだ.
class Logging ServerInterceptor {
}
こんなInterceptorを作ったら,
class GrpcServer(executionContext: ExecutionContext) { self =>
private[this] var server: Server = null
def start(): Unit = {
server = ServerBuilder.forPort(55051).addService(
ServerInterceptors.intercept(
UsersGrpc.bindService(new UsersImpl, executionContext),
new Logging
)
).build.start
}
}
こうやって差し込むことができる.
ログを吐く
package mygrpc.interceptors
class Logging() extends ServerInterceptor {
override def interceptCall[ReqT, RespT](
serverCall: ServerCall[ReqT, RespT],
headers: Metadata,
next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
val wrapperCall: ServerCall[ReqT, RespT] = new ForwardingServerCall.SimpleForwardingServerCall[ReqT, RespT](serverCall) {
override def request(numMessages: Int) {
Logger.info("Request: " + headers.toString() + ", " + serverCall.getAttributes().toString() + ", " + serverCall.getMethodDescriptor().getFullMethodName())
super.request(numMessages)
}
override def sendMessage(message: RespT) {
Logger.info("Response: " + message.toString())
super.sendMessage(message)
}
override def close(status: Status, trailers: Metadata) {
Logger.info("Response: " + status.toString())
super.close(status, trailers)
}
}
return next.startCall(wrapperCall, headers)
}
}
こんなInterceptorを挟み込んでやると,各イベントごとにログを吐いてくれる.
リクエストIDを吐きたい!
マイクロサービスの運用をしようと思うと,サービスのログも分散しているのが当然の世界になる.そうすると,1つのアクセスが,結果的にどんなメソッドを呼んでいたのかを知る術がほしくなる.
そういうことを考え始めると,最初にリクエストIDを発行して,複数サービスでそれを持ち回してログに出したいと思うようになる.
というわけで,先程のログに「クライアント側から渡されたリクエストID」を吐き出す機能をつけよう.
MDCを使う
リクエストを受け付けた時にリクエストIDをheadersあたりから抜き取るとして,それをどこかに保持しておかないと各所のログで使うことができない.
というわけで,MDCを使おう.
https://qiita.com/namutaka/items/c35c437b7246c5e4d729
まず,リクエストIDを埋めるMDCContextを作る.
package mygrpc.util
import io.grpc.{Metadata}
import java.util.UUID
import collection.JavaConverters._
import collection.mutable._
object MDCContext {
object Request {
val key: String = "request_id"
// grpcクライアント側でrequest_idとして一意なIDをheadersに埋めている前提
def id(headers: Metadata): String = {
val k: Metadata.Key[String] = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
headers.get(k)
}
}
def buildMdcContext(headers: Metadata): java.util.Map[String, String] = {
HashMap(Request.key -> Request.id(headers)).asJava
}
}
これに合わせて,logback.xmlにrequest_idを吐き出すように記述してやる必要がある.
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%coloredLevel %logger{15} - [%X{request_id}] %message%n%xException{10}</pattern>
</encoder>
</appender>
Interceptorを作る
先程のLoggingを以下のように書き換える.
package mygrpc.interceptors
class Logging() extends ServerInterceptor {
override def interceptCall[ReqT, RespT](
serverCall: ServerCall[ReqT, RespT],
headers: Metadata,
next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
// リクエストごとに固有のrequestIdを作る
val mdcContext: java.util.Map[String, String] = MDCContext.buildMdcContext(headers)
MDC.setContextMap(mdcContext)
val wrapperCall: ServerCall[ReqT, RespT] = new ForwardingServerCall.SimpleForwardingServerCall[ReqT, RespT](serverCall) {
override def request(numMessages: Int) {
MDC.setContextMap(mdcContext)
Logger.info("Request: " + headers.toString() + ", " + serverCall.getAttributes().toString() + ", " + serverCall.getMethodDescriptor().getFullMethodName())
super.request(numMessages)
}
override def sendMessage(message: RespT) {
MDC.setContextMap(mdcContext)
Logger.info("Response: " + message.toString())
super.sendMessage(message)
}
override def close(status: Status, trailers: Metadata) {
MDC.setContextMap(mdcContext)
Logger.info("Response: " + status.toString())
super.close(status, trailers)
MDC.clear()
}
}
val listener = next.startCall(wrapperCall, headers)
return new ForwardingServerCallListener.SimpleForwardingServerCallListener[ReqT](listener) {
override def onMessage(message: ReqT) {
MDC.setContextMap(mdcContext)
super.onMessage(message)
}
override def onHalfClose() {
MDC.setContextMap(mdcContext)
super.onHalfClose()
}
override def onCancel() {
MDC.setContextMap(mdcContext)
super.onCancel()
MDC.clear()
}
override def onComplete() {
MDC.setContextMap(mdcContext)
super.onComplete()
MDC.clear()
}
override def onReady() {
MDC.setContextMap(mdcContext)
super.onReady()
}
}
}
}
こうすることで,各所でリクエストIDがログに出力されるようになる.
また,gRPCで呼び出されたメソッド内で,自分でログを出力した場合でもちゃんとリクエストID付きのログが出る.
もちろん,例外になったときにもリクエストIDがついているので,ログを追いかけるのが非常に楽になる.
例外ハンドリングをしたい
例外を捕まえる
ログと同じく,例外も共通してどこかでハンドリングをしたくなる.
また,今使っているgrpc-javaでは,grpcサーバ内で例外がthrowされても,どこにも拾ってくれる場所がなく,grpcクライアント側は待たされ続けるという悲惨な状況になっている.
というわけで同じくInterceptorを作る.
class ErrorHandler extends ServerInterceptor {
override def interceptCall[ReqT, RespT](
serverCall: ServerCall[ReqT, RespT],
headers: Metadata,
next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
val listener = next.startCall(serverCall, headers)
return new ForwardingServerCallListener.SimpleForwardingServerCallListener[ReqT](listener) {
override def onMessage(message: ReqT) {
try {
super.onMessage(message)
} catch {
case e: Exception =>
closeWithException(e, headers)
}
}
override def onHalfClose() {
try {
super.onHalfClose()
} catch {
case e: Exception =>
closeWithException(e, headers)
}
}
override def onCancel() {
try {
super.onCancel()
} catch {
case e: Exception =>
closeWithException(e, headers)
}
}
override def onComplete() {
try {
super.onComplete()
} catch {
case e: Exception =>
closeWithException(e, headers)
}
}
override def onReady() {
try {
super.onReady()
} catch {
case e: Exception =>
closeWithException(e, headers)
}
}
private def closeWithException(t: Exception, requestHeader: Metadata) {
// 例外を拾ったのでログを出すなりする
}
}
}
}
こうしておくと,grpcサーバ内で例外が起こった際には,closeWithException
で拾うことができた.
実はgRPCにはステータスが用意されている.
https://github.com/grpc/grpc-java/blob/166108a9438c22d06eb3b371b5ad34a75e14787c/core/src/main/java/io/grpc/Status.java
なので,起こった例外種別によってステータスを変更したい場合には,このメソッド内で例外とステータスのマッピングをしてやればよい.
このステータスとのマッピングだが,
https://github.com/grpc/grpc-java/blob/2b1eee90e5bd7f5ad905e34f73f2040d6c9a3568/core/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java
既にこういったものが用意されている.つまり,StatusRuntimeException
をはけば,それを Status
にマッピングしてgRPCのレスポンスを返してくれる.
これを自分で実装するのは無駄な気がするので,あるものは使おう.
というわけで,先程例外を捕まえたメソッドでは以下のようにして,再び例外を発生させる.ただしこのとき,StatusRuntimeException
を発生させる.
private def closeWithException(t: Exception, requestHeader: Metadata) {
var status: Status = null
t match {
case e: ValidationError => status = Status.Code.INVALID_ARGUMENT.toStatus().withDescription(t.getMessage()).withCause(t)
case _ => status = Status.Code.INTERNAL.toStatus().withDescription(t.getMessage()).withCause(t)
}
Logger.error(status.toString())
throw new StatusRuntimeException(status)
}
そしたら,こいつを拾うために TransmitStatusRuntimeExceptionInterceptor
を呼ぶように,server.scala
を書き換える.
// 中略
class GrpcServer(executionContext: ExecutionContext) { self =>
private val port = sys.env.getOrElse("SERVER_PORT", "50051").asInstanceOf[String].toInt
private[this] var server: Server = null
private var users: Array[User] = Array.empty
def start(): Unit = {
server = ServerBuilder.forPort(port).addService(
ServerInterceptors.intercept(
UsersGrpc.bindService(new UsersImpl, executionContext),
new Logging,
new ErrorHandler,
TransmitStatusRuntimeExceptionInterceptor.instance()
)
).build.start
Logger.info("gRPC server started, listening on " + port)
// JVM自体がshutdownされた際に呼ばれる
sys.addShutdownHook {
Logger.info("*** shutting down gRPC server since JVM is shutting down")
self.stop()
}
}
def stop(): Unit = {
if (server != null) {
Logger.info("*** gRPC server shutdown")
server.shutdown()
}
}
def blockUnitShutdown(): Unit = {
if (server != null) {
server.awaitTermination()
}
}
private class UsersImpl extends UsersGrpc.Users {
override def create(request: User): scala.concurrent.Future[User] = {
throw new ValidationError("hoge")
Future.successful(request)
}
}
}
こうしておくと,ちゃんとcreate
メソッド内の例外で発生したValidationError
は GRPC::INVALID_ARGUMENT
としてgrpcクライアント側で受け取ることができる.
まとめ
だいたいこの記事で触れたような実装は全てこのリポジトリに詰め込んだ.
https://github.com/h3poteto/play-grpc-example
上のリポジトリは,今回開発する上で調べたり試したりしたことを詰め込んだだけなのだが,社内ではこれを参考にして,本当にscala+gRPCで新しいマイクロサービスを作っているので,そのうちリリースされると思う.
javaとかkotlin羨ましい
https://github.com/LogNet/grpc-spring-boot-starter
いや,有能すぎでしょ.これがあればそりゃー楽に開発できるだろうなーって感じがする.
なんでscalaにこういうのが生まれないのだろうか.俺もkotlinとかやったほうが良いのかもしれない.
やっぱgoだよgo
ここまでscalaの話してきてアレだけど,たぶんgRPCサーバ作るならgoで作るのが一番楽だし楽しいと思う.
そもそもgRPC周りで用意されているものが豊富なので嬉しい.「これほしいなー」って思ったらだいたいgoで実装されている.
っていう話を Go2 Advent Calendar 2017 でしようと思うのでよろしくお願いします.