RPC ( Remote Procedure Call ),翻译过来为“远程过程调用”,是一种分布式系统中服务或节点之间的有效通信机制。通过 RPC,某个节点(或客户端)可以很轻松的调用远端(或服务端)的方法或服务,就像在本地调用一样简单。现有的很多 RPC 框架都要求暴露服务端地址,也就是需要知道服务器的 IP 和 RPC 端口。而本篇文章将介绍一种不需要暴露 IP 地址和端口的 RPC 通信方式。这种方式是基于 Redis BRPOP/BLPOP 操作实现的延迟队列,以及 Golang 中的 goroutine 协程异步机制,整个框架非常简单和易于理解,同时也很高效、稳定和安全。这种方式已经应用到了 Crawlab 中的节点通信当中,成为了各节点即时传输信息的主要方式。下面我们将从 Crawlab 早期节点通信方案 PubSub 开始,介绍当时遇到的问题和解决方案,然后如何过渡到现在的 RPC 解决方案,以及它是如何在 Crawlab 中发挥作用的。
早期的 Crawlab 是基于 Redis 的 PubSub,也就是发布订阅模式。这是 Redis 中主要用于一对多的单向通信的方案。其用法非常简单:
SUBSCRIBE channel1 channel2 ...
来订阅一个或多个频道;PUBLISH channelx message
来发布消息给该频道的订阅者。Redis 的PubSub
可以用作广播模式,即一个发布者对应多个订阅者。而在 Crawlab 中,我们只有一个订阅者对应一个发布者的情况(主节点->工作节点:nodes:<node_id>
)或一个订阅者对应多个发布者的情况(工作节点->主节点:nodes:master>
)。这是为了方便双向通信。
以下为节点通信原理示意图。
各个节点会通过 Redis 的PubSub
功能来做相互通信。
所谓PubSub
,简单来说是一个发布订阅模式。订阅者( Subscriber )会在 Redis 上订阅( Subscribe )一个通道,其他任何一个节点都可以作为发布者( Publisher )在该通道上发布( Publish )消息。
在 Crawlab 中,主节点会订阅 nodes:master
通道,其他节点如果需要向主节点发送消息,只需要向 nodes:master
发布消息就可以了。同理,各工作节点会各自订阅一个属于自己的通道 nodes:<node_id>
(node_id
是 MongoDB 里的节点 ID,是 MongoDB ObjectId ),如果需要给工作节点发送消息,只需要发布消息到该通道就可以了。
一个网络请求的简单过程如下:
PubSub
的 <nodes:<node_id>
通道发布消息给相应的工作节点;<nodes:master>
通道发布给主节点;不是所有节点通信都是双向的,也就是说,主节点只会单方面对工作节点通信,工作节点并不会返回响应给主节点,所谓的单向通信。以下是 Crawlab 的通信类型。
chan
和 goroutine
如果您在阅读 Crawlab 源码,会发现节点通信中有大量的 chan
语法,这是 Golang 的一个并发特性。
chan
表示为一个通道,在 Golang 中分为无缓冲和有缓冲的通道,我们用了无缓冲通道来阻塞协程,只有当 chan
接收到信号(chan <- "some signal"
),该阻塞才会释放,协程进行下一步操作)。在请求响应模式中,如果为双向通信,主节点收到请求后会起生成一个无缓冲通道来阻塞该请求,当收到来自工作节点的消息后,向该无缓冲通道赋值,阻塞释放,返回响应给客户端。
go
命令会起一个 goroutine
(协程)来完成并发,配合 chan
,该协程可以利用无缓冲通道挂起,等待信号执行接下来的操作。
PubSub 这种消息订阅-发布设计模式是一种有效的实现节点通信的方式,但是它有两个问题:
goroutine
和 channel
,这加大了开发难度,降低了可维护性。其中,第二个问题是比较棘手的。如果我们希望加入更多的功能,需要写大量的异步代码,这会加大系统模块间的耦合度,造成扩展性很差,而且代码阅读起来很痛苦。
因此,为了解决这个问题,我们采用了基于 Redis 延迟消息队列的 RPC 服务。
下图是基于延迟队列架构的 RPC 实现示意图。
每一个节点都有一个客户端( Client )和服务端( Server )。客户端用于发送消息到目标节点( Target Node )并接收其返回的消息,服务端用于接收、处理源节点( Source Node )的消息并返回消息给源节点的客户端。
整个 RPC 通信的流程如下:
LPUSH
将消息推送到 Redis 的 nodes:<node_id>
中,并执行 BRPOP nodes:<node_id>:<msg_id>
阻塞并监听这个消息队列;BRPOP
一直在监听 nodes:<node_id>
,收到消息后,通过消息中的 Method
字段执行对应的程序;LPUSH
将消息推送到 Redis 的 nodes:<node_id>:<msg_id>
中;nodes:<node_id>:<msg_id>
这个消息队列,当目标节点服务端推送消息到这个队列后,源节点客户端将立即收到返回的消息,再做后续处理。这样,整个节点的通信流程就通过 Redis 完成了。这样做的好处在于不用暴露 HTTP 的 IP 地址和端口,只需要知道节点 ID 即可完成 RPC 通信。
这样设计后的 RPC 代码比较容易理解和维护。每次需要扩展新的通信类别时,只需要继承 rpc.Service
类,实现 ClientHandle
(客户端处理方法)和 ServerHandle
(服务端处理方法)方法就可以了。
这里多说一下 BRPOP
。它将移出并获取消息队列的最后一个元素, 如果消息队列没有元素会阻塞队列直到等待超时或发现可弹出元素为止。因此,使用 BRPOP
命令相对于轮训或其他方式,可以避免不间断的请求 Redis,避免浪费网络和计算资源。
如果对 Redis 的操作命令不熟悉的,可以参考一下掘金小册《 Redis 深度历险:核心原理与应用实践》,这本小册深入介绍了 Redis 的原理以及工程实践,对于应用 Redis 到实际开发中非常实用。
讲了这么多理论知识,我们还是需要看看代码的。老师常教育我们:“Talk is cheap. Show me the code.”
由于 Crawlab 后端是 Golang 开发的,要理解以下代码需要一些 Golang 的基础知识。
首先我们需要定一个传输消息的数据结构。代码如下。
package entity
type RpcMessage struct {
Id string `json:"id"` // 消息 ID
Method string `json:"method"` // 消息方法
NodeId string `json:"node_id"` // 节点 ID
Params map[string]string `json:"params"` // 参数
Timeout int `json:"timeout"` // 超时
Result string `json:"result"` // 结果
Error string `json:"error"` // 错误
}
这里,我们定义了消息 ID、方法、节点 ID、参数等字段。消息 ID 是 UUID,保证了消息 ID 的唯一性。
首先,我们定义一个抽象基础接口,方便让实际业务逻辑模块继承。服务端的处理逻辑在 ServerHandle
中,返回 entity
里的 RpcMessage
,而客户端的逻辑在 ClientHandle
中。
// RPC 服务基础类
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
当我们调用客户端的通用方法的时候,需要实现两个逻辑:
以下是实现的代码。
// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 请求 ID
msg.Id = uuid.NewV4().String()
// 发送 RPC 消息
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 获取 RPC 回复消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 如果返回消息有错误,返回错误
if replyMsg.Error != "" {
return replyMsg, errors.New(replyMsg.Error)
}
return
}
}
服务端处理的逻辑如下,大致的逻辑是:
您可以在 InitRpcService
这个方法中看到上述逻辑。私有方法 handleMsg
实现了序列化、调用服务端 RPC 服务方法、发送返回消息的逻辑。如果需要拓展 RPC 方法类型,在工厂类方法 GetService
里添加就可以了。
// 获取 RPC 服务
func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}
return nil
}
// 处理 RPC 消息
func handleMsg(msgStr string, node model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 获取 service
service := GetService(msg)
// 根据 Method 调用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 初始化服务端 RPC 服务
func InitRpcService() error {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 处理消息
go handleMsg(msgStr, node)
}
}()
return nil
}
Crawlab 的节点上经常需要为爬虫安装一些第三方依赖,例如 pymongo、requests 等。而其中,我们也需要直到某个节点上是否已经安装了某个依赖,这需要跨服务器通信,也就是需要在分布式网络中进行双向通信。而这个逻辑是通过 RPC 实现的。主节点向目标节点发起 RPC 调用,目标节点运行被调用方法,将运行结果也就是安装的依赖列表返回给客户端,客户端再返回给调用者。
下面的代码实现了获取目标节点上已安装的依赖的 RPC 方法。
// 获取已安装依赖服务
// 继承 Service 基础类
type GetInstalledDepsService struct {
msg entity.RpcMessage
}
// 服务端处理方法
// 重载 ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
deps, err := GetInstalledDepsLocal(lang)
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
resultStr, _ := json.Marshal(deps)
s.msg.Result = string(resultStr)
return s.msg, nil
}
// 客户端处理方法
// 重载 ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
// 反序列化
var output []entity.Dependency
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
写好了 RPC 服务端和客户端处理方法,就可以轻松编写调用逻辑了。以下是调用获取远端已安装依赖列表的方法。首先由 GetService
工厂类获取之前定义好的 GetInstalledDepsService
,再调用其客户端处理方法 ClientHandle
,然后返回结果。这就像在本地调用方法一样。是不是很简单?
// 获取远端已安装依赖
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetInstalledDepList,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
deps = o.([]entity.Dependency)
return
}
本篇文章主要介绍了一种基于 Redis 延迟队列的 RPC 通信方式,这种方式不用暴露各个节点或服务的 IP 地址或端口,是一种非常安全的方式。而且,这种方式已经用 Golang 在 Crawlab 中实现了双向通信,特别是 Golang 中的天生支持异步的 goroutine,让这种方式的实现变得简单。实际上,这种方式理论上是非常高效的,能够支持高并发数据传输。
但是,在 Crawlab 的实现中还存在一些隐患,也就是它并没有限制服务端的处理并发数量。因此如果传输消息过多时,服务端资源会被占满,导致处理速度变慢甚至宕机的风险。修复方式是在服务端限制并发数量。另外,限于时间的原因,作者还没有来得及测试这种 RPC 通信方式的实际传输效率,容错机制也没有加入。因此总的来说还有很大的提升和优化空间。
虽然如此,这种方式对于 Crawlab 的低并发远程通信来说是足够的了,在实际使用中也没有出现问题,非常稳定。对于隐秘性有要求、希望不暴露地址信息的开发者,我们也推荐将该种方式在实际应用中尝试。
1
xiaofan2 2020-03-17 09:48:32 +08:00
想法不错 不过适用范围有限
|
3
jelipo 2020-03-17 10:16:20 +08:00
原理还是使用队列实现,为什么不直接用现成的队列中间件
|
4
dapang1221 2020-03-17 10:18:35 +08:00
所以就是给 redis 套了个壳再封装一次吗…
|
5
bbao 2020-03-17 10:20:07 +08:00
看了标题,瞬间拉倒评论;
楼主 GRPC 不香么?要自己造轮子 |
6
chendy 2020-03-17 10:34:02 +08:00
基于 Redis 的 RPC ???
|
8
tikazyq OP @dapang1221 有不套壳的实现?
|
11
gowk 2020-03-17 10:51:52 +08:00 via Android
图画的不错,我想知道用什么工具画的
|
13
ayavvv 2020-03-17 10:59:24 +08:00
基于 redis 套壳做了消息队列,完了又基于消息队列做了 rpc。。。老哥你真的是人才。。
|
15
index90 2020-03-17 11:17:12 +08:00
这种设计是有价值的,把 client 和 server 当作是一个节点上的 reciver 和 sender,就实现了一个并行处理服务了。利用多进程代替了多线程,从而可以方便进行调度管理。微信的后台就有类似这样的实现
|
16
pkwenda 2020-03-17 12:07:54 +08:00
思路不错,可以举一反三
|
17
myzWILLmake 2020-03-17 12:59:27 +08:00
前公司的游戏服务端消息队列+rpc 就是这么实现的,实际用下来还可以。
|
18
tikazyq OP @myzWILLmake 也是用 Redis 么,性能怎么样
|
19
kaneg 2020-03-17 13:19:49 +08:00 via iPhone 1
最近做了一个项目需要解决两个服务之间相互调用的问题。服务之间互相都不知道对方的地址,唯一有联系的是它们都连同一个 Kafka。最后的方案也是在 Kafka 上套了个 rpc 的壳。
如果不看实际场景,这种方案是挺奇怪的。但是它给我们的项目在不可能完成的的情况下创造了一种可能性。 |
20
iyaozhen 2020-03-17 13:20:30 +08:00
你这感觉实际有很多问题
1. 用队列没有常规 rpc 调用响应快 2. redis 容易单点故障,而且一旦服务异常,redis 内存就爆了(亲身经历) 3. 消息序列化使用 json 性能也不好,最好用 pb。json 另外一个坏处就是格式是松散的,没有严格限制,开发效率也不高,没有代码提示 |
22
tikazyq OP @iyaozhen 这种方案性能上肯定是没有常规 RPC 高的,Redis 单点故障可能可以通过集群的方式来解决,json 的方式确实问题很多,但这是最简单的方式了。
总的来说,优化空间很多,这种方式比起传统的 RPC 来说肯定是有局限性的,但在实际特殊的应用场景中却可以发挥作用 |
23
xwhxbg 2020-03-17 16:18:23 +08:00
支持 crawlab 先,我在用 node 爬虫,要是能做成支持 docker 爬虫就好了,用户直接丢个镜像过去,也不用装依赖了
|
25
zunceng 2020-03-17 17:52:27 +08:00
|
26
myzWILLmake 2020-03-17 18:09:05 +08:00
@tikazyq 是的用的 Redis。队列性能的话只有单机测试 LPOP/LPUSH,基本可以达到每秒 10w 请求,够我们业务模型用了,实际业务压力测试瓶颈不在消息队列上。
|
28
tikazyq OP @myzWILLmake 那说明性能还是不错的,值得深入研究一下
|
29
huahuacui 2020-03-18 09:46:21 +08:00
我觉得可以,有些服务可以借鉴一下
|
30
piglovesx 2020-03-18 14:53:33 +08:00
学习了
|
31
123444a 2020-03-22 01:57:47 +08:00 via Android
楼主,我是服了,你不考虑 1%节点挂了的情况么,明明可以直连,却多此一举 Redis,你要是握手阶段就算了,居然一直不让双方直连,打算任务超时设多久?
|
33
123444a 2020-03-24 19:29:41 +08:00 via Android
@tikazyq 不止高可用问题,还有如何重试,如何调度,要不要幂等,有些 worker 如果阻塞,系统就歇菜了,其实比喻很简单就是我和你通过 email 来交互执行银行转账,缺点就是慢和不好处理异常情况
|
34
tikazyq OP @123444a 是有各种各样的问题存在,不过我们的项目已经这样用了,目前还没遇到什么问题,https://github.com/crawlab-team/crawlab
|