gRPC服务端如何响应请求

gRPC服务端如何响应请求简介。

1
2
3
server = NettyServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(GreeterGrpc.bindService(new GreeterImpl()), new HeaderServerInterceptor()))
.build().start();

在服务端,创建server的时候绑定service和添加可选的ServerInterceptorGreeterGrpc.bindService方法返回一个ServerServiceDefinition,ServerServiceDefinition绑定了处理请求的ServerCallHandlerServerInterceptors.intercept方法将ServerInterceptorServerCallHandler包装成一个InterceptorCallHandler(实现了ServerCallHandler)。build方法创建一个ServerImpl对象,该对象中包含一个NettyServer对象。start方法中调用的是NettyServer对象的start方法并传入ServerListenerImpl对象。

start方法创建NettyServerTransport,设置NettyServerHandler,绑定监听地址,启动监听。

1
2
3
4
5
6
7
Http2Stream http2Stream = requireHttp2Stream(streamId);
NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this);
http2Stream.setProperty(streamKey, stream);
String method = determineMethod(streamId, headers);
ServerStreamListener listener =
transportListener.streamCreated(stream, method, Utils.convertHeaders(headers));
stream.setListener(listener);

当有一个请求过来的时候,收到Headers Frame会先调用NettyServerHandler中的onHeadersRead方法,创建NettyServerStream并设置ServerStreamListenerJumpToApplicationThreadServerStreamListener对象)。

streamCreated方法的部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream);
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
ServerStreamListener listener = NOOP_LISTENER;
try {
HandlerRegistry.Method method = registry.lookupMethod(methodName);
if (method == null) {
stream.close(
Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
new Metadata.Trailers());
timeout.cancel(true);
return;
}
listener = startCall(stream, methodName, method.getMethodDefinition(), timeout,
headers);
} catch (Throwable t) {
stream.close(Status.fromThrowable(t), new Metadata.Trailers());
timeout.cancel(true);
throw Throwables.propagate(t);
} finally {
jumpListener.setListener(listener);
}
return jumpListener;

调用startCall方法将引起一系列InterceptorCallHandler的startCall方法,并返回一个ServerStreamListenerServerStreamListenerImpl),并将其设置到JumpToApplicationThreadServerStreamListener中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
Metadata.Headers headers)
{


final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
stream, methodDef.getMethodDescriptor());
ServerCall.Listener<ReqT> listener
= methodDef.getServerCallHandler().startCall(fullMethodName, call, headers);
if (listener == null) {
throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName);
}
return call.newServerStreamListener(listener, timeout);
}

当读到Data数据时,将交由NettyServerStream的inboundDataReceived方法处理,该方法会对数据进行解码,并触发JumpToApplicationThreadServerStreamListener的messageRead方法。

1
2
3
4
5
6
7
8
9
private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception {
try {
NettyServerStream stream = serverStream(requireHttp2Stream(streamId));
stream.inboundDataReceived(data, endOfStream);
} catch (Throwable e) {
logger.log(Level.WARNING, "Exception in onDataRead()", e);
throw newStreamException(streamId, e);
}
}

JumpToApplicationThreadServerStreamListener的messageRead方法使用ServerStreamListenerImpl的messageRead方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void messageRead(final InputStream message) {
try {
if (cancelled) {
return;
}

listener.onPayload(method.parseRequest(message));
} finally {
try {
message.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

这里的listener是EmptyServerCallListener对象,该对象中的onPayload方法并没有调用目标方法,而是在onHalfClose方法中调用了目标方法。

1
2
3
4
5
6
7
8
public void onHalfClose() {
if (request != null) {
method.invoke(request, responseObserver);
} else {
call.close(Status.INVALID_ARGUMENT.withDescription("Half-closed without a request"),
new Metadata.Trailers());
}
}

method.invokde方法的内容在一开始的bindService中定义,该方法中就是serviceImpl.sayHello(request, responseObserver);

sayHello方法如下:

1
2
3
4
5
public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) {
HelloResponse reply = HelloResponse.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onValue(reply);
responseObserver.onCompleted();
}

ResponseObserver的onValue和onCompleted方法中调用ServerCallImpl的对应方法完成向客户端发送数据的操作。

1
2
3
4
5
6
7
8
9
10
11
12
public void sendPayload(RespT payload) {
Preconditions.checkState(!closeCalled, "call is closed");
sendPayloadCalled = true;
try {
InputStream message = method.streamResponse(payload);
stream.writeMessage(message);
stream.flush();
} catch (Throwable t) {
close(Status.fromThrowable(t), new Metadata.Trailers());
throw Throwables.propagate(t);
}
}

method.streamResponse对响应数据进行ProtoBuf的编码,然后由stream写向客户端,与客户端往服务器端写数据一样,封装成一个SendGrpcFrameCommand放入WriteQueue中。