gRPC如何发起远程调用

gRPC如何发起远程调用简介。

在客户端创建一个Stub,blockingStub = GreeterGrpc.newBlockingStub(channel);调用对应的方法就可以发起远程调用,HelloResponse response = blockingStub.sayHello(request);

sayHello方法中先创建一个代表远程调用的ClientCall对象,然后调用ClientCalls的远程调用帮助方法,并传入ClientCall对象和请求参数对象。

1
blockingUnaryCall(channel.newCall(METHOD_SAY_HELLO, callOptions), request);
1
2
3
4
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
return interceptorChannel.newCall(method, callOptions);
}

如果之前在创建Channel的时候有设置Interceptor,会将Interceptor和channel(是RealChannel对象)封装成一个InterceptorChannel,调用它的newCall方法来创建ClientCall对象(如果加了HeaderClientInterceptor这里就是SimpleForwardingClientCall对象,ClientCall对象也是嵌套的)。

创建好ClientCall后,经过调用一系列的call方法,在asyncUnaryRequestCall方法中,调用了startCall方法,然后发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call,
ReqT param,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse)
{

startCall(call, responseListener, streamingResponse);
try {
call.sendPayload(param);
call.halfClose();
} catch (Throwable t) {
call.cancel();
throw Throwables.propagate(t);
}
}

startCall方法中执行了ClientCall的start方法,执行该方法后,会执行嵌套的ClientCall的start方法,在最后一个ClientCallClientCallImpl对象,RealChannel创建的)的start方法中进行填充Header,使用transport(NettyClientTransport对象)来创建stream( NettyClientStream对象 ),传入Listener(ClientStreamListenerImpl对象,在收到服务端返回的数据时,对数据进行解码,然后将返回值设置到StreamObserver中)。

在transport创建stream的方法中将CreateStreamCommand对象(是个Headers Frame)放入WriteQueue中,WriteQueue是用来存放往服务端写操作的队列,其中的操作会使用Netty的Channel对象来执行。

startCall方法中

1
2
3
4
5
6
call.start(responseListener, new Metadata.Headers());
if (streamingResponse) {
call.request(1);
} else {
call.request(2);
}

call.start方法设置listener和请求头。call.request方法中调用stream(NettyClientStream)的request方法,stream的request的方法会将一个RequestMessagesCommand放入WriteQueue中,然后刷新这个WriteQueue,即将里面的数据都发送给服务端。

在sendPayload(param)方法中

1
2
InputStream payloadIs = method.streamRequest(payload);
stream.writeMessage(payloadIs);

method就是MethodDescriptor对象描述调用的方法,payload就是传入的参数param。streamRequest方法中return requestMarshaller.stream(requestMessage);将payload转成一个InputSteam(ProtoInputStream)。这里的requestMarshaller是ProtoUtils中的marshaller(parser)方法返回的Marshaller对象,parser是HelloRequest类中PARSER。

然后调用stream的writeMessage方法,这个stream就是之前transport创建的NettyClientStream对象,NettyClientStream继承自Http2ClientStream,Http2ClientStream继承自AbstractStream。writeMessage方法就是AbstractStream中的方法。

1
2
3
4
5
6
7
public void writeMessage(InputStream message) {
checkNotNull(message);
outboundPhase(Phase.MESSAGE);
if (!framer.isClosed()) {
framer.writePayload(message);
}
}

在framer.writePayload(message)方法中,调用message(ProtoInputStream对象)的drainTo(OutputStream)方法往outputStream(OutputStreamAdapter对象)中写数据。

1
2
3
4
5
6
7
8
9
10
11
12
public int drainTo(OutputStream target) throws IOException {
int written;
if (message != null) {
written = message.getSerializedSize();
message.writeTo(target);
message = null;
} else {
written = (int) ByteStreams.copy(partial, target);
partial = null;
}
return written;
}

这里的message就是最开始调用远程方法sayHello(request)中传过来的参数request(HelloRequest对象)。HelloRequest继承GeneratedMessage,GenenratedMessage继承了AbstractMessage,在AbstractMessage中有writeTo(OutputStream)方法,这里就是调用的该方法,在该方法中,调用了AbstractMessage的父类MessageLite的writeTo(CodedOutputStream)抽象方法,该方法由HelloRequest实现。在HelloRequest的实现中直接调用了CodedOutputStream的writeByte方法,交由protobuf框架来进行处理。

call.halfClose()方法会往WriteQueue中加入一个SendGrpcFrameCommand,并立即刷新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
final int numBytes = bytebuf.readableBytes();
if (numBytes > 0) {
// Add the bytes to outbound flow control.
onSendingBytes(numBytes);
writeQueue.enqueue(
new SendGrpcFrameCommand(this, bytebuf, endOfStream),
channel.newPromise().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Remove the bytes from outbound flow control, optionally notifying
// the client that they can send more bytes.
onSentBytes(numBytes);
}
}), flush);
} else {
// The frame is empty and will not impact outbound flow control. Just send it.
writeQueue.enqueue(new SendGrpcFrameCommand(this, bytebuf, endOfStream), flush);
}
}