JDK9响应式流使用详解( 二 )


当发布者开始发送数据后,会异步的调用onNext方法并将数据传入 。该类中使用了一个计数器对数据数量进行校验,当达到最大值的时候,则会通过令牌(subscription)异步通知发布者订阅结束,然后发送者再异步的调用发订阅者的onComplete方法,以处理完成流程 。
其中的onError和onComplete方法只进行打印,这里就不再说了 。
以上的这个订阅者可以看作是一个push模型的实现,因为当开始订阅时订阅者就约定了需要接受的数量,然后在后续的处理(onNext)中不再请求新数据 。
我们可以用以下的代码创建一个名称为S1,消费2个元素的订阅者:
SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);订阅令牌交互当我们可以创建了发送者和订阅者之后,我们需要确认一下进行交互的顺序,由于响应流的处理就是对于事件的处理,所以事件的顺序十分重要,具体顺序如下:

  1. 我们创建一个发布者publisher一个订阅者subscriber
  2. 订阅者subscriber通过调用发布者的subscribe()方法进行信息订阅 。如果订阅成功,则发布者将生成一个令牌(Subscription)并作为入参调用订阅者的订阅事件方法onSubscribe() 。如果调用异常则会直接调用订阅者的onError错误处理方法,并抛出IllegalStateException异常然后结束订阅 。
  3. 在onSubscribe()中,订阅者需要通过调用令牌(Subscription)的请求方法request(long)来异步的向发布者请求数据 。
  4. 当发布者有数据可以发布的时候,则会异步的调用订阅者的onNext()方法,直到所有消息的总数已经满足了订阅者调用request的数据请求上限 。所以当订阅者请求订阅的消息数为Long.MAX_VALUE时,实际上是消费所有数据,即push模式 。如果发布者没有数据要发布了,则可以会调用发布者自己的close()方法并异步的调用所有订阅者的onComplete()方法来通知订阅结束 。
  5. 发布者可以随时向发布者请求更多的元素请求(一般在onNext里),而不用等到之前的处理完毕,一般是与之前的数据数量进行累加 。
  6. 放发布者遇到异常的时候会调用订阅者的onError()方法 。
上面的描述中是只使用的一个订阅者来进行描述的,后面的例子中将说明发布者可以拥有多个订阅者(甚至0个订阅者) 。
发送信息当发布者需要推送消息的时候会调用submit方法或者offer方法,上文中我们提到submit实际上是offer的一种简单实现,本节咱们自己比较一下 。
首先他们的方法签名为:
int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)int submit(T item)而submit 和 offer的直接方法为:
public int submit(T item) {return doOffer(item, Long.MAX_VALUE, null);}public int offer(T item,BiPredicate<Subscriber<? super T>, ? super T> onDrop) {return doOffer(item, 0L, onDrop);可以看到他们的底层调用的都是 doOffer 方法,而doOffer的方法签名为:
private int doOffer(T item, long nanos,BiPredicate<Subscriber<? super T>, ? super T> onDrop)所以我们可以直接看doOffer()方法 。doOffer()方法是可选阻塞时长的,而时长根据入参数nanos来决定 。而onDrop()是一个删除判断器,如果调用BiPredicate的test()方法结果为true则会再次重试(根据令牌中的nextRetry属性与发布器中的retryOffer()方法组合判断,但是具体实现还没梳理明白);如果结果为flase则直接删除内容 。doOffer()返回的结果为正负两种,正数的结果为发送了数据,但是订阅者还未消费的数据(估计值,因为是异步多线程的);如果为负数,则返回的是重拾次数 。
所以,根据submit()的参数我们可以发现,submit会一直阻塞直到数据可以被消费(因为不会阻塞超时,所以不需要传入onDrop()方法) 。而我们可以根据需要配置offer()选择器 。如果必须要求数据都要被消费的话,那就可以直接选择submit(),如果要设置重试次数的话就可以选择使用offer()
异步调用的例子下面看一个具体的程序例子,程序将以3秒为周期进行数据发布:
public class PeriodicPublisher {public static final int WAIT_TIME = 2;public static final int SLEEP_TIME = 3;public static void main(String[] args) {SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();// 创建4订阅者SimpleSubscriber subscriber1 = new SimpleSubscriber("S1", 2);SimpleSubscriber subscriber2 = new SimpleSubscriber("S2", 4);SimpleSubscriber subscriber3 = new SimpleSubscriber("S3", 6);SimpleSubscriber subscriber4 = new SimpleSubscriber("S4", 10);// 前三个订阅者直接进行订阅publisher.subscribe(subscriber1);publisher.subscribe(subscriber2);publisher.subscribe(subscriber3);// 第四个方法延迟订阅delaySubscribeWithWaitTime(publisher, subscriber4);// 开始发送消息Thread pubThread = publish(publisher, 5);try {// 等待处理完成pubThread.join();} catch (InterruptedException e) {e.printStackTrace();}}public static Thread publish(SubmissionPublisher<Integer> publisher, int count) {Thread t = new Thread(() -> {IntStream.range(1,count).forEach(item ->{publisher.submit(item);sleep(item);});publisher.close();});t.start();return t;}private static void sleep(Integer item) {try {System.out.printf("推送数据:%d 。休眠 3 秒 。%n", item);TimeUnit.SECONDS.sleep(SLEEP_TIME);} catch (InterruptedException e) {e.printStackTrace();}}private static void delaySubscribeWithWaitTime(SubmissionPublisher<Integer> publisher, Flow.Subscriber<Integer> sub) {new Thread(() -> {try {TimeUnit.SECONDS.sleep(WAIT_TIME);publisher.subscribe(sub);} catch (InterruptedException e) {e.printStackTrace();}}).start();}}


推荐阅读