webflux的delay原理详解

反应式编程一开始是从前端和客户端开始兴起,现在大有蔓延到后端的趋势,Spring5推出的webflux就是反应式编程的产物。

webflux对比于springMVC,性能高出很多,网上已经有很多的测评,不再在过多说明。

左图同步,右图异步

上图看出对比于同步,异步所用的线程是比较少的,不过有个前提是,程序逻辑中有阻塞(如io阻塞等),且这种阻塞是可以异步化的。

为了满足这个前提,反应式编程框架就必须将这些阻塞变成异步化,如新出的WebClient工具就是将http请求io异步化。

delay方法就是用来代替sleep方法的,下面来讲解一下delay方法是怎么将延时异步化的。

源码解读

  • 通过查看Mono<Long> delay(Duration duration)方法源码,它会构造一个MonoDelay类,并通过传入全局公用的调度器Schedulers.parallel()来调度里面的异步任务。
1
2
3
4
5
6
7
public static Mono<Long> delay(Duration duration) {
return delay(duration, Schedulers.parallel());
}

public static Mono<Long> delay(Duration duration, Scheduler timer) {
return onAssembly(new MonoDelay(duration.toMillis(), TimeUnit.MILLISECONDS, timer));
}
  • 查看MonoDelay类的订阅方法subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void subscribe(CoreSubscriber<? super Long> actual) {
MonoDelayRunnable r = new MonoDelayRunnable(actual);

actual.onSubscribe(r);

try {
//重点在于下面的 timedScheduler.schedule(r, delay, unit)
//通过timedScheduler来调度延时任务,而不是当前线程阻塞等待
r.setCancel(timedScheduler.schedule(r, delay, unit));
}
catch (RejectedExecutionException ree) {
if(r.cancel != OperatorDisposables.DISPOSED) {
actual.onError(Operators.onRejectedExecution(ree, r, null, null,
actual.currentContext()));
}
}
}
  • 查看ParallelScheduler的delay方法:
1
2
3
4
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
//pick方法会获取一个ScheduledExecutorService线程执行器给到Schedulers使用
return Schedulers.directSchedule(pick(), task, delay, unit);
}
  • 查看directSchedule方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static Disposable directSchedule(ScheduledExecutorService exec,
Runnable task,
long delay,
TimeUnit unit) {
//包装任务
SchedulerTask sr = new SchedulerTask(task);
Future<?> f;
if (delay <= 0L) {
f = exec.submit((Callable<?>) sr);
}
else {
//延时调度
//ScheduledExecutorService是java自带的并发调度接口,
//通过一条线程轮询延时队列来避免所有线程阻塞
f = exec.schedule((Callable<?>) sr, delay, unit);
}
//设置结果
sr.setFuture(f);

return sr;
}

自此就可以知道为什么delay方法没有阻塞线程,因为它的延时处理都交给了ScheduledExecutorService执行器处理,调用delay方法的主线程就直接返回了,等到延时时间过后,ScheduledExecutorService就会从线程池就获取一个线程来处理延时后的任务逻辑。整个流程就类似于上面图片中的右图。

通过反应式编程范式,将所有阻塞都修改为类似于delay之于sleep的形式,就能大幅度提升服务性能了。

  • 本文作者:二当家的
  • 本文链接: 2019/07/28/webflux的delay原理详解/
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!
  • 彩蛋: 左边Overview微信公众号二维码,扫描它获取更多技术信息