scalaのgRPCで例外ハンドリングをする

またまたscalaでgRPCする話.

サーバを運用する以上,例外が起こることは避けられないと思う. そしてその例外,どこかでキャッチして適切に処理したいと思う.

gRPCサーバの例外

試しにこんなメソッドを仕込んで見るといい.

  private class UsersImpl extends UsersGrpc.Users {
    override def create(request: User): scala.concurrent.Future[User] = {
      throw new RuntimeException("hoge")
      Future.successful(request)
  }

gRPCサーバ側ではもちろんちゃんと例外を吐き出す.

しかし,呼び出したgRPCクライアント側では,応答がいつになっても帰ってこない.

悲しいことにタイムアウトまで放置された.

gRPCサーバは close メソッドを呼び出す前に例外でストップしてしまったので,応答を何も返せなくなってしまっている.

救いだそう

このままでは使えないので,例外を救いだそう.

前回interceptorの話がでてきたけど,

h3poteto.hatenablog.com

これを使うと,gRPCサーバ内で発生した例外を拾うことができる.

class ErrorHandler extends ServerInterceptor {
  override def interceptCall[ReqT, RespT](
    serverCall: ServerCall[ReqT, RespT],
    headers: Metadata,
    next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    val listener = next.startCall(serverCall, headers)
    return new ForwardingServerCallListener.SimpleForwardingServerCallListener[ReqT](listener) {
      override def onMessage(message: ReqT) {
        try {
          super.onMessage(message)
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }

      override def onHalfClose() {
        try {
          super.onHalfClose()
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }
      override def onCancel() {
        try {
          super.onCancel()
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }

      override def onComplete() {
        try {
          super.onComplete()
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }

      override def onReady() {
        try {
          super.onReady()
        } catch {
          case e: Exception =>
            closeWithException(e, headers)
        }
      }

      private def closeWithException(t: Exception, requestHeader: Metadata) {
        // 例外を拾ったのでログを出すなりする
      }
    }
  }
}

と,こんな感じで例外を拾うことができる. あとは, closeWithException メソッドで例外を出したり,close メソッドを呼びだせば良い,はずだった.

確かにここで serverCall.close(Status, Metadata) を呼ぶとちゃんとgRPCサーバとの接続は閉じられる. しかし,任意にStatus をセットしても,それを無視してgRPCクライアントは GRPC::Unknown を受け取る.

このとき,stream removed というメッセージが出ている.

俺が求めているのは,たとえば自作の ValidationError という例外をキャッチした場合,GRPC::INVALID_ARGUMENT を返して欲しいんだ. ここの場合分けは自分で書くにしても,そういった例外機構を欲している.

TransmitStatusRuntimeExceptionInterceptorを使う

ここに非常によく似た実装を,実はgrpc-javaで見つけた.

github.com

これは,StatusRuntimeException な例外を投げておけば,その Statusに応じた例外を返しつつ,接続をcloseしてくれる.

これと同じ実装をすれば問題なさそうに見える.

が,やたら長い.javaだし,これをscalaで全部書き直す……のはなんか不毛な気がした.

interceptorを複数使おう

そもそも公式が用意してくれているなら,自作するのは最小限に止めよう.

先ほどの closeWithException を以下のように書き換える.

      private def closeWithException(t: Exception, requestHeader: Metadata) {
        var status: Status = null

        t match {
          case e: ValidationError => status = Status.Code.INVALID_ARGUMENT.toStatus().withDescription(t.getMessage()).withCause(t)
          case _ => status = Status.Code.INTERNAL.toStatus().withDescription(t.getMessage()).withCause(t)
        }
        Logger.error(status.toString())
        throw new StatusRuntimeException(status)
      }

こうすることで,サーバ内で発生した例外を,StatusRuntimeException の例外に書き換えている.

そして,interceptorを差し込むとき,

      ServerInterceptors.intercept(
        UsersGrpc.bindService(new UsersImpl, executionContext),
        new ErrorHandler,
        TransmitStatusRuntimeExceptionInterceptor.instance()
      )

として,自分が書き換えた例外を TransmitStatusRuntimeExceptionInterceptor に拾ってもらう.

こうすることで,

  private class UsersImpl extends UsersGrpc.Users {
    override def create(request: User): scala.concurrent.Future[User] = {
      throw new ValidationError("hoge")
      Future.successful(request)
  }

というような例外を出すと,gRPCクライアント側でも GRPC::INVALID_ARGUMENT を受け取ることが出来た.