A brief introduction to how a gRPC server responds to a request.
```java
server = NettyServerBuilder.forPort(port)
    .addService(ServerInterceptors.intercept(GreeterGrpc.bindService(new GreeterImpl()), new HeaderServerInterceptor()))
    .build().start();
```
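On the server side, the service is bound and an optional `ServerInterceptor` is added when the server is created. `GreeterGrpc.bindService` returns a `ServerServiceDefinition`, which is bound to the `ServerCallHandler` that processes requests. `ServerInterceptors.intercept` wraps the `ServerInterceptor` and the `ServerCallHandler` into an `InterceptorCallHandler` (which itself implements `ServerCallHandler`). `build` creates a `ServerImpl` object that contains a `NettyServer` object. `start` calls the `start` method of that `NettyServer` object and passes in a `ServerListenerImpl` object.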
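The wrapping step can be pictured with a small plain-Java sketch. The `CallHandler` and `Interceptor` interfaces below are hypothetical, simplified stand-ins invented for illustration, not the gRPC types, and the ordering (first interceptor listed runs first) is a choice made for this sketch rather than a statement about `ServerInterceptors.intercept`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins: these are NOT the gRPC interfaces.
class InterceptorChainSketch {

    /** Stand-in for ServerCallHandler. */
    interface CallHandler {
        String startCall(String method, List<String> trace);
    }

    /** Stand-in for ServerInterceptor: runs first, then decides whether to delegate. */
    interface Interceptor {
        String interceptCall(String method, List<String> trace, CallHandler next);
    }

    /**
     * Wraps the handler once per interceptor. In this sketch the first
     * interceptor listed ends up outermost, so it runs first.
     */
    static CallHandler intercept(CallHandler handler, Interceptor... interceptors) {
        CallHandler current = handler;
        for (int i = interceptors.length - 1; i >= 0; i--) {
            final Interceptor interceptor = interceptors[i];
            final CallHandler next = current;
            // Each wrapper is itself a CallHandler, like InterceptorCallHandler in the text.
            current = (method, trace) -> interceptor.interceptCall(method, trace, next);
        }
        return current;
    }

    /** Runs one call through two interceptors and returns the order in which things ran. */
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        CallHandler service = (method, t) -> {
            t.add("handler:" + method);
            return "listener";
        };
        Interceptor logging = (method, t, next) -> {
            t.add("logging");
            return next.startCall(method, t);
        };
        Interceptor headers = (method, t, next) -> {
            t.add("headers");
            return next.startCall(method, t);
        };
        intercept(service, logging, headers).startCall("SayHello", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because every wrapper implements the same handler interface as the thing it wraps, the server only ever sees a single handler, however many interceptors were added.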
The `start` method creates the `NettyServerTransport`, sets up the `NettyServerHandler`, binds the listen address, and starts listening.
```java
Http2Stream http2Stream = requireHttp2Stream(streamId);
NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this);
http2Stream.setProperty(streamKey, stream);
String method = determineMethod(streamId, headers);
ServerStreamListener listener =
    transportListener.streamCreated(stream, method, Utils.convertHeaders(headers));
stream.setListener(listener);
```
When a request arrives, receiving the HEADERS frame first invokes the `onHeadersRead` method of `NettyServerHandler`, which creates a `NettyServerStream` and sets its `ServerStreamListener` (a `JumpToApplicationThreadServerStreamListener` object).
Part of the `streamCreated` method is shown below:
```java
final JumpToApplicationThreadServerStreamListener jumpListener
    = new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream);
serializingExecutor.execute(new Runnable() {
  @Override
  public void run() {
    ServerStreamListener listener = NOOP_LISTENER;
    try {
      HandlerRegistry.Method method = registry.lookupMethod(methodName);
      if (method == null) {
        stream.close(
            Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
            new Metadata.Trailers());
        timeout.cancel(true);
        return;
      }
      listener = startCall(stream, methodName, method.getMethodDefinition(), timeout, headers);
    } catch (Throwable t) {
      stream.close(Status.fromThrowable(t), new Metadata.Trailers());
      timeout.cancel(true);
      throw Throwables.propagate(t);
    } finally {
      jumpListener.setListener(listener);
    }
  }
});
return jumpListener;
```
Calling `startCall` triggers the `startCall` methods of the chain of `InterceptorCallHandler`s and returns a `ServerStreamListener` (a `ServerStreamListenerImpl`), which is then set on the `JumpToApplicationThreadServerStreamListener`.
```java
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
    ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
    Metadata.Headers headers) {
  final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
      stream, methodDef.getMethodDescriptor());
  ServerCall.Listener<ReqT> listener
      = methodDef.getServerCallHandler().startCall(fullMethodName, call, headers);
  if (listener == null) {
    throw new NullPointerException(
        "startCall() returned a null listener for method " + fullMethodName);
  }
  return call.newServerStreamListener(listener, timeout);
}
```
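The reason the jump listener can be returned to the transport before the real listener exists may be easier to see in a reduced model. The sketch below is a hypothetical simplification, not the gRPC classes: a single-thread executor stands in for the serializing executor, and `StreamListener` and `JumpListener` are names invented here. The assumption it illustrates is that the method lookup is queued first, so later events run after the delegate has been set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the hand-off from the transport thread to the application executor.
class JumpListenerSketch {

    interface StreamListener {
        void messageRead(String message);
        void halfClosed();
    }

    /** Forwards every event to a delegate, but only from inside the serial executor. */
    static final class JumpListener implements StreamListener {
        private final ExecutorService serialExecutor;
        private StreamListener delegate; // read and written only by executor tasks

        JumpListener(ExecutorService serialExecutor) {
            this.serialExecutor = serialExecutor;
        }

        void setListener(StreamListener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void messageRead(String message) {
            serialExecutor.execute(() -> delegate.messageRead(message));
        }

        @Override
        public void halfClosed() {
            serialExecutor.execute(() -> delegate.halfClosed());
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>(); // mutated only by executor tasks
        ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
        JumpListener jump = new JumpListener(serialExecutor);

        // Mirrors streamCreated: the lookup task is queued before any event can be.
        serialExecutor.execute(() -> {
            events.add("lookup");
            jump.setListener(new StreamListener() {
                @Override
                public void messageRead(String message) {
                    events.add("message:" + message);
                }

                @Override
                public void halfClosed() {
                    events.add("halfClosed");
                }
            });
        });

        // The transport can deliver events right away; they queue up behind the lookup.
        jump.messageRead("hello");
        jump.halfClosed();

        serialExecutor.shutdown();
        try {
            if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The design keeps the network thread free: it only enqueues work, while the lookup, the interceptors, and the service method all run on the application side in arrival order.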
When a DATA frame is read, it is handed to the `inboundDataReceived` method of `NettyServerStream`, which decodes the data and triggers the `messageRead` method of `JumpToApplicationThreadServerStreamListener`.
```java
private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception {
  try {
    NettyServerStream stream = serverStream(requireHttp2Stream(streamId));
    stream.inboundDataReceived(data, endOfStream);
  } catch (Throwable e) {
    logger.log(Level.WARNING, "Exception in onDataRead()", e);
    throw newStreamException(streamId, e);
  }
}
```
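The decoding step is not shown in the article, so the following is only a rough sketch of what such a step might look like. It assumes the length-prefixed message layout from the gRPC over HTTP/2 protocol description (a 1-byte compressed flag, a 4-byte big-endian length, then the payload); the `DeframerSketch` class and its methods are hypothetical and do not reproduce the library's deframer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical deframer: buffers DATA chunks and emits complete messages.
class DeframerSketch {
    private static final int HEADER_LENGTH = 5; // 1-byte flag + 4-byte length (assumed layout)

    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one chunk and returns every message that the chunk completed. */
    List<byte[]> inboundDataReceived(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        List<byte[]> messages = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.wrap(pending.toByteArray()); // big-endian by default
        while (buffer.remaining() >= HEADER_LENGTH) {
            buffer.mark();
            byte compressedFlag = buffer.get();
            int length = buffer.getInt();
            if (compressedFlag != 0 || length < 0) {
                throw new IllegalStateException("unsupported frame in this sketch");
            }
            if (buffer.remaining() < length) {
                buffer.reset(); // payload incomplete: wait for the next chunk
                break;
            }
            byte[] message = new byte[length];
            buffer.get(message);
            messages.add(message);
        }
        // Keep whatever was not consumed for the next call.
        pending.reset();
        pending.write(buffer.array(), buffer.position(), buffer.remaining());
        return messages;
    }

    /** Builds one frame; used here only to produce test input. */
    static byte[] frame(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(HEADER_LENGTH + payload.length)
                .put((byte) 0)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Splits one frame across two chunks to show that messages may span DATA frames. */
    static List<String> demo() {
        DeframerSketch deframer = new DeframerSketch();
        byte[] framed = frame("Hello");
        List<String> out = new ArrayList<>();
        int first = deframer.inboundDataReceived(Arrays.copyOfRange(framed, 0, 3)).size();
        out.add("first chunk: " + first);
        for (byte[] m : deframer.inboundDataReceived(Arrays.copyOfRange(framed, 3, framed.length))) {
            out.add(new String(m, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the flow described here, each completed message would be what is passed on to `messageRead`.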
The `messageRead` method of `JumpToApplicationThreadServerStreamListener` in turn calls the `messageRead` method of `ServerStreamListenerImpl`.
```java
public void messageRead(final InputStream message) {
  try {
    if (cancelled) {
      return;
    }

    listener.onPayload(method.parseRequest(message));
  } finally {
    try {
      message.close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
```
The `listener` here is an `EmptyServerCallListener` object. Its `onPayload` method does not invoke the target method; the target method is invoked in `onHalfClose` instead.
```java
public void onHalfClose() {
  if (request != null) {
    method.invoke(request, responseObserver);
  } else {
    call.close(Status.INVALID_ARGUMENT.withDescription("Half-closed without a request"),
        new Metadata.Trailers());
  }
}
```
The body of `method.invoke` is defined by the `bindService` call made at the start; it simply calls `serviceImpl.sayHello(request, responseObserver);`.
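To make the "store on payload, invoke on half-close" pattern concrete, here is a minimal plain-Java sketch. `UnaryListener` and `ResponseObserver` are hypothetical stand-ins written for this example; in particular, what the real `onPayload` does beyond not invoking the target method is not shown in the article, so simply storing the request is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical model of a unary call listener; not the gRPC classes.
class UnaryCallSketch {

    interface ResponseObserver<T> {
        void onValue(T value);
        void onCompleted();
    }

    static final class UnaryListener<ReqT, RespT> {
        private final BiConsumer<ReqT, ResponseObserver<RespT>> method;
        private final ResponseObserver<RespT> responseObserver;
        private final List<String> events;
        private ReqT request;

        UnaryListener(BiConsumer<ReqT, ResponseObserver<RespT>> method,
                      ResponseObserver<RespT> responseObserver,
                      List<String> events) {
            this.method = method;
            this.responseObserver = responseObserver;
            this.events = events;
        }

        /** Assumed behaviour: remember the request, do not call the service yet. */
        void onPayload(ReqT payload) {
            request = payload;
            events.add("payload stored");
        }

        /** The client has finished sending, so the single request is now complete. */
        void onHalfClose() {
            if (request != null) {
                method.accept(request, responseObserver);
            } else {
                events.add("closed: INVALID_ARGUMENT Half-closed without a request");
            }
        }
    }

    /** Pass a name to simulate a normal call, or null to simulate a half-close with no request. */
    static List<String> demo(String name) {
        List<String> events = new ArrayList<>();
        ResponseObserver<String> observer = new ResponseObserver<String>() {
            @Override
            public void onValue(String value) {
                events.add("onValue: " + value);
            }

            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }
        };
        // Plays the role of serviceImpl.sayHello(request, responseObserver).
        BiConsumer<String, ResponseObserver<String>> sayHello = (req, obs) -> {
            obs.onValue("Hello " + req);
            obs.onCompleted();
        };
        UnaryListener<String, String> listener = new UnaryListener<>(sayHello, observer, events);
        if (name != null) {
            listener.onPayload(name);
        }
        listener.onHalfClose();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo("world"));
        System.out.println(demo(null));
    }
}
```

Waiting for the half-close is what lets a unary handler know that exactly one request was sent before any application code runs.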
The `sayHello` method is as follows:
```java
public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) {
  HelloResponse reply = HelloResponse.newBuilder().setMessage("Hello " + req.getName()).build();
  responseObserver.onValue(reply);
  responseObserver.onCompleted();
}
```
The `onValue` and `onCompleted` methods of `ResponseObserver` call the corresponding methods of `ServerCallImpl` to send the data to the client.
```java
public void sendPayload(RespT payload) {
  Preconditions.checkState(!closeCalled, "call is closed");
  sendPayloadCalled = true;
  try {
    InputStream message = method.streamResponse(payload);
    stream.writeMessage(message);
    stream.flush();
  } catch (Throwable t) {
    close(Status.fromThrowable(t), new Metadata.Trailers());
    throw Throwables.propagate(t);
  }
}
```
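`method.streamResponse` encodes the response with Protocol Buffers, and the stream then writes it to the client. As with client-to-server writes, the data is wrapped in a `SendGrpcFrameCommand` and placed in the `WriteQueue`.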
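The queueing idea can be sketched as follows. This is a hypothetical model, not the Netty transport's `WriteQueue`: commands are plain strings, a single-thread executor stands in for the channel's event loop, and the scheduling logic is an assumption about how such a queue could batch writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical write queue: application threads enqueue, one "event loop" thread writes.
class WriteQueueSketch {
    private final Queue<String> commands = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    private final Executor eventLoop;
    private final List<String> wire; // appended to only by the event loop thread

    WriteQueueSketch(Executor eventLoop, List<String> wire) {
        this.eventLoop = eventLoop;
        this.wire = wire;
    }

    /** Called from application threads; never touches the channel directly. */
    void enqueue(String command, boolean flush) {
        commands.add(command);
        if (flush) {
            scheduleFlush();
        }
    }

    /** Schedules at most one pending flush task at a time. */
    void scheduleFlush() {
        if (flushScheduled.compareAndSet(false, true)) {
            eventLoop.execute(this::flush);
        }
    }

    /** Runs on the event loop and writes everything queued so far, in order. */
    private void flush() {
        // Reset first, so a command enqueued during draining schedules a new flush.
        flushScheduled.set(false);
        String command;
        while ((command = commands.poll()) != null) {
            wire.add(command);
        }
    }

    static List<String> demo() {
        List<String> wire = new ArrayList<>();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        WriteQueueSketch queue = new WriteQueueSketch(eventLoop, wire);

        queue.enqueue("frame(Hello Alice)", false); // queued, not yet flushed
        queue.enqueue("frame(Hello Bob)", true);    // flush drains both in order

        eventLoop.shutdown();
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return wire;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Routing all writes through one queue keeps channel access on a single thread and lets several small writes share one flush.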