Project Reactor:OptimizableOperator 原理

前言

通常来说在响应式编程中 Publisher 的创建到真正的订阅者中间会经过许多的响应式操作符,而大部分的操作符其实都是 OptimizableOperator 的实现。

随便举几个例子,例如:map,flatMap,filter,doOnNext 等等。基本上所有对上游数据做处理的函数都实现了 OptimizableOperator

准备工作

你需要对 Reactor 或者响应式编程有一定了解

推荐阅读:
「响应式编程入门之 Project Reactor」

OptimizableOperator

OptimizableOperator

首先 OptimizableOperator 继承了 CorePublisher,这没什么可说的,因为不管是使用 Mono 还是 Flux 执行完任何操作之后返回的依旧是一个 Publisher

下面我们来看下 OptimizableOperator 的核心方法,先来简单说下,后边会结合源码来详细理解

OptimizableOperator.subscribeOrReturn

该方法有两种方式实现

  1. 返回一个 CoreSubscriber,返回的订阅者包装了下游的实际订阅者

  1. 返回 null
    当选择这种实现方式时,当前 OptimizableOperator 自行消费真正的 Publisher。将自己作为下游 Subscriber 的发布者

这两种实现方式有什么区别呢?第一种方式仅仅是作为一个消费者订阅。而第二种方式,Operator 相当于翻身农奴把歌唱了,自己当上了 Publisher,能做的事情比一种方式更多

OptimizableOperator.source

返回当前 OptimizableOperator 的上游(上游可能还是一个 OptimizableOperator)

OptimizableOperator.nextOptimizableSource

返回链中的下一个 OptimizableOperator(如果上游是 OptimizableOperator 则返回,否则返回 null)

InternalMonoOperator

我们来通过 InternalMonoOperator 来理解一下上述的几个方法

InternalMonoOperator

InternalMonoOperator 构造方法

InternalMonoOperator 是所有 Mono 操作符的父类,在执行操作符动作之前都会先构建操作符对象,最终会调用到 InternalMonoOperator 构造方法。

  1. 入参 source 为当前操作符的上游,
  2. super(source) 内部实现是将上游赋值给成员变量 source
  3. 如果上游同样也是操作符,还会将其赋值给 optimizableOperator

subscribe

当最后一个操作符被订阅时会执行如下逻辑。看似很难读,其实和上面我们讲的是一样的,下面一起来分析下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void subscribe(CoreSubscriber<? super O> subscriber) {
// 将当前对象赋值给 operator
OptimizableOperator operator = this;
try {
while (true) {
// 调用操作符的 subscribeOrReturn
// 1. 返回 != null,我们认为 operator 已经对实际订阅者做了包装。所以继续调用下一个操作符(也就是当前操作符的上游)
// 2. 如果返回 == null,则代表 operator 要自己处理上游的消费和下游的订阅,方法结束
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
// null means "I will subscribe myself", returning...
return;
}
// 返回当前 operator 的下一个操作符(也就是返回的上游的操作符)
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
// 如果所有 operator 都执行完了,那么可以直接向 Publisher 发起订阅了
// 订阅结束后,退出当前方法
operator.source().subscribe(subscriber);
return;
}
// 存在下一个操作符,继续执行该逻辑
operator = newSource;
}
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}

在理解了 subscribe 的逻辑之后,我们在以后在阅读 Reactor 操作符的源码时,就可以清楚地知道只需要关注该操作符的 subscribeOrReturn 方法即可。

如果 subscribeOrReturn 仅是返回一个 subscriber,那么我们只需要关注其 Subscriber 的相关逻辑即可。如果返回的 null,则代表该操作符要自行消费上游然后向下游传递订阅,我们需要关注他的 Subscription 相关

最后

如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助