RabbitMQ 增加多消费者导致生产者推送消息阻塞(10s~30s) 设备上传数据到系统 A(netty),系统 A 根据数据类型推送消息到不同的队列,因为设备量增多的原因,之前单消费者开始处理的不及时,就想着多增加个消费者(和之前的消费者代码一样),然后系统 A 推送消息开始出现卡顿,数据帧应答的很慢,感觉不像是流控的事,管理端看着也没问题 相关代码: 系统 A:
channelRead(ChannelHandlerContext ctx, Object msg){
....
sendAck(ctx,ack);
switch (data.getClass().getName()) {
case "realTimeData":
RabbitUtil.getInstance().publish(realTimeData);
}
}
publish(RealTimeData realTimeData){
.......
Map<String, Object> header = new HashMap<String, Object>();
header.put("DataType", "RealTimeData");
BasicProperties props = new BasicProperties().builder().headers(header).build();
channel.basicPublish(exchangeName, routeKey_CollectedData, props, CollectedRealTimeDataPackageTransform.toBytes(data));
}
channel init:
private Channel channel;
private ConnectionFactory factory = new ConnectionFactory();
@PostConstruct
public void init() {
instance = this;
factory.setUsername(mqUserName);
factory.setPassword(mqPassword);
factory.setHost(mqHost);
factory.setVirtualHost(mqVirtualHost);
factory.setPort(mqPort);
}
channel = factory.newConnection().createChannel();
}
消费者代码:
@Autowired
DataProcessor processor;
@Autowired
@Qualifier("threadpool")
ThreadPoolExecutor threadPool;
@RabbitListener(queues = "${mq.queue.Original.CollectedData}", ackMode = "MANUAL")
public void process(Message msg, Channel channel) {
MessageProperties mp = msg.getMessageProperties();
Map<String, Object> headers = mp.getHeaders();
String dataType = (String) headers.get("DataType");
switch (dataType) {
case "RealTimeData":
CompletableFuture.runAsync(() -> {
try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
CollectedRealTimeData crtd = CollectedRealTimeDataPackageTransform.fromBytes(msg.getBody());
processor.process(crtd);
} catch (Exception e) {
try {
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
e.printStackTrace();
}
}, threadPool);
break;
}
}
1
qW7bo2FbzbC0 2022-06-21 09:51:09 +08:00
用 rabbit 经常会遇到 server no response 的情况,只能手动 kill 进程重新启动,改用 kafka 后未出现同样问题。我们的场景比较简单,切换起来很快。我们这边观测 rabbit client 很多或消息大的话,对 server 有压力。和 rabbit 的性能宣传语不同,可能使我们的使用方式有问题吧
|
2
wupher 2022-06-21 10:27:15 +08:00
没碰到类似的情况,可能是量级未到?
之前写的一个系统,使用 RabbitMQ 进行多端通讯。日常大约在 5000 ~ 8000 个客户端进行数据交换,同步消息和异步消息都有。 消费者一直都是多节点通过 RabbitListener 连接 RabbitMQ 。刚才又看了一下 application.yml 批量获取一次 10 条,concurrency 5 max 10 之前未碰到过 publish 超时的情况。 你用的版本是? |
3
withBruce OP concurrency 这个属性开多线程不是消费的同一批数据把
concurrency 5 max 10 配置好这个问题解决了 还是自己对于 mq 没弄明白 谢谢了! |
5
withBruce OP @qW7bo2FbzbC0 谢谢
|