V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
simonlu9
V2EX  ›  程序员

微服务中,消息队列要单独拆一个服务进行消费吗

  •  
  •   simonlu9 · 2023-02-02 17:00:55 +08:00 · 3512 次点击
    这是一个创建于 725 天前的主题,其中的信息可能已经有所发展或是发生改变。

    如题,有两个单体项目,一个是管理后台,一个是接口服务,有一个子模块群组消息队列,管理后台应用和接口都引用了,现在是同一个消费组,同一个消费者,因为不能重复消费,消费的时候进行轮询,现在的问题是

    1. 如果接口部署了多实例,同一个消费者会争夺同一个消息进行处理,浪费了大量线程资源,也不能提高消费效率
    2. 如果消费者的代码更改了,有时候没有更新到位,接口更新了,可能管理后台没更新,可能消费逻辑又不一样

    大家是否会碰到这种问题,docker 多实例部署会考虑定时任务,消息队列,在多个容器运行的情况吗, 定时任务已经通过 xxl-job 去解决这个问题,但消息队列不知道怎么处理,望赐教

    第 1 条附言  ·  2023-02-03 15:52:43 +08:00
    谢谢大家,springcloud stream 解决了我的问题,大概看了其中原理,不用自己配置消费者名称,底层已经帮你绑定了,在多实例的时候会部署也很方便,

    同时 springcloud stream 在消费失败的时候也作了相关处理,比如 重试机制,死信队列,相比我原生的用 redis stream 这些功能都要自己实现
    34 条回复    2023-03-21 21:45:53 +08:00
    199808lanlan1111
        1
    199808lanlan1111  
       2023-02-02 17:02:57 +08:00 via Android
    分成两个项目就意味着每次要打开两个 idea ,分支要高俩,发板要搞俩等等,可以先搞在一起,但是模块分开,后面请求量上来可以单独拆分保证稳定性可靠性
    wangxin3
        2
    wangxin3  
       2023-02-02 17:04:05 +08:00
    具体什么消息队列呢,rabbitmq 绑定在同一个队列上的消费者组是不会重复消费的
    kafka 也是同理,消费者配置为同一个消费者组也是不会重复消费的
    mooyo
        3
    mooyo  
       2023-02-02 17:05:21 +08:00
    前司分开了,现司没分开。感觉没区别。
    dolorain
        4
    dolorain  
       2023-02-02 17:06:17 +08:00
    看体量多大了,小打小闹肯定没必要了
    simonlu9
        5
    simonlu9  
    OP
       2023-02-02 17:06:44 +08:00
    @199808lanlan1111 现在就是同一个项目的,只是不同 application ,就消费队列这个问题不好处理,能不能中心化
    199808lanlan1111
        6
    199808lanlan1111  
       2023-02-02 17:08:41 +08:00 via Android
    @199808lanlan1111 没审题 说错了,但 op 的问题我只看懂了最后一个问题,需不需要考虑多实例问题。

    首先肯定要考虑的,这就是分布式系统的特性。消息队列你不需要考虑,消息会靠队列进行负载均衡,每个实例会会处理一个或者多个队列的消息
    koloonps
        7
    koloonps  
       2023-02-02 17:10:24 +08:00
    “如果接口部署了多实例,同一个消费者会争夺同一个消息进行处理,浪费了大量线程资源,也不能提高消费效率” rabbitmq 在消息没有退回 /超时之前 mq 服务器不会重新推送
    simonlu9
        8
    simonlu9  
    OP
       2023-02-02 17:10:37 +08:00
    @wangxin3 同一个消费组,是用 redis stream,只是消费者名称都是写死在代码里面,所以多实例,最终还是同时消费一条消息,除非部署多实例的时候消费者名称动态配置
    wangxin3
        9
    wangxin3  
       2023-02-02 17:18:52 +08:00
    @simonlu9 #8 原文:“@wangxin3 同一个消费组,是用 redis stream,只是消费者名称都是写死在代码里面,所以多实例,最终还是同时消费一条消息,除非部署多实例的时候消费者名称动态配置”
    ======
    回复:不理解你说的 可以画个架构图?
    colincat
        10
    colincat  
       2023-02-02 17:22:05 +08:00
    @simonlu9 该消息模块是否可以动态传入 groupName ,当管理后台引用是使用管理后台消费组-配置到配置文件中,当接口服务使用时使用 接口消费组
    simonlu9
        11
    simonlu9  
    OP
       2023-02-02 17:25:56 +08:00
    @wangxin3 代码逻辑大概是这样,考虑以下方法再多实例运行,消费组是同一个,消费者名称是写死
    @Override
    public void run(ApplicationArguments args) throws Exception {
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, ChatGroupUserDTO>> options =
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
    .batchSize(10)
    .executor(executor)
    .pollTimeout(Duration.ofSeconds(5))
    .targetType(ChatGroupUserDTO.class)
    .build();
    StreamMessageListenerContainer<String, ObjectRecord<String, ChatGroupUserDTO>> container =
    StreamMessageListenerContainer.create(redisConnectionFactory, options);

    prepareChannelAndGroup(redisTemplate.opsForStream(), MESSAGE_STREAM, MESSAGE_GROUP);

    container.receive(Consumer.from(MESSAGE_GROUP, "consumer-1"),
    StreamOffset.create(MESSAGE_STREAM, ReadOffset.lastConsumed()),
    messageListener);
    this.container = container;
    // 启动监听
    this.container.start();
    logger.info("{}启动成功",MESSAGE_STREAM);
    }
    leeraya
        12
    leeraya  
       2023-02-02 17:30:07 +08:00
    现司就是专门搞了一个消息队列中转服务,根据不同的策略接收转发消息。
    好处就是整个服务网格内几乎所有用到消息的服务都走中转,问题排查集中在中转站存的消息日志。
    不过就是要专门有人维护这种基础设施服务,又是搞消息的,费人工。
    wangxin3
        13
    wangxin3  
       2023-02-02 17:30:54 +08:00
    @simonlu9 #11 原文:
    回复:consumer-1 保证消费者名称不重复不就行了?你现在可以单实例,把这个代码在运行一份,两份不同的消费者名称,但在同一个消费者组,看看是否重复消费了。
    simonlu9
        14
    simonlu9  
    OP
       2023-02-02 17:38:33 +08:00
    @wangxin3 在同一个消费组,消费者名称一样的话,假设有三个实例,thread-1 thread-2 thread-3, 每条消息只会投递到某个实例,不会三个实例都投放,我的主要问题是如果像这种多个实例,好像会饿死线程,大量浪费
    wangxin3
        15
    wangxin3  
       2023-02-02 17:44:21 +08:00
    @simonlu9 #14
    ======
    回复:怎么会呢,发布订阅模式不就是 redis 有消息才会给消费者发消息吗,redis 只有一条消息,轮询到实例 A 了,就发给实例 A ,实例 B 和 C 该干啥干啥呀,怎么会饿死线程,大量浪费。实例 B 和 C 又不会因为消费者线程阻塞在等 redis 发消息,有消息才会处理呀。
    nothingistrue
        16
    nothingistrue  
       2023-02-02 17:54:46 +08:00   ❤️ 1
    微服务是跟着业务走,不跟着技术实现方式走的。消息队列消费者,绝大多数情况下,都不对应一种业务(具体的说就是实体、实体表、限界上下文这些),当然不能单独拆成一个服务。

    问题 1 ,可以通过配置消费者组进行解决,一个消费者组,同一个消费者只会按调度规则扔给唯一的消费者。RabbitMq 、Kafka 都这只,Spring Cloud Stream 还提供了超简单的实现方式(不过运维要麻烦点,后面说)。

    问题 2 ,我没明白你说得是什么,看起来像是系统升级时候的配合问题,这个解决起来稍微麻烦但也不是不能解决。生产者消费者如果不能同时更新,那么消息协议上,就要考虑多版本同时兼容的问题了。

    最后说说运维麻烦的地方。多实例的时候,对于接口调用,是不用区分具体哪个示例的,负载均衡机制随便选一个就行了,所以实例无需明确 ID ,随机生成都可以。但是对于消费者组,就不能那么随意了,通常都是要明确给出实例 ID 的,不能随机生成,这会增加部署的麻烦成都。。
    simonlu9
        17
    simonlu9  
    OP
       2023-02-02 18:07:43 +08:00
    @wangxin3 我算一下资源成本,一个业务一个主题,10 个业务就 10 线程在监听队列,如果再单台机器部署多实例,10*n 个线程在跑,没意义
    simonlu9
        18
    simonlu9  
    OP
       2023-02-02 18:09:52 +08:00
    @nothingistrue 但是对于消费者组,就不能那么随意了,通常都是要明确给出实例 ID 的,不能随机生成,可以说说这块运维一般怎么搞的吗,公司运维也是我
    liyanggyang
        19
    liyanggyang  
       2023-02-02 18:42:45 +08:00
    消费过后,数据存储的在一个地方,那不就没任何问题了。
    关于消费者放在哪儿的问题,我理解,看这个消息的类型:
    1. 这个消息是用户业务相关的,那么放在接口服务。比如接口服务是账单服务,那么支付系统发送过来的支付消息,就在接口服务。
    2. 这个消息只是后台管理相关的,那么就放在管理后台。比如消息队列是 xxx 公司报表系统发送过来的,需要做 xxx 管控,那么就在管理后台,因为它与用户业务无关。
    simonlu9
        20
    simonlu9  
    OP
       2023-02-02 18:57:01 +08:00
    @liyanggyang 举一个很简单例子,比如群吧,解散后会有后续动作,接口端可以解散, 管理后台也可以解散,都是共用一个逻辑
    wolfie
        21
    wolfie  
       2023-02-02 18:59:37 +08:00
    > 如果接口部署了多实例,同一个消费者会争夺同一个消息进行处理,浪费了大量线程资源,也不能提高消费效率

    广播?
    liyanggyang
        22
    liyanggyang  
       2023-02-02 19:21:28 +08:00
    @simonlu9 这样来说,业务功能是面向用户的,管理后台是面向软件运维者。
    这个例子,那这个很好理解了,群创建或解散,属于业务的功能(当然你说后台也具备功能,这说法没问题,但是后台只是给软件的管理员提供一个集中管理的便捷),所以这个肯定是接口服务,一个群解散的接口服务,消息在接口服务里面消费。管理后台走内部通道去调用接口服务(即接口服务是 api1 ,可以专门写一个内部接口 api2 ,但是 api1 2 里面的实现 service.dell()是一个 )
    kjstart
        23
    kjstart  
       2023-02-03 05:21:07 +08:00
    根据异步负载决定, 因为要分别做弹性伸缩
    kjstart
        24
    kjstart  
       2023-02-03 05:22:48 +08:00   ❤️ 1
    可以用一套代码, 用配置文件决定是否拉取消息就可以了
    nothingistrue
        25
    nothingistrue  
       2023-02-03 09:22:09 +08:00
    @simonlu9 #17 生产者、消费者、消息中间件之间,只有消息中间件是服务器端需要维持多个连接,生产者、消费者都是客户端,只需要用一个线程维持一条连接,不管有多少消息队列。你把消费者监听消息,跟处理消息这两个阶段混在一起看了,这俩不能放到一个线程上同步搞,不然就算你一个队列一个线程,都要严重阻塞。通常来说,消费者收到消息后,是要把实际处理,再转交给异步执行器(对 Java 来说就是线程池)的,这样只需一个线程负责接受消息,一个执行器(线程池)负责处理消息。
    @simonlu9 #18 如果是单纯的随机分配或均衡分配的消费者组的话,应该也是无需给出实例 ID 的。
    winglight2016
        26
    winglight2016  
       2023-02-03 09:34:41 +08:00
    @nothingistrue #25 说得对,我司以前就是把监听消息和业务处理放在一起,会导致非常严重的问题。我现在正在做优化,完全分离 MQ 和业务,专门做一个单独的应用来负责映射转发。

    另外,lz 担心的应用如何确定具体实例问题,一般业务后台如果是多实例部署,前面会放一个 LB ,由 LB 来决定具体哪个实例来响应请求
    nothingistrue
        27
    nothingistrue  
       2023-02-03 09:41:48 +08:00   ❤️ 1
    关于实例 ID 这部分,这个重点是,消费者必须明确的告知消息中间件两个属性:一共有多少个实例,当前实例区分其他实例的标识(可以简单的只是个序号)。这是业务无关的,只跟运维实例的部署配置有关。配置的内容也不多,只需要消费者多加两个配置项,但是这俩配置项管理起来比较麻烦,因为它们的值不可预定义,只能在部署的时候动态决定,且每次部署都要重新决定。

    单纯负载均衡或者随机分配的多实例消费者组的话,理论上是无需理会实例 ID 的。当消息不是自动均衡分配,而是按规则分区分配(比如说 1-5 给实例 1 ,6-10 给实例 2 )的时候,就必须明确给出实例 ID 了。

    我这里参考的是 Spring Cloud Stream ( https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-partitioning )。请注意这些规则不是 Spring Cloud Stream 决定的,是消息中间件决定的,Spring Cloud Stream 只是做了上层抽象使其用起来更简单些。
    nothingistrue
        28
    nothingistrue  
       2023-02-03 09:49:08 +08:00
    @winglight2016 #25
    只需要把监听消息和业务处理在应用内部解开就行了,不要拆成不同应用。监听消息和业务处理放在一个服务中,就只是底层的一个线程和异步线程池,对上层业务逻辑没影响。你要分成两个应用,先不考虑资源浪费问题,业务逻辑就搞复杂了。通常来说,映射转发也就几条规则,处理几十个队列,不管是业务逻辑还是性能上,都不具备独立出去的需要。只有映射转发规则多到需要专门的管理界面的时候,才能考虑独立出去。
    morty0
        29
    morty0  
       2023-02-03 10:16:56 +08:00
    @nothingistrue 全部消息都异步处理, 监听服务收到消息就直接 ack 吗, 还是等异步处理完 ack 呢?
    NoKey
        30
    NoKey  
       2023-02-03 10:19:49 +08:00
    没怎么看懂,大概给你出个主意:
    1. 把 kakfa 拿来好好研究一下
    2. 如果有很多服务器,然后更新可能不及时,消息连结构都改了,会导致旧的消费者拿到之后出错,那么,新增一个 topic ,新旧 topic 共存一段时间,等全部更新完,应该就 ok 了。
    看看有没其他大神有好办法。
    nothingistrue
        31
    nothingistrue  
       2023-02-03 10:39:37 +08:00
    @morty0 #28 如果是异步处理,那么收到即表示成功,自然是收到就 ack 。但不是所有消息都要异步处理,这个具体要看是啥消息。ack 或者 死信机制,只对同步处理的消息有作用,异步处理的消息,需要有其他机制做异常处理。
    winglight2016
        32
    winglight2016  
       2023-02-03 11:38:35 +08:00
    @nothingistrue #28 我们的场景不太一样,需要容器化部署在 k8s 上,而且基于 python ,必须在单独的容器中启动监听服务。然后,这个容器进程如果爆出异常又没有捕获会导致整个容器都不能正常启动,所以必须和业务代码分离。另外,虽然只有几条规则,但是不能写在代码里,不然规则变化也会导致重新打包和部署。
    zeonll
        33
    zeonll  
       2023-02-03 16:49:05 +08:00
    要分开,方便隔灾和扩容
    cnlinjie
        34
    cnlinjie  
       2023-03-21 21:45:53 +08:00
    spring cloud stream 官方好像没有支持 redis 的库,你是用 `https://github.com/spring-attic/spring-cloud-stream-binder-redis` 这套吗?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1146 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 16:58 · PVG 00:58 · LAX 08:58 · JFK 11:58
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.