V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
EricJia
V2EX  ›  宽带症候群

rust 组播/多播 延迟疑问

  •  
  •   EricJia · 240 天前 · 1433 次点击
    这是一个创建于 240 天前的主题,其中的信息可能已经有所发展或是发生改变。

    最近在测试 低延迟 rust tokio 组播,用 tokio 和 socket2 ,

    设备是 aws c7gn.metal arm 架构 64 核心。 1 个 sender ,n 个 receiver 。发送时间戳。 设置 linux route 走 lo 网卡。

    当 receiver20 个以内时。延迟大概是 40us 以内可控。 当 receiver 个数增加到 50-100 个以后, 延迟逐渐上升到 100~200us 以上。cpu 仍然占用很低,延迟显著增加,这是为什么呢?

    求解 如何能让 50+ receiver 同时接收, 还能保持 50us 以下延迟?

    use serde::{Deserialize, Serialize};
    use std::net::{Ipv4Addr, SocketAddrV4};
    const MULTICAST_ADDR: Ipv4Addr = Ipv4Addr::new(239, 255, 0, 1);
    const MULTICAST_PORT: u16 = 3001;
    const BIND_ADDR: Ipv4Addr = Ipv4Addr::LOCALHOST;
    
    pub fn timestamp16() -> u128 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_micros()
    }
    
    /// Networking options.
    #[derive(argh::FromArgs)]
    struct Args {
        /// multicast address that the socket must join
        #[argh(option, short = 'a', default = "MULTICAST_ADDR")]
        addr: Ipv4Addr,
        /// specific port to bind the socket to
        #[argh(option, short = 'p', default = "MULTICAST_PORT")]
        port: u16,
        /// whether or not to allow the UDP socket
        /// to be reused by another application
        #[argh(switch)]
        is_sender: bool,
    }
    
    fn main() -> std::io::Result<()> {
        init_logger();
        use socket2::{Domain, Protocol, Socket, Type};
        let Args {
            addr,
            port,
            is_sender,
        } = argh::from_env();
        println!("{} {} is_sender: {}", addr, port, is_sender);
        let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
        socket.set_nonblocking(true)?;
        socket.set_reuse_address(true)?;
        socket.set_reuse_port(true)?;
        socket.set_multicast_loop_v4(true)?;
        socket.set_multicast_ttl_v4(1)?;
    
        socket.join_multicast_v4(&addr, &Ipv4Addr::LOCALHOST)?;
        let fin_addr = SocketAddrV4::new(addr, port);
        if is_sender {
            socket.bind(&SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into())?;
        } else {
            socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port).into())?;
        }
    
        let runtime = tokio::runtime::Builder::new_current_thread()
            .thread_name("network")
            .enable_all()
            .build()?;
        let udp = {
            let _guard = runtime.enter();
            tokio::net::UdpSocket::from_std(socket.into())?
        };
    
        runtime.block_on(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(1000));
            if is_sender == false {
                interval = tokio::time::interval(tokio::time::Duration::from_secs(60 * 60 * 24));
            }
            interval.tick().await;
    
            let mut buf = [0; 16];
            loop {
                tokio::select! {
                    recv_res = udp.recv_from(&mut buf) => {
                        let (count, remote_addr) = recv_res.expect("cannot receive from socket");
                        let parsed = u128::from_be_bytes(buf[..count].try_into().unwrap());
                        let cost = timestamp16() - parsed;
                        println!("{:?}", );!("recv {remote_addr} {parsed} {count} {cost}")
                    }
                    _ = interval.tick() => {
                        let cur = timestamp16();
                        let input = cur.to_be_bytes();
                        udp.send_to(&input, fin_addr).await.expect("cannot send message to socket");
                        println!("{:?}", );!("send: {}", cur);
                    }
                }
            }
        });
        Ok(())
    }
    
    7 条回复    2024-03-27 11:12:55 +08:00
    lsk569937453
        1
    lsk569937453  
       240 天前
    https://stackoverflow.com/questions/76589659/does-multithreaded-tokio-run-task-on-a-single-os-thread

    把 tokio::runtime::Builder::new_current_thread() 换成 new_multi_thread 试试??
    EricJia
        2
    EricJia  
    OP
       240 天前
    @lsk569937453 试过, 这个我测试下来结果 new_current_thread 延迟会更低。
    EricJia
        3
    EricJia  
    OP
       240 天前
    部署方式: supervisor 开了 60 个 receiver 程序, 用 args 指定的 sender receiver
    ```
    [program:sender1]
    stderr_logfile = /data/sender1
    command = /home/ubuntu/test_udp --is_sender

    [program:receiver0]
    stderr_logfile = /data/receiver0
    command = /home/ubuntu/test_udp
    [program:receiver1]
    stderr_logfile = /data/receiver1
    command = /home/ubuntu/test_udp
    ...
    ```
    rainmote
        4
    rainmote  
       239 天前
    检查下网卡软中断绑定 cpu 问题,有些机器绑定到 cpu0 可能出现性能瓶颈。
    everfly
        5
    everfly  
       239 天前
    是不是 udp.send_to 导致的延时增加?可以考虑改成 unbound_channel 异步发送看看。
    EricJia
        6
    EricJia  
    OP
       239 天前
    @everfly https://stackoverflow.com/questions/6866611/linux-multicast-sendto-performance-degrades-with-local-listeners 问题跟这个很像,listeners 增加后, 延迟以每个 2us 成倍增加。 已经设置 no_blocking, 跟 channel 没关系
    EricJia
        7
    EricJia  
    OP
       239 天前
    @rainmote https://stackoverflow.com/questions/6866611/linux-multicast-sendto-performance-degrades-with-local-listeners 问题跟这个很像,listeners 增加后, 延迟以每个 listener 2us 线性增加。 似乎跟 cpu 没关系?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3011 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 21ms · UTC 14:09 · PVG 22:09 · LAX 06:09 · JFK 09:09
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.