public class ReactiveSubscription<O> extends PushSubscription<O>
A Reactive Subscription using a pattern called "reactive-pull" to dynamically adapt to the downstream subscriber
capacity:
- If no capacity (no previous request or capacity drained), queue data into the buffer CompletableQueue
- If capacity (previous request and capacity remaining), call subscriber onNext
Queued data will be polled when the next request(n) signal is received. If there is remaining requested volume, it will be added to the current capacity and therefore will let the next signals to be directly pushed. Each next signal will decrement the capacity by 1.
Modifier and Type | Field and Description |
---|---|
protected CompletableQueue<O> |
buffer |
protected long |
currentNextSignals |
protected boolean |
draining |
protected long |
maxCapacity |
PENDING_UPDATER, pendingRequestSignals, publisher, subscriber, TERMINAL_UPDATER, terminated
Constructor and Description |
---|
ReactiveSubscription(Stream<O> publisher,
org.reactivestreams.Subscriber<? super O> subscriber) |
ReactiveSubscription(Stream<O> publisher,
org.reactivestreams.Subscriber<? super O> subscriber,
CompletableQueue<O> buffer) |
Modifier and Type | Method and Description |
---|---|
long |
capacity() |
long |
currentNextSignals() |
CompletableQueue<O> |
getBuffer() |
long |
getBufferSize() |
boolean |
isComplete() |
void |
maxCapacity(long maxCapacity) |
void |
onComplete() |
void |
onNext(O ev) |
void |
request(long elements) |
boolean |
shouldRequestPendingSignals() |
String |
toString() |
void |
updatePendingRequests(long n) |
accept, cancel, equals, getPublisher, getSubscriber, hashCode, hasPublisher, markAsDeferredStart, markAsStarted, onError, onRequest, pendingRequestSignals, start, terminate, wrap
protected final CompletableQueue<O> buffer
protected boolean draining
protected volatile long currentNextSignals
protected volatile long maxCapacity
public ReactiveSubscription(Stream<O> publisher, org.reactivestreams.Subscriber<? super O> subscriber)
public ReactiveSubscription(Stream<O> publisher, org.reactivestreams.Subscriber<? super O> subscriber, CompletableQueue<O> buffer)
public void request(long elements)
request
in interface org.reactivestreams.Subscription
request
in class PushSubscription<O>
public void onNext(O ev)
onNext
in class PushSubscription<O>
public void onComplete()
onComplete
in class PushSubscription<O>
public long currentNextSignals()
public void updatePendingRequests(long n)
updatePendingRequests
in class PushSubscription<O>
public boolean shouldRequestPendingSignals()
shouldRequestPendingSignals
in class PushSubscription<O>
public final void maxCapacity(long maxCapacity)
maxCapacity
in class PushSubscription<O>
public final long getBufferSize()
public final long capacity()
public final CompletableQueue<O> getBuffer()
public final boolean isComplete()
isComplete
in class PushSubscription<O>
public String toString()
toString
in class PushSubscription<O>
Copyright © 2017. All rights reserved.