这个接口的处理类似这样
List<Data> dataList = xxx.paralleStream().map(e->dealData(e)).collect(Collector.toList());
其中有的 dealData 会用 http 调用外部接口。我在每个 dealData 都打印了时间,基本都在 50ms 左右徘徊。按理说这个方法的处理时间应该是耗时最长的那个,但是最终结果却差的很远。这个接口除了并发流基本就没什么操作了。服务器间的时延可以忽略不计。好神奇
public static void test1() throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
for( int i=0;i<1000;i++) {
int index = i;
long time = System.currentTimeMillis();
List<String> list = forkJoinPool.submit(() -> Stream.of(ary).parallel().map(s -> getString(index)).collect(Collectors.toList())).get();
System.out.println("cost time :" + (System.currentTimeMillis() - time));
}
}
public static void test2(){
for( int i=0;i<1000;i++) {
int index = i;
long time = System.currentTimeMillis();
List<String> list = Stream.of(ary).parallel().map(s -> getString(index)).collect(Collectors.toList());
System.out.println("cost time :" + (System.currentTimeMillis() - time));
}
}
public static String getString(int index) {
int i = new Random().nextInt(50);
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" +index + "cost time " + i);
return "";
}
试着跑了一下,test1自定义线程池之后,会一直用到池里面的线程。 test2用的共享池,竟然还会用主线程main在跑。所以时间差距是不是在这块。
public static void test1() throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(30);
long time = System.currentTimeMillis();
List<String> list = forkJoinPool.submit(() -> Stream.of(ary).parallel().map(s -> getString()).collect(Collectors.toList())).get();
System.out.println("cost time :" + (System.currentTimeMillis() - time));
}
public static String getString() {
int i = new Random().nextInt(50);
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}
我用jmeter20并发压测发现,这个性能也并不是很好。要比50ms多两倍多
1
Jooooooooo 2021-10-28 22:21:39 +08:00
相差很远啥意思, 怎么统计的耗时?
|
2
aureole999 2021-10-28 22:23:18 +08:00
这个只是告诉 java 你这个流可以并行执行,不代表一定要并行执行,更不代表所有的子项全都同时并行,或者说根本不会所有子项都同时执行。具体好像是通过 ForkJoinPool 线程池取得线程执行,默认线程数不超过你的 cpu 的个数。你在每个 dealData 打出当前线程看就知道了。
|
3
luxinfl OP |
4
Jooooooooo 2021-10-28 23:29:25 +08:00
@luxinfl 很有可能是 2 楼说的问题
|
5
546L5LiK6ZOt 2021-10-28 23:41:49 +08:00 via iPhone
我也遇到过 2 楼说的问题,一般都是自定义一个 forkjoinpool
|
6
dqzcwxb 2021-10-29 00:09:50 +08:00
Fork/Join 的策略就是会用主线程跑看源码就知道了
|
7
luxinfl OP @dqzcwxb 这个以前用得少,没注意过。。而且我发现用 jmeter 压测,ForkJoinPool 的性能并不是太好的样子。。和线程数有很大关系
|
8
dqzcwxb 2021-10-29 00:46:34 +08:00 1
@luxinfl #7 parallelStream 默认是 cpu 核心数的线程数,而且只推荐 cpu 密集型运算时使用
io 密集型请使用 Completablefuture+Fork/Join 线程池 ForkJoinPool 因为有工作窃取机制性能比其他线程池要高得多,多测试即可知道 |
9
dqzcwxb 2021-10-29 00:46:54 +08:00
|
10
luxinfl OP @dqzcwxb 主要是有多个 io 调用,而且 parallel 并发调用写起来方便。。没太搞明白 CompletableFuture 怎么写才能并发调用。要写多个 supplyAsync 调用麽
|
11
ZeroDu 2021-10-29 10:11:58 +08:00
确实如楼上所说的,forkjoinpool 线程数默认是 cpu 核心数,而且这个只适合 CPU 密集型任务。贴主这种情况 io 密集型得用 CompletableFuture 但是这个写起来当然么有 parallelStream 丝滑
|