V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Aidenboss
V2EX  ›  Java

轻量级 Java 应用消息通知中心

  •  
  •   Aidenboss · 2021-07-09 00:30:23 +08:00 · 2455 次点击
    这是一个创建于 1233 天前的主题,其中的信息可能已经有所发展或是发生改变。

    轻量级 Java 应用消息通知中心

    项目地址

    https://github.com/yemingfeng/kit-message

    项目背景
    1. 应用集群部署,并且使用了 local cache 。当要清除缓存时,通过 rpc / 消息队列清除,只能清除接收到消息的那个节点,无法清除整个应用集群的 local cache 导致,节点 2 、节点 3 存在脏数据。

    1. 应用集群部署,存在耗时的计算,为了减少计算资源浪费,某个节点更新后需要通知集群内其他节点更新

    这里的场景本质是,消息如何广播? 那么会有人问为什么不使用消息队列? 因为消息队列无法很优美的实现这里的场景。比如说 kafka,使用不同的 consumer_group 就可以实现,但不优雅。所以开启了 kit-message 这个项目。

    技术清单
    • java11
    • springboot
    • netty
    • redis
    核心逻辑

    • kit-message-center: 消息中心服务,接收 kit-message-client 订阅消息或者发布消息
    • kit-message-client:一个 Java 的轻量级 client 实现
    • kit-message-producer:基于 kit-message-client 发布消息
    • kit-message-consumer:基于 kit-message-client 订阅消息
    快速使用
    server 启动

    本地启动

    1. 修改 kit-message-center / application.yml 中 redis 的配置,配置遵循 springboot 规范
    2. mvn clean package
    3. java -jar kit-message-server/target/kit-message-center.jar
    4. server 会监听 8800 端口
    client 使用

    增加依赖

    
    <dependency>
      <groupId>io.github.yemingfeng</groupId>
      <artifactId>kit-message-client</artifactId>
      <version>1.0.0</version>
    </dependency>
    

    发布消息

    MessageProducer messageProducer = new MessageProducer("127.0.0.1", 8800);
    messageProducer.pub("topic1", "topic1:" + i);
    messageProducer.close();
    

    订阅消息

    MessageConsumer messageConsumer = new MessageConsumer("127.0.0.1", 8800);
    messageConsumer.sub("topic1", new BiConsumer<String, String>() {
        @Override
        public void accept(String topic, String payload) {
          System.out.println("topic1_1:" + topic + "\t" + payload);
        }
    });
    
    线上环境部署

    kit-message-server 支持集群部署,建议使用 nginx 做转发。

    stream {
        upstream kit-message-server {
            server server_ip1:8800;
            server server_ip2:8800; 
        }
    
        server {
           listen 8800;
           proxy_connect_timeout 1s;
           proxy_timeout 3s;
           proxy_pass kit-message-server;
        }
    }
    
    Q&A
    1. 这个项目使用场景?使用消息队列等中间件不香吗?

    答:这个项目是基于服务间消息通知这个场景的。解决问题更加明确,也更加轻量。

    1. 为什么不直接封装一个 redis-client 进行消息的收发?而是使用 client/server 的模式?

    答:kit-message-server 让项目更加通用,接入更加方便,依赖更少,管理维护成本更低。

    欢迎提反馈

    14 条回复    2021-07-23 08:55:27 +08:00
    MidCoder
        1
    MidCoder  
       2021-07-21 14:01:42 +08:00
    所有的有点都是自己 YY 的,你有相关一个系统引入一个新的组件带来的各种风险吗?你能确保你的这个组件有多少个 9 的可用性?以及面对真实的生产场景,面对上亿的消息量,你的 redis 机器,和你的 message-center 集群如何实现横向扩展?能否通过简单的加机器就能解决?我看现在你的架构都不具备。你当前的架构就是一个 Toy,练手我觉得可以,但是千万不要用于实际业务场景,否则业务会被害惨的。所以上面说的第一点,纯属自己 YY,当前哪个消息中间件整个稳定性和横向扩展性没有 99%以上?请用成人思维去思考。

    架构的本质是去繁化简,多增加一个组件和一行代码都是需要经过仔细思考的,所以你上面说的第二点,无疑是为了自己的技术热情而引入的没必要的架构复杂度

    欣赏你对技术的热情,可以作为技术分享对你的技术理解,但不要轻易的将自己的东西当做框架或者开源推出去,这是一种对技术不负责任的态度。
    Aidenboss
        2
    Aidenboss  
    OP
       2021-07-21 20:21:41 +08:00
    @MidCoder
    1. 引入的风险就看各自的承接了。只要达到 99.9% 可用即可
    2. 横向拓展是通过 redis 做的,这里的逻辑推演下即可
    3. 可以通过简单的加机器完成

    请你先了解好以上再来评价。
    MidCoder
        3
    MidCoder  
       2021-07-21 22:23:37 +08:00
    @Aidenboss
    1 、你这里 99.9%的可用率在亿级别场景肯定达不到,首先你的 center 如何做到容错和负载?这一点你的架构一点都没实现,怎么保障你说的三个九的可用?至少机器挂了你这一点容错能力都没有。
    2 、你这里不只是 redis 水平,还有你的 center,你的 center 只是单点?单点你就敢承载亿级别的场景?太异想天开了。如果你 center 要做分布式,那就涉及到 center 的发现和负载,以及 center 和 redis 之间的归属如何分配?不是你想的这么简单。

    学习角度我是赞赏的,但是不要那这个去做实际生产场景,赞赏你的分享精神,但是如果你想证明自己,请把这个东西做到你说的三个九。如果这样简单搞一下就保障了三个九,你以为大厂哪些人是吃素的?
    Aidenboss
        4
    Aidenboss  
    OP
       2021-07-21 22:52:02 +08:00
    @MidCoder
    你再仔细看下为啥 center 不是单点的吧
    MidCoder
        5
    MidCoder  
       2021-07-22 09:55:42 +08:00
    @Aidenboss 那整个架构 redis 将会是单点,虽然你的 center 可以通过 niginx 负载,但是如何解决单个 topic 消息量倾斜,导致 redis 集群负载不均衡,如果基于 redis,那 topic 的负载如何基于 reids 来实现,是你这个能否规模化的关键,至少这里没有实现或者体现。

    如果想真正研究大厂的基础架构,欢迎加 Vbieber-cn,来我厂一起搞事情
    Aidenboss
        6
    Aidenboss  
    OP
       2021-07-22 10:07:24 +08:00
    @MidCoder
    1. redis 单点就由 redis-cluster 解决。为啥要让应用层解决中间件单点的问题。
    2. center 是可以水平扩展的,已经解决了多 topic 的问题了。如果是单 topic 的消息负载,这点确实提醒了我,没有做。但使用 center + redis 参考 redis 模糊匹配的订阅模式即可。

    以上已经解决了你的问题。
    Aidenboss
        7
    Aidenboss  
    OP
       2021-07-22 11:31:56 +08:00
    Aidenboss
        8
    Aidenboss  
    OP
       2021-07-22 11:43:55 +08:00
    @Aidenboss
    重新补充下细节,topic 消息负载使用 topic 内消息分片解决。怎么解决呢?
    默认分成 8 个分片数「具体随意调整」
    前置条件:
    redis-cluster 保存每个 topic 的订阅者信息,以及订阅者的 key 。比如 key = topic_s_list,value = [c1:1,3,6,7,c2:2,4,5,8]
    每个消息都会按照 routing 算法计算出 key,每个消息都会发送到对应的 key 订阅通道,如:msg:1,代表发送到 msg1 发送给 topic1,只有 c1 才能接收到 msg1 的消息。


    当加入一个新的 topic 订阅者,就先发送 topic:stop 指令,c1 、c2 接收到指令后,之后的消息先缓存在 redis 的待发送 list topic_s_pengding 中
    c3 通过 lua 脚本,将 topic_s_list 修改为:[c1:1,3,4,c2:2,5,c3:6,7,8],并发送 sub_update 指令
    c1 、c2 接收指令后,重新订阅的 key 变成 topic:1 、topic:3 、topic:4 ; c2 的订阅的 key 是:topic:2 、topic:5 ; c3 订阅的 key 是 topic:6 、topic:7 、topic:8
    将 topic_s_pengding 中的 key 重新发送到 topic 中

    中间会有类似 kafka rebalance 的现象。但其实并不影响消息的生产。

    技术是为了解决问题的,提出没解决的点,照着点去设计就好。
    MidCoder
        9
    MidCoder  
       2021-07-22 13:23:12 +08:00
    @Aidenboss 是否通过真实的大规模场景验证你的这个方案?如果没有,如何验证你的方案是真的可行?
    第一:首先采用 redis 方案,看似把最难解决的消息存储交给 redis 已有解决方案来去解决。但是在真实大规模场景下,这会导致网络开销增加了一倍,因为多了一次 center 和 redis 的 request/response 。这种网络开销在亿级别的消息体量下,会严重影响性能
    第二:整个集群管理你如何保障?如何让全局感知整个 topic 分片的负载策略?以及当出现网络异常(你的 stop 命令都无法发出的时候)如何保障集群的一致性?以及消息的消费顺序如何保障,如何记录消息消费到了哪里?以及当消费端重启的时候,如何找到之前的消费位置?

    只能说你在逐步完善一个消息消费的最基本能力(消息通信),但是对于一个简单的 MQ 场景来说,这只是最简单的部分
    Aidenboss
        10
    Aidenboss  
    OP
       2021-07-22 16:47:58 +08:00
    @MidCoder
    你没理解这个场景,既然使用了 redis,就不需要保障消息可靠和一致性。
    这个场景只需要解决消息传递即可,这也是这个项目的本意。
    如果要做更牛的功能,直接用 RabittMQ 解决好了。
    不需要什么事情都推到大规模、亿万消息、机制性能。技术是为了解决问题而存在的。只要解决那个场景的问题,自然就有存在的价值。如果脱离的场景,只考虑技术难点,会忽略了解决这个问题的本质。
    MidCoder
        11
    MidCoder  
       2021-07-22 17:42:24 +08:00
    @Aidenboss 那你是如何解决消息消费者消费到一半,宕机或者重新部署,消息不丢失的?如果你不记录消息消费位置的?
    Aidenboss
        12
    Aidenboss  
    OP
       2021-07-22 19:04:47 +08:00
    @MidCoder 如果你需要解决,可以用: https://www.xuxueli.com/xxl-mq/
    MidCoder
        13
    MidCoder  
       2021-07-22 21:35:18 +08:00
    @Aidenboss 看来研究的开源项目不少,来不来我们这边搞事情?我们这边是搞基础架构的
    Jwyt
        14
    Jwyt  
       2021-07-23 08:55:27 +08:00
    好家伙,楼上是来面试的?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2699 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 12:33 · PVG 20:33 · LAX 04:33 · JFK 07:33
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.