本文目标
- 理解响应式编程
前言
之前的 《聊聊 IO 多路复用》 中,我们理解了非阻塞 IO 的意义。但是 Spring MVC 并不能完美的应用非阻塞编程,于是 Spring 团队开发了 WebFlux,而 WebFlux 的基础正是本文要讲到的 Project Reactor(下文简称为 Reactor)
本文以 Reactor 为例带大家入门响应式编程
版本
1 | <dependency> |
什么是 Reactor
Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。
在 Reactor 的基础上还演化出了适合微服务架构的 Reactor Netty 。为 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背压和响应式的网络引擎。
上面是对于官方文档的翻译。下面来说说我自己对 Reactor 和响应式编程的理解。
回想一下之前的非阻塞 IO 编程,例如我们现在要用非阻塞的方式调用一个远程服务,当远程接口数据可用时去做一些业务处理。这时候代码怎么写呢?我们需要提供一个回调函数,然后在响应就绪的时候,去调用我们的回调函数。
从逻辑上来看,这完全没有问题。但是如果我们的回调很复杂,代码看起来会是什么样呢?
1 | // 以下案例来自 Reactor 官网 |
这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?
1 | // 以下案例来自 Reactor 官网 |
可以看到,代码变得非常的简洁。唯一带来的困扰就是,我们不知道这些函数到底是啥意思 😂
响应式编程虽然有非常多的特性,但是它并不是什么神奇的技术,它也是建立在传统命令式编程的基础上。只不过它所提供的 API 以及规范更适合在非阻塞 IO 中使用。虽然在非阻塞 IO 框架中几乎只使用响应式编程(Vertx,WebFlux),只是因为这样做更合适,并不是说没了响应式编程,就玩不了非阻塞 IO 了。
响应式编程内幕
Reactor 实现了 org.reactivestreams
提供的 Java 响应式编程规范,我们只要了解 reactivestreams 中代码是如何运转的,再看 Reactor 相关的代码就容易多了。
下图展示了 reactivestreams 中的核心接口
Publisher:发布者
Subscriber:订阅者
Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介
Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)
在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的
在 Reactor 中大部分实现都是按照上图的逻辑来执行的
- 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
- Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
- Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
- Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
- 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError
调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布
其实 Reactor 中 API 实现原理也都是这个套路,我这边也自己写了个例子便于让读者加深对响应式编程的理解
1 | import org.reactivestreams.Publisher; |
响应式编程思想
响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber。
这条流水线还有一个特点。大部分情况下当 Publisher 的 subscribe 方法被调用之前,什么都不会发生。在被订阅之前我们只是在定义流水线该如何工作,直到真正有人需要的时候,流水线才会启动。
Reactor 中的 Operator
Operators 怎么理解呢?对于上游来说,Operators 像一个订阅者,而对于它的下游来说,它像一个发布者(我们上文说过了 Reactor 中的中间操作并没有实现 Processor 接口)
1 | Mono.just("hello") |
举个简单的例子,在上面的代码中,map 就是一个 Operator,它的实现思路是什么?来看下面的代码
1 | // 注意,这是我基于 Reactor API 实现的伪代码! |
上述代码是我自己实现的一个伪代码,用于让大家理解操作符的实现思路,实际 Reactor 代码也是这个思路,只不过实现的更加巧妙和严谨
我们首先来分析一下 Mono.just("hello").map(a -> a + "world")
这句话
- 当执行到 Mono.just 时,会新建一个 MonoJust 对象作为当前的 Publisher。该发布者的逻辑是,当订阅时,向下游发送数据 “hello”
当执行到 map 方法时,会新建一个 MonoMap 对象替作为当前的 Publisher,MonoJust 成为了 MonoMap 中的一个属性 source(实际的上游)
当 MonoMap 被订阅时,会先将它的下游 actual 做一层包装,也就是我们上面的 MonoMapSubscriber。然后去调用 source 的 subscribe 方法。上游发布数据时,MonoMapSubscriber 先对数据进行转换(我们上面的拼接字符串操作),然后再发送给 actual(它的下游)
当 MonoMap 被再次转换时,MonoMap 就变成了下游操作符的 source…
最后通过一张图来总结一下
Reactor 该如何学习
本文并没有介绍太多 Reactor 的细节,因为这些东西实在是太多了。我想聊聊我自己是如何学习 Reactor 的
如果你已经通过本文理解了响应式编程的核心接口是如何工作的了,那恭喜你已经迈向了成功的第一步了。接下来就是阅读官方文档,不断的练习和阅读 Reactor 的源码。源码追踪的方向已经很明确了,当我们想了解一个发布者的实现原理是什么,我就要去关注这个发布者的 subscribe 方法和 Subscription 都做了什么。想了解消费者的逻辑,就看它的 onNext,onComplete,onError。
最后
如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助