A brief introduction to how a gRPC server responds to a request.
```java
server = NettyServerBuilder.forPort(port)
    .addService(ServerInterceptors.intercept(
        GreeterGrpc.bindService(new GreeterImpl()), new HeaderServerInterceptor()))
    .build().start();
```
On the server side, the service is bound and an optional ServerInterceptor is added when the server is created. GreeterGrpc.bindService returns a ServerServiceDefinition, which binds the ServerCallHandler that handles requests. ServerInterceptors.intercept wraps the ServerInterceptor and the ServerCallHandler into an InterceptorCallHandler (which itself implements ServerCallHandler). build creates a ServerImpl object that contains a NettyServer object. start calls the NettyServer object's start method, passing in a ServerListenerImpl object.
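The wrapping performed by ServerInterceptors.intercept can be pictured as a decorator around the handler. The following is only a minimal sketch of that idea, not gRPC's actual code: SketchCallHandler, SketchInterceptor, and InterceptSketch are hypothetical, simplified stand-ins, and the trace list exists only to make the call order visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ServerCallHandler.
interface SketchCallHandler {
  String startCall(String method, List<String> trace);
}

// Hypothetical, simplified stand-in for ServerInterceptor.
interface SketchInterceptor {
  String interceptCall(String method, List<String> trace, SketchCallHandler next);
}

final class InterceptSketch {
  // Returns a handler that runs the interceptor first and hands it the original handler.
  static SketchCallHandler intercept(SketchCallHandler handler, SketchInterceptor interceptor) {
    return (method, trace) -> interceptor.interceptCall(method, trace, handler);
  }

  public static void main(String[] args) {
    List<String> trace = new ArrayList<>();
    // The "real" handler, which would normally create the call listener.
    SketchCallHandler target = (method, t) -> {
      t.add("handler:" + method);
      return "listener";
    };
    // An interceptor that records the call and then delegates to the next handler.
    SketchInterceptor headers = (method, t, next) -> {
      t.add("interceptor:" + method);
      return next.startCall(method, t);
    };
    SketchCallHandler wrapped = intercept(target, headers);
    System.out.println(wrapped.startCall("Greeter/SayHello", trace) + " " + trace);
  }
}
```

Because the wrapped object is itself a handler, it can be wrapped again, which is how several interceptors would end up forming a chain.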
That start method creates the NettyServerTransport, installs the NettyServerHandler, binds the listen address, and starts listening.
```java
Http2Stream http2Stream = requireHttp2Stream(streamId);
NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this);
http2Stream.setProperty(streamKey, stream);
String method = determineMethod(streamId, headers);
ServerStreamListener listener =
    transportListener.streamCreated(stream, method, Utils.convertHeaders(headers));
stream.setListener(listener);
```
When a request arrives, receiving the HEADERS frame first invokes NettyServerHandler's onHeadersRead method (excerpted above), which creates a NettyServerStream and sets its ServerStreamListener (a JumpToApplicationThreadServerStreamListener object).
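The body of determineMethod is not shown here. In the gRPC over HTTP/2 protocol the method is carried in the `:path` header as `/Service-Name/method-name`, so the core of such a lookup might look like the sketch below. MethodPathSketch and methodFromPath are hypothetical names, the `helloworld` package is only illustrative, and returning null for a malformed path is an assumption made for the example; the real handler may well report the error differently.

```java
final class MethodPathSketch {
  // Returns the full method name for an HTTP/2 ":path" value such as
  // "/helloworld.Greeter/SayHello", or null if the path does not start with '/'.
  static String methodFromPath(String path) {
    if (path == null || path.length() < 2 || path.charAt(0) != '/') {
      return null;
    }
    // Drop the leading slash; the remainder is "service/method".
    return path.substring(1);
  }

  public static void main(String[] args) {
    System.out.println(methodFromPath("/helloworld.Greeter/SayHello"));
  }
}
```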
Part of the streamCreated method is shown below:
```java
final JumpToApplicationThreadServerStreamListener jumpListener =
    new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream);
serializingExecutor.execute(new Runnable() {
  @Override
  public void run() {
    ServerStreamListener listener = NOOP_LISTENER;
    try {
      HandlerRegistry.Method method = registry.lookupMethod(methodName);
      if (method == null) {
        // Unknown method: close the stream with UNIMPLEMENTED.
        stream.close(
            Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
            new Metadata.Trailers());
        timeout.cancel(true);
        return;
      }
      listener = startCall(stream, methodName, method.getMethodDefinition(), timeout, headers);
    } catch (Throwable t) {
      stream.close(Status.fromThrowable(t), new Metadata.Trailers());
      timeout.cancel(true);
      throw Throwables.propagate(t);
    } finally {
      // Always install a listener, even if it is only NOOP_LISTENER.
      jumpListener.setListener(listener);
    }
  }
});
return jumpListener;
```
Calling startCall triggers the startCall methods of a chain of InterceptorCallHandlers and returns a ServerStreamListener (a ServerStreamListenerImpl), which is then set on the JumpToApplicationThreadServerStreamListener.
```java
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
    ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
    Metadata.Headers headers) {
  final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
      stream, methodDef.getMethodDescriptor());
  ServerCall.Listener<ReqT> listener =
      methodDef.getServerCallHandler().startCall(fullMethodName, call, headers);
  if (listener == null) {
    throw new NullPointerException(
        "startCall() returned a null listener for method " + fullMethodName);
  }
  return call.newServerStreamListener(listener, timeout);
}
```
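streamCreated returns the jump listener to the transport before the real listener exists, and the real one is installed later from a task on serializingExecutor. A rough sketch of why that can be safe follows. It assumes, as the constructor arguments suggest, that the jump listener queues every event on that same in-order executor. SketchMessageListener, InOrderExecutor, and JumpListenerSketch are hypothetical, simplified stand-ins, and the executor here is single-threaded and run by hand so the ordering is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for ServerStreamListener with a single event type.
interface SketchMessageListener {
  void messageRead(String message);
}

// Runs queued tasks one at a time, in submission order.
final class InOrderExecutor {
  private final Queue<Runnable> tasks = new ArrayDeque<>();

  void execute(Runnable task) {
    tasks.add(task);
  }

  void runAll() {
    Runnable task;
    while ((task = tasks.poll()) != null) {
      task.run();
    }
  }
}

// Handed to the transport immediately; forwards events to a delegate that is set later.
final class JumpListenerSketch implements SketchMessageListener {
  private final InOrderExecutor executor;
  private SketchMessageListener delegate;

  JumpListenerSketch(InOrderExecutor executor) {
    this.executor = executor;
  }

  void setListener(SketchMessageListener delegate) {
    this.delegate = delegate;
  }

  @Override
  public void messageRead(String message) {
    // The delegate is looked up when the task runs, not when it is queued.
    executor.execute(() -> delegate.messageRead(message));
  }

  public static void main(String[] args) {
    InOrderExecutor executor = new InOrderExecutor();
    JumpListenerSketch jump = new JumpListenerSketch(executor);
    List<String> seen = new ArrayList<>();
    executor.execute(() -> jump.setListener(seen::add)); // queued first, like the lookup task
    jump.messageRead("hello");                           // arrives before the delegate is set
    executor.runAll();
    System.out.println(seen);
  }
}
```

Since the task that sets the delegate is queued before any event task, in-order execution means the delegate is already in place by the time an event is delivered.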
When DATA frames are read, they are handed to NettyServerStream's inboundDataReceived method, which decodes the data and triggers JumpToApplicationThreadServerStreamListener's messageRead method.
```java
private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception {
  try {
    NettyServerStream stream = serverStream(requireHttp2Stream(streamId));
    stream.inboundDataReceived(data, endOfStream);
  } catch (Throwable e) {
    logger.log(Level.WARNING, "Exception in onDataRead()", e);
    throw newStreamException(streamId, e);
  }
}
```
JumpToApplicationThreadServerStreamListener's messageRead method in turn calls ServerStreamListenerImpl's messageRead method.
```java
public void messageRead(final InputStream message) {
  try {
    if (cancelled) {
      return;
    }
    listener.onPayload(method.parseRequest(message));
  } finally {
    try {
      message.close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
```
The listener here is an EmptyServerCallListener object. Its onPayload method does not call the target method; the target method is called in onHalfClose instead.
```java
public void onHalfClose() {
  if (request != null) {
    method.invoke(request, responseObserver);
  } else {
    call.close(Status.INVALID_ARGUMENT.withDescription("Half-closed without a request"),
        new Metadata.Trailers());
  }
}
```
The body of method.invoke is defined in bindService at the very beginning; it is simply serviceImpl.sayHello(request, responseObserver);.
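The split between onPayload and onHalfClose can be sketched as follows. This is only an illustration under stated assumptions: UnaryListenerSketch is a hypothetical class, requests are plain strings, the Consumer stands in for method.invoke, and it is assumed that onPayload merely stores the request, which the request field used in onHalfClose suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class UnaryListenerSketch {
  private final Consumer<String> method;                  // stand-in for method.invoke(...)
  private final List<String> errors = new ArrayList<>();  // stand-in for call.close(status, ...)
  private String request;

  UnaryListenerSketch(Consumer<String> method) {
    this.method = method;
  }

  // Assumed behaviour: remember the request, but do not invoke the service yet.
  void onPayload(String message) {
    this.request = message;
  }

  // The client has finished sending, so the unary request is complete.
  void onHalfClose() {
    if (request != null) {
      method.accept(request);
    } else {
      errors.add("INVALID_ARGUMENT: Half-closed without a request");
    }
  }

  List<String> errors() {
    return errors;
  }

  public static void main(String[] args) {
    List<String> calls = new ArrayList<>();
    UnaryListenerSketch listener = new UnaryListenerSketch(calls::add);
    listener.onPayload("world");
    System.out.println("after onPayload: " + calls);
    listener.onHalfClose();
    System.out.println("after onHalfClose: " + calls);
  }
}
```

Deferring the invocation to half-close means the service method runs only once the client has signalled that no more request messages will follow.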
The sayHello method is as follows:
```java
public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) {
  HelloResponse reply = HelloResponse.newBuilder().setMessage("Hello " + req.getName()).build();
  responseObserver.onValue(reply);
  responseObserver.onCompleted();
}
```
The ResponseObserver's onValue and onCompleted methods call the corresponding ServerCallImpl methods to send the data to the client.
```java
public void sendPayload(RespT payload) {
  Preconditions.checkState(!closeCalled, "call is closed");
  sendPayloadCalled = true;
  try {
    InputStream message = method.streamResponse(payload);
    stream.writeMessage(message);
    stream.flush();
  } catch (Throwable t) {
    close(Status.fromThrowable(t), new Metadata.Trailers());
    throw Throwables.propagate(t);
  }
}
```
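method.streamResponse encodes the response with Protocol Buffers, and the stream then writes it to the client. Just as when the client writes to the server, the data is wrapped in a SendGrpcFrameCommand and placed in the WriteQueue.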
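The idea of queuing a command rather than writing directly can be sketched like this. It is a simplified illustration only: WriteQueueSketch and its nested SendFrame are hypothetical, the command's fields are invented for the example, and the written list stands in for the actual channel write, which the real WriteQueue would perform through Netty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class WriteQueueSketch {
  // Hypothetical command type; the fields are illustrative only.
  static final class SendFrame {
    final int streamId;
    final byte[] payload;
    final boolean endStream;

    SendFrame(int streamId, byte[] payload, boolean endStream) {
      this.streamId = streamId;
      this.payload = payload;
      this.endStream = endStream;
    }
  }

  private final ConcurrentLinkedQueue<SendFrame> queue = new ConcurrentLinkedQueue<>();
  private final List<SendFrame> written = new ArrayList<>(); // stand-in for the channel

  // May be called from any thread; nothing is written yet.
  void enqueue(SendFrame command) {
    queue.add(command);
  }

  // Drains everything queued so far, in order, and returns how many commands were written.
  int flush() {
    int count = 0;
    SendFrame command;
    while ((command = queue.poll()) != null) {
      written.add(command);
      count++;
    }
    return count;
  }

  List<SendFrame> written() {
    return written;
  }

  public static void main(String[] args) {
    WriteQueueSketch writeQueue = new WriteQueueSketch();
    writeQueue.enqueue(new SendFrame(3, new byte[] {1, 2}, false));
    writeQueue.enqueue(new SendFrame(3, new byte[0], true));
    System.out.println("flushed " + writeQueue.flush() + " commands");
  }
}
```

Batching writes this way lets application threads hand off data cheaply while a single thread performs the actual I/O in order.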