またまたscalaでgRPCする話.
サーバを運用する以上,例外が起こることは避けられないと思う. そしてその例外,どこかでキャッチして適切に処理したいと思う.
gRPCサーバの例外
試しにこんなメソッドを仕込んで見るといい.
private class UsersImpl extends UsersGrpc.Users { override def create(request: User): scala.concurrent.Future[User] = { throw new RuntimeException("hoge") Future.successful(request) }
gRPCサーバ側ではもちろんちゃんと例外を吐き出す.
しかし,呼び出したgRPCクライアント側では,応答がいつになっても帰ってこない.
悲しいことにタイムアウトまで放置された.
gRPCサーバは close
メソッドを呼び出す前に例外でストップしてしまったので,応答を何も返せなくなってしまっている.
救いだそう
このままでは使えないので,例外を救いだそう.
前回interceptorの話がでてきたけど,
これを使うと,gRPCサーバ内で発生した例外を拾うことができる.
class ErrorHandler extends ServerInterceptor { override def interceptCall[ReqT, RespT]( serverCall: ServerCall[ReqT, RespT], headers: Metadata, next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = { val listener = next.startCall(serverCall, headers) return new ForwardingServerCallListener.SimpleForwardingServerCallListener[ReqT](listener) { override def onMessage(message: ReqT) { try { super.onMessage(message) } catch { case e: Exception => closeWithException(e, headers) } } override def onHalfClose() { try { super.onHalfClose() } catch { case e: Exception => closeWithException(e, headers) } } override def onCancel() { try { super.onCancel() } catch { case e: Exception => closeWithException(e, headers) } } override def onComplete() { try { super.onComplete() } catch { case e: Exception => closeWithException(e, headers) } } override def onReady() { try { super.onReady() } catch { case e: Exception => closeWithException(e, headers) } } private def closeWithException(t: Exception, requestHeader: Metadata) { // 例外を拾ったのでログを出すなりする } } } }
と,こんな感じで例外を拾うことができる.
あとは, closeWithException
メソッドで例外を出したり,close
メソッドを呼びだせば良い,はずだった.
確かにここで serverCall.close(Status, Metadata)
を呼ぶとちゃんとgRPCサーバとの接続は閉じられる.
しかし,任意にStatus
をセットしても,それを無視してgRPCクライアントは GRPC::Unknown
を受け取る.
このとき,stream removed
というメッセージが出ている.
俺が求めているのは,たとえば自作の ValidationError
という例外をキャッチした場合,GRPC::INVALID_ARGUMENT
を返して欲しいんだ.
ここの場合分けは自分で書くにしても,そういった例外機構を欲している.
TransmitStatusRuntimeExceptionInterceptorを使う
ここに非常によく似た実装を,実はgrpc-javaで見つけた.
これは,StatusRuntimeException
な例外を投げておけば,その Status
に応じた例外を返しつつ,接続をcloseしてくれる.
これと同じ実装をすれば問題なさそうに見える.
が,やたら長い.javaだし,これをscalaで全部書き直す……のはなんか不毛な気がした.
interceptorを複数使おう
そもそも公式が用意してくれているなら,自作するのは最小限に止めよう.
先ほどの closeWithException
を以下のように書き換える.
private def closeWithException(t: Exception, requestHeader: Metadata) { var status: Status = null t match { case e: ValidationError => status = Status.Code.INVALID_ARGUMENT.toStatus().withDescription(t.getMessage()).withCause(t) case _ => status = Status.Code.INTERNAL.toStatus().withDescription(t.getMessage()).withCause(t) } Logger.error(status.toString()) throw new StatusRuntimeException(status) }
こうすることで,サーバ内で発生した例外を,StatusRuntimeException
の例外に書き換えている.
そして,interceptorを差し込むとき,
ServerInterceptors.intercept( UsersGrpc.bindService(new UsersImpl, executionContext), new ErrorHandler, TransmitStatusRuntimeExceptionInterceptor.instance() )
として,自分が書き換えた例外を TransmitStatusRuntimeExceptionInterceptor
に拾ってもらう.
こうすることで,
private class UsersImpl extends UsersGrpc.Users { override def create(request: User): scala.concurrent.Future[User] = { throw new ValidationError("hoge") Future.successful(request) }
というような例外を出すと,gRPCクライアント側でも GRPC::INVALID_ARGUMENT
を受け取ることが出来た.