创建 flux 代码如下,其中的 long consumer 可能会被下游多次调用。
Flux.create(new Consumer<FluxSink<Object>>() {
@Override
public void accept(FluxSink<Object> fluxSink) {
fluxSink.onRequest(new LongConsumer() {
@Override
public void accept(long value) {
log.info("我被多次调用了 request:" + value);
for (long i = 0; i < value; i++) {
fluxSink.next("request:" + i);
}
}
});
}
})
也就是说,我们不能决定下游调用的时机,调用的次数,调用的所在线程。这样就很容易产生 bug 。
FluxArray 解决此问题的办法是使用 Operators.addCap(REQUESTED, this, n) == 0
判断,
只有返回为 0 时,才进行处理,否则将请求的 n 叠加到 request 后就 return 。
public void request(long n) {
if (Operators.validate(n)) {
if (Operators.addCap(REQUESTED, this, n) == 0) {
if (n == Long.MAX_VALUE) {
fastPath();
}
else {
slowPath(n);
}
}
}
}
我们自己写 Flux.create() 时也可以借鉴 FluxArray 的处理办法,但是这样就变得麻烦了。 不知道有什么现有封装好的实现没有??