grpc 专题介绍篇三:grpc 流式传输
简单演示 grpc 采用流式协议通信的传输效果

在上一篇文章中介绍了 grpc 的基础使用后,再来介绍下 grpc 的流式传输特性,本文已服务端流式传输的方式为例进行介绍。

流式传输是 rpc 方案相比于传统 http/1.1 request 在进行微服务通信上更灵活的特性之一,在数据传输上可以实现的更加灵活。当然,现有的 http 技术栈也支持了 websocket 这样的特性,同样也可以做到双向实时通信,但是其更多面向的场景更多是偏向浏览器/客户端通信,并不是为了承载大数据量而设计。

服务端的流式传输可以有多种用途,例如:

  • 返回任务实时的处理进度反馈,以帮助交互上获得更好的体验。
  • 对于批量返回的多条数据,采用流式方式可以避免等待所有数据的处理完成,可以边处理,边传输。
  • 通过流式传输,可以减少数据集缓存的内存资源开销,提高效率。这一点对于文件数据同样有效。

流式传输模式,在应对较大数据量传输的场景下可以带来明显优化。

grpc 流式模式通信示例

在之前对于简单请求模式的基础上进行改造,即可实现 grpc 的服务端向客户端流式返回数据的通信方式。

  • Proto 定义 - worker.proto
service Task {
    rpc runTaskProgress(TaskConf) returns (stream TaskReply) {}
}

message TaskReply {
    string message = 1;
}

注意到这次定义的 service 中,rpc 的返回参数使用 stream 状态标识,表示服务端的返回结果是通过流式传输的形式处理的。

  • ts 类型申明 - task.types.ts

在 ts 环境下申明 rpc 通信的返回结构。

export interface TaskReply {
    message: string
}
  • 服务端 - server.ts

服务端采用 handleServerStreamingCall 的函数接口来定义流式处理方法,通过 handler 方法中 call 对象提供的 write 方法来进行指定格式数据的流式返回。

const runTaskProgress: handleServerStreamingCall<TaskConf, TaskReply> = (call) => {
    const { id, job } = call.request;
    async.eachSeries(_.times(11), (idx, callback) => {
        call.write({ message: `${ idx * 10 }% task: #${ id } ${ job }` });
        setTimeout(callback, 200);
    }, (err) => {
        call.end();
    });
};

const server = new grpc.Server();
server.addService(proto.Task.service, {
    runTaskProgress
});
server.bind('0.0.0.0:9050', grpc.ServerCredentials.createInsecure());
server.start();

在服务端完成了实际业务处理的数据传输后,通过调用 call.end 方法,关闭连接的数据流。

  • 客户端 - client.stream.ts

对于定义为服务端流式传输的通信方法,在 proto buffer 配置解析后,同样会生成对应的客户端调用方法。只不过在流式场景下,发送客户端请求后,会得到一个用于监听服务器响应的 EventEmitter,可以对对应事件进行监听,来事项相关业务的处理。

import grpc from 'grpc';

import { TaskReply } from './task.types';
import { proto } from './proto';

const client = new proto.Task('localhost:9050', grpc.credentials.createInsecure());

const call = client.runTaskProgress({
    id: '001',
    job: 'test'
});
call.on('data', (reply: TaskReply) => {
    console.log(reply.message);
});
call.on('end', () => {
    console.log('client done');
});
call.on('error', (err: Error) => {
    console.error(err.stack);
});

本身 ClientReadableStream 对象继承了 node.js 原生的 Readable 对象,可以按照 Readable 的使用方式来监听其客户端的事件行为。

运行示例

分别启动服务端与客户端,可在客户端反馈输出中,看到模拟的服务端处理进度。

另两种通信模式介绍

除了目前已经演示过的简单模式与服务端流式模式以外,grpc 还提供了客户端流式模式,以及双向流式传输模式的处理方式。

  • 客户端流式传输 - Client streaming RPC

    与服务端的流式传输方式类似,客户端的流式传输方式相当于是把协议进行了交换,由发起方决定通信传输的数据流,服务端在处理完数据后,最后进行统一的返回,返回的形式与普通的请求调用方式类似。

  • 双向流式传输 - Bidirectional streaming RPC

    客户端与服务端都通过流式的方式进行数据传输。此方式通信在实现上最为灵活,客户端和服务端双方都可以自己选择发送/响应数据的时机,可以实现更灵活的通信协议。例如可以实现对数据库操作的持久连接,用于同一会话的多次数据库查询,而避免每次重建连接的性能开销。

    这种模式可在不中断连接的状态下,进行双向的实时通信。

另外,由于 grpc 本身是建立在 HTTP/2 的 TCP 通信上的,所以在数据处理过程中如果出现较长时间内无通信的情况,连接本身可能会中断。目前 grpc 内部通过一个内置的 timer 实现了对于长时间通信的 keepalive 心跳包发送行为,用于连接的维持。不过在部分特殊网络状态下,keepalive 可能会被阻止,所以在相关场景下仍需注意连接中断的问题。

grpc 默认的客户端通行时间长度原则上没有限制,取决于各语言环境的情况,但是服务端默认的最长时间是 2 小时,正常情况下已经足够用了。本身如果一次通信的连接时间过长,失败重传的代价会变得非常巨大。

参考文档:

  1. https://grpc.io/docs/guides/concepts.html
  2. https://github.com/grpc/grpc/blob/master/doc/keepalive.md

最后修改于 2019-04-16