V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Joker123456789
V2EX  ›  Java

分享一个 Java 开发的并发编程工具包

  •  
  •   Joker123456789 · 50 天前 · 1483 次点击
    这是一个创建于 50 天前的主题,其中的信息可能已经有所发展或是发生改变。

    Magician-Concurrent

    Magician-Concurrent 是一个并发编程工具包,当你需要并发执行某些代码的时候,不需要自己创建和管理线程,除此之外里面还提供了生产者与消费者模型

    初始化配置

    导入依赖

    <dependency>
        <groupId>com.github.yuyenews</groupId>
        <artifactId>Magician-Concurrent</artifactId>
        <version>1.0.0</version>
    </dependency>
    

    并发处理任务

    MagicianConcurrent.getConcurrentTaskSync()
                    .setTimeout(1000) // 超时时间
                    .setTimeUnit(TimeUnit.MILLISECONDS) // 超时时间的单位
                    .add(() -> { // 添加一个任务
    
                        // 在这里可以写上任务的业务逻辑
    
                    }, (result, e) -> {
                        // 此任务处理后的回调
                        if(result.equals(ConcurrentTaskResultEnum.FAIL)){
                            // 任务失败,此时 e 里面有详细的异常信息
                        } else if(result.equals(ConcurrentTaskResultEnum.SUCCESS)) {
                            // 任务成功,此时 e 是空的
                        }
                    })
                    .add(() -> { // 添加一个任务
    
                        // 在这里可以写上任务的业务逻辑
    
                    }, (result, e) -> {
                        // 此任务处理后的回调
                        if(result.equals(ConcurrentTaskResultEnum.FAIL)){
                            // 任务失败,此时 e 里面有详细的异常信息
                        } else if(result.equals(ConcurrentTaskResultEnum.SUCCESS)) {
                            // 任务成功,此时 e 是空的
                        }
                    })
                    .start();
    

    添加进去的任务会并发执行,但是在它们执行完之前,这整个代码块会同步等待在这,一直等到所有任务执行完或者超时才会继续往下走。

    这里面的超时时间就是用来设置同步等待多久的。

    • 如果设置为 0 表示一直等到所有任务完成为止
    • 设置为大于 0 的时候,表示只等待这么久

    并发处理 List ,Set 等所有 Collection 类的集合里的元素

    同步执行

    // 假如有一个 List 需要并发处理里面的元素
    List<String> dataList = new ArrayList<>();
    

    每个元素并发执行

    // 只需要将他传入 syncRunner 方法即可
    MagicianConcurrent.getConcurrentCollectionSync()
            .syncRunner(dataList, data -> {
    
                // 这里可以拿到 List 里的元素,进行处理
                // List 里的元素是什么类型,这个 data 就是什么类型
                System.out.println(data);
            
            }, 
            10, // 每组多少条元素
            1, // 每组之间同步等待多久
            TimeUnit.MINUTES // 等待的时间单位
            );
    

    这个方法会将传进去的集合分成若干组,每组的大小由参数指定。

    这些组会排队执行,但是每一组在执行的时候都是并发的,里面的每一个元素都会由单独的线程去处理。

    需要等一组处理完了,才会处理下一组,但是有时候我们不想这么死板的等待,所以可以设置一个超时时间,超过了这个期限就不等了,直接进行下一组,所以这里的最后两个参数就是用来设置这个期限的。

    每一组并发执行

    // 也可以用 syncGroupRunner 方法
    MagicianConcurrent.getConcurrentCollectionSync()
            .syncGroupRunner(dataList, data -> {
    
                // 这里可以拿到每一组的元素,进行处理
                // 这个 data 就是每一组 List ,可以自己迭代处理
                System.out.println(data);
            
            }, 
            10, // 每组多少条元素
            1, // 每组之间同步等待多久
            TimeUnit.MINUTES // 等待的时间单位
            );
    

    这个方法会将传进去的集合分成若干组,每组的大小由参数指定。

    每一组由单独的线程处理。

    会一直同步等待在这里,直到所有组都处理完了才会进行下一步,但是有时候我们不想这么死板的等待,所以可以设置一个超时时间,超过了这个期限就不等了,直接执行下一步。所以这里的最后两个参数就是用来设置这个期限的。

    异步执行

    其实就是将上面 [同步处理] 的代码放到了一个线程里,内部处理依然是上面 [同步处理] 的逻辑,但是这整个代码块将会异步执行,不需要等在这。所以个别相同的参数就不再重复解释了。

    // 假如有一个 List 需要并发处理里面的元素
    List<String> dataList = new ArrayList<>();
    

    每个元素并发执行

    // 只需要将他传入 asyncRunner 方法即可
    MagicianConcurrent.ConcurrentCollectionAsync(
                    1, // 核心线程数
                    1, // 最大线程数
                    1, // 线程空闲时间
                    TimeUnit.MINUTES // 空闲时间单位
                    .asyncRunner(dataList, data -> {
    
                // 这里可以拿到 List 里的元素,进行处理
                System.out.println(data);
        
            }, 
            10, // 每组多少条元素
            1, // 每组之间同步等待多久
            TimeUnit.MINUTES // 等待的时间单位
            );
    

    ConcurrentCollectionAsync 里的参数其实就是线程池的参数,除了上面这种写法,还可以这样写。

    每调用一次 asyncRunner 都会占用一个线程,而这些线程都是由一个线程池在管理。

    ConcurrentCollectionAsync concurrentCollectionAsync = MagicianConcurrent.ConcurrentCollectionAsync(
                    1, // 核心线程数
                    1, // 最大线程数
                    1, // 线程空闲时间
                    TimeUnit.MINUTES // 空闲时间单位
                    );
    
    concurrentCollectionAsync.asyncRunner(dataList, data -> {
    
                // 这里可以拿到 List 里的元素,进行处理
                System.out.println(data);
        
            }, 
            10, // 每组多少条元素
            1, // 每组之间同步等待多久
            TimeUnit.MINUTES // 等待的时间单位
            );
    
    concurrentCollectionAsync.asyncRunner(dataList2, data -> {
    
                // 这里可以拿到 List 里的元素,进行处理
                System.out.println(data);
        
            }, 
            10, // 每组多少条元素
            1, // 每组之间同步等待多久
            TimeUnit.MINUTES // 等待的时间单位
            );
    
    concurrentCollectionAsync.asyncRunner(dataList3, data -> {
    
                // 这里可以拿到 List 里的元素,进行处理
                System.out.println(data);
        
            }, 
            10, // 每组多少条元素
            1, // 每组之间同步等待多久
            TimeUnit.MINUTES // 等待的时间单位
            );
    

    用这个方法可以管理线程池

    // 关闭线程池
    concurrentCollectionAsync.shutdown();
    
    // 立刻关闭线程池
    concurrentCollectionAsync.shutdownNow();
    
    // 获取线程池
    ThreadPoolExecutor threadPoolExecutor = concurrentCollectionAsync.getPoolExecutor();
    

    每一组并发执行

    // 也可以用 asyncGroupRunner 方法,每个参数的具体含义可以参考文档
    MagicianConcurrent.ConcurrentCollectionAsync(
                    1, // 核心线程数
                    1, // 最大线程数
                    1, // 线程空闲时间
                    TimeUnit.MINUTES // 空闲时间单位
                    .asyncGroupRunner(dataList, data -> {
            
                // 这里可以拿到 List 里的元素,进行处理
                System.out.println(data);
            
            }, 
            10, // 每组多少条元素
            1, // 每组之间同步等待多久
            TimeUnit.MINUTES // 等待的时间单位
    

    同上

    并发处理所有 Map 类的集合里的元素

    Map 的逻辑跟 Collection 一模一样,只不过是传入的集合变成了 Map ,就不再累述了,感谢理解。

    同步执行

    每个元素并发执行

    // 假如有一个 Map 需要并发处理里面的元素
    Map<String, Object> dataMap = new HashMap<>();
    
    // 只需要将他传入 syncRunner 方法即可
    MagicianConcurrent.getConcurrentMapSync()
            .syncRunner(dataMap, (key, value) -> {
    
                // 这里可以拿到 Map 里的元素,进行处理
                System.out.println(key);
                System.out.println(value);
            
            }, 10, 1, TimeUnit.MINUTES);
    

    每一组并发执行

    // 也可以用 syncGroupRunner 方法
    MagicianConcurrent.getConcurrentMapSync()
            .syncGroupRunner(dataMap, data -> {
    
                // 这里可以拿到每一组 Map 进行处理
                System.out.println(data);
            
            }, 10, 1, TimeUnit.MINUTES);
    

    异步执行

    每个元素并发执行

    // 假如有一个 Map 需要并发处理里面的元素
    Map<String, Object> dataMap = new HashMap<>();
    
    // 只需要将他传入 asyncRunner 方法即可
    MagicianConcurrent.getConcurrentMapAsync(
                    1,
                    1,
                    1,
                    TimeUnit.MINUTES
                    ).asyncRunner(dataMap, (key, value) -> {
    
                // 这里可以拿到 Map 里的元素,进行处理
                System.out.println(key);
                System.out.println(value);
        
            }, 10, 1, TimeUnit.MINUTES);
    

    每一组并发执行

    // 也可以用 asyncGroupRunner 方法
    MagicianConcurrent.getConcurrentMapAsync(
                    1,
                    1,
                    1,
                    TimeUnit.MINUTES
                    ).asyncGroupRunner(dataMap, data -> {
            
                // 这里可以拿到每一组 Map 进行处理
                System.out.println(data);
            
            }, 10, 1, TimeUnit.MINUTES);
    

    生产者与消费者

    这是一个多对多的模型,多个生产者可以给多个消费者推送不同类型的数据,

    // 创建一组生产者与消费者,而这样组可以创建无限个
    // 每一组的生产者都只会把数据推送给同一组的消费者
    MagicianConcurrent.getProducerAndConsumerManager()
                    .addProducer(new MagicianProducer() { // 添加一个生产者(可以添加多个)
    
                        /**
                         * 设置 ID ,必须全局唯一,默认是当前类的全名
                         * 如果采用默认值,可以不重写这个方法
                         * @return
                         */
                        @Override
                        public String getId() {
                            return super.getId();
                        }
    
                        /**
                         * 设置 producer 方法是否重复执行,默认重复
                         * 如果采用默认值,可以不重写这个方法
                         * @return
                         */
                        @Override
                        public boolean getLoop() {
                            return super.getLoop();
                        }
    
                        /**
                         * 设置 是否等消费者全部空闲了才继续生产下一轮数据,默认 false
                         * 如果采用默认值,可以不重写这个方法
                         * @return
                         */
                        @Override
                        public boolean getAllFree() {
                            return super.getAllFree();
                        }
    
                        /**
                         * 当生产者启动后,会自动执行这个方法,我们可以在这个方法里生产数据,并通过 publish 方法发布给消费者
                         *
                         * 这边举一个例子
                         * 假如我们需要不断地扫描某张表,根据里面的数据状态去执行一些业务逻辑
                         * 那么我们可以在这个方法里写一个查询的逻辑,然后将查询到数据发送给消费者
                         */
                        @Override
                        public void producer() {
                            // 根据上面的例子,我们可以查询这张表里符合条件的数据
                            List<Object> dataList = selectList();
    
                            // 然后将他推送给消费者
                            // 可以推送任意类型的数据
                            this.publish(dataList);
    
                            /*
                             * 如果你只需要执行一次,那么到此就结束了,这个生产者也可以被回收掉了
                             *
                             * 但是如果你需要不断地执行上述操作,来维护这张表里的数据,这个时候你有两种做法
                             * 第一种:加一个 while 循环
                             *      但是这种方式有个问题,如果消费者的消费速度跟不上,那么就很容易造成消费者队列积压,出现内存问题。
                             *      而数据积压太久又会影响时效性,可能你推送给消费者的时候,这条数据需要处理,但是等到被消费的时候又不需要处理了,这样容易出现数据错乱的问题。
                             *
                             * 第二种:等消费者把你推给他的数据消费完了,再推送下一轮,而我们就是采用的这种
                             *      如果你想用这种方式,那么你不需要再写其他的任何逻辑,只需要将上面提到的 getLoop 方法重写一下,并返回 true 即可
                             *      当你设置为 true 以后,生产者在推送完一轮后会不断地监视消费者,当发现了空闲的消费者才会推送下一轮数据,并且数据只会推送给这个空闲的消费者
                             *
                             * 如果你想等所有消费者都空闲了以后再推送下一轮,而不是发现一个空闲的就推送一轮
                             * 那么你可以重写上面提到的 getAllFree 方法,返回 true 即可
                             */
    
                        }
                    })
                    .addConsumer(new MagicianConsumer() { // 添加一个消费者,可以添加多个
    
                        /**
                         * 设置 ID ,必须全局唯一,默认是当前类的全名
                         * 如果采用默认值,可以不重写这个方法
                         * @return
                         */
                        @Override
                        public String getId() {
                            return super.getId();
                        }
    
                        /**
                         * 心跳通知,消费者每消费一个任务,都会触发一下这个方法
                         * 我们可以根据他触发的频率来判断这个消费者的活跃度
                         *
                         * 注意!!!
                         * 这个方法里不可以有耗时的操作,不然会将消费者阻塞的
                         * 如果一定要加耗时的操作,那么务必在新线程里搞
                         * @param id
                         */
                        @Override
                        public void pulse(String id) {
                            new Thread(()->{
                                // 如果你需要在这个方法里搞一些耗时的操作,那么务必要像这样开启一个新线程
                                // 不然消费者会被阻塞的
                            }).start();
                        }
    
                        /**
                         * 消费频率限制,默认 10 毫秒,取值范围:0 - long 的最大值,单位:毫秒
                         *
                         * 如果任务执行的耗时小于 execFrequencyLimit ,则等待 execFrequencyLimit 毫秒后再消费下一个任务
                         *
                         * 首先这是一个生产者和消费者多对多的模型结构,我们以一个生产者对多个消费者来举例
                         * 生产者生产的数据只有一份,但是他会推送给多个消费者
                         * 而我们之所以要配置多个消费者,是因为需要他们执行不同的业务逻辑
                         * 多个消费者执行的业务逻辑不同,也就意味着他们需要的数据大概率会不同
                         *
                         * 比如消费者 A 需要处理男性的数据,消费者 B 需要处理女性的数据
                         * 如果生产者刚好连续推送了几批男性的数据,那么这会导致消费者 B 筛选不到女性数据,那么他就不会处理业务逻辑了
                         * 这么一来,消费者 B 就会无限接近空转,而空转会引起 CPU 占用率过大,所以必须加以限制
                         *
                         * 千万不要小看这个问题,本人曾经在实战中亲测过,做不做这个限制,CPU 的占有率会达到 10 倍的差距
                         * 当然了,这跟消费者的业务逻辑还是有一定关系的,具体情况具体看待
                         * 如果你的消费者几乎不会出现空转,那么这里可以设置为 0
                         *
                         */
                        @Override
                        public long getExecFrequencyLimit() {
                            return super.getExecFrequencyLimit();
                        }
    
                        /**
                         * 这个方法会接收到生产者推送过来的数据
                         * 在里面执行相应的业务逻辑即可
                         * @param data
                         */
                        @Override
                        public void doRunner(Object data) {
                            // data 可以是任何类型
                            // 因为能给他推送数据的消费者是固定的,所以 data 有可能收到的类型也是固定的
                            // 所以我们可以在这里自己判断,然后转化即可
                            // 为什么不用泛型?这是为了兼容多个生产者,因为他们推送的数据类型可能会不同
                        }
                    })
                    .start();
        }
    

    项目官网:https://magician-io.com

    1 条回复    2024-10-03 10:40:49 +08:00
    xmtpw
        1
    xmtpw  
       49 天前 via iPhone
    收藏备用,谢谢分享
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3440 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 10:46 · PVG 18:46 · LAX 02:46 · JFK 05:46
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.