O - The type of the output values
public abstract class Stream<O> extends Object implements org.reactivestreams.Publisher<O>, NonBlocking
Base class for components that make use of the map(reactor.fn.Function) and filter(reactor.fn.Predicate) methods.
A Stream can be implemented to perform specific actions on callbacks (doNext, doComplete, doError, doOnSubscribe).
It is an asynchronous boundary and will run the callbacks using the input Dispatcher. A Stream can
eventually produce a result <O> and will offer cascading over its own subscribers.
Typically, new Streams aren't created directly. To create a Stream, use Streams and configure it with the appropriate Environment, Dispatcher, and other settings.
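As a rough illustration of the map and filter chaining mentioned in the class description, the following plain-Java sketch composes two transformations over a push-based source. MiniStream, from and demo are hypothetical names invented for this example. The sketch runs synchronously, has no Dispatcher and no backpressure, and is not the reactor.rx.Stream implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for a push-based stream; NOT reactor.rx.Stream.
final class MiniStream<O> {
    // A source is a function that pushes all of its values into the consumer it is given.
    private final Consumer<Consumer<? super O>> source;

    private MiniStream(Consumer<Consumer<? super O>> source) {
        this.source = source;
    }

    // Wrap an Iterable as a source that replays its values to each consumer.
    static <T> MiniStream<T> from(Iterable<T> values) {
        return new MiniStream<T>(downstream -> {
            for (T value : values) {
                downstream.accept(value);
            }
        });
    }

    // Transform every value before handing it downstream.
    <V> MiniStream<V> map(Function<? super O, ? extends V> fn) {
        return new MiniStream<V>(downstream ->
                source.accept(value -> downstream.accept(fn.apply(value))));
    }

    // Forward only the values that satisfy the predicate.
    MiniStream<O> filter(Predicate<? super O> p) {
        return new MiniStream<O>(downstream ->
                source.accept(value -> {
                    if (p.test(value)) {
                        downstream.accept(value);
                    }
                }));
    }

    // Terminal step: nothing is pushed until a consumer is attached.
    void consume(Consumer<? super O> consumer) {
        source.accept(consumer);
    }

    // Keep even numbers, multiply them by ten, and collect the results.
    static List<Integer> demo() {
        List<Integer> out = new ArrayList<Integer>();
        MiniStream.from(Arrays.asList(1, 2, 3, 4, 5, 6))
                .filter(n -> n % 2 == 0)
                .map(n -> n * 10)
                .consume(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [20, 40, 60]
    }
}
```

Each operator returns a new MiniStream rather than mutating the receiver, which mirrors the way the methods listed below return a fresh Stream.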
Modifier | Constructor and Description |
---|---|
protected |
Stream() |
Modifier and Type | Method and Description |
---|---|
Control |
adaptiveConsume(Consumer<? super O> consumer,
Function<Stream<Long>,? extends org.reactivestreams.Publisher<? extends Long>> requestMapper)
|
Control |
adaptiveConsumeOn(Dispatcher dispatcher,
Consumer<? super O> consumer,
Function<Stream<Long>,? extends org.reactivestreams.Publisher<? extends Long>> requestMapper)
|
Stream<Void> |
after()
Only forward onError and onComplete signals into the returned stream.
|
Control |
batchConsume(Consumer<? super O> consumer,
Function<Long,? extends Long> requestMapper)
|
Control |
batchConsumeOn(Dispatcher dispatcher,
Consumer<? super O> consumer,
Function<Long,? extends Long> requestMapper)
|
Stream<O> |
broadcast()
Subscribe a new
Broadcaster and return it for future subscriber interactions. |
Stream<O> |
broadcastOn(Dispatcher dispatcher)
Subscribe a new
Broadcaster and return it for future subscriber interactions. |
<E extends org.reactivestreams.Subscriber<? super O>> |
broadcastTo(E subscriber)
Subscribe the passed subscriber, creating the necessary upstream Subscriptions only once, and return it.
|
Stream<List<O>> |
buffer()
Collect incoming values into a
List that will be pushed into the returned Stream every
time getCapacity() has been reached, or flush is triggered. |
Stream<List<O>> |
buffer(int maxSize)
Collect incoming values into multiple
List buckets that will be pushed into the returned Stream
every time maxSize has been reached. |
Stream<List<O>> |
buffer(int maxSize,
int skip)
Collect incoming values into multiple
List buckets, a new one being created every skip items, that will be pushed into the returned Stream every time maxSize has been reached by any of them. |
Stream<List<O>> |
buffer(int maxSize,
long timespan,
TimeUnit unit)
Collect incoming values into a
List that will be pushed into the returned Stream every
timespan OR maxSize items. |
Stream<List<O>> |
buffer(int maxSize,
long timespan,
TimeUnit unit,
Timer timer)
Collect incoming values into a
List that will be pushed into the returned Stream every
timespan OR maxSize items |
Stream<List<O>> |
buffer(long timespan,
long timeshift,
TimeUnit unit)
Collect incoming values into multiple
List buckets created every timeshift that will be pushed
into the returned Stream every
timespan. |
Stream<List<O>> |
buffer(long timespan,
long timeshift,
TimeUnit unit,
Timer timer)
Collect incoming values into multiple
List buckets created every timeshift that will be pushed
into the returned Stream every
timespan. |
Stream<List<O>> |
buffer(long timespan,
TimeUnit unit)
Collect incoming values into a
List that will be pushed into the returned Stream every
timespan. |
Stream<List<O>> |
buffer(long timespan,
TimeUnit unit,
Timer timer)
Collect incoming values into a
List that will be pushed into the returned Stream every
timespan. |
Stream<List<O>> |
buffer(org.reactivestreams.Publisher<?> bucketOpening,
Supplier<? extends org.reactivestreams.Publisher<?>> boundarySupplier)
Collect incoming values into a
List that will be moved into the returned Stream every time the
passed boundary publisher emits an item. |
Stream<List<O>> |
buffer(Supplier<? extends org.reactivestreams.Publisher<?>> boundarySupplier)
Collect incoming values into a
List that will be moved into the returned Stream every time the
passed boundary publisher emits an item. |
Stream<O> |
cache()
Cache all signals accepted by this
Stream and release them on request. |
boolean |
cancelSubscription(PushSubscription<O> oPushSubscription)
Try cleaning a given subscription from the stream references.
|
Stream<O> |
capacity(long elements)
Bind the stream to a given volume of in-flight data:
- An
Action will request up to the defined volume upstream |
<E> Stream<E> |
cast(Class<E> stream)
Cast the current Stream flowing data type into a target class type.
|
<E> CompositeAction<E,O> |
combine()
Combine the most ancient upstream action to act as the
Subscriber input component and
the current stream to act as the Publisher . |
<V> Stream<V> |
concatMap(Function<? super O,org.reactivestreams.Publisher<? extends V>> fn)
Assign the given
Function to transform each incoming value O into a Publisher<? extends V> and concatenate the values of these publishers, in order, into the returned Stream. |
Stream<O> |
concatWith(org.reactivestreams.Publisher<? extends O> publisher)
lift(Supplier) all the nested Publisher values from this current upstream and then on
complete consume from the
passed publisher. |
Control |
consume()
Instruct the stream to request the produced subscription indefinitely.
|
Control |
consume(Consumer<? super O> consumer)
|
Control |
consume(Consumer<? super O> consumer,
Consumer<? super Throwable> errorConsumer)
|
Control |
consume(Consumer<? super O> consumer,
Consumer<? super Throwable> errorConsumer,
Consumer<Void> completeConsumer)
|
Control |
consume(long n)
Instruct the action to request upstream subscription if any for N elements.
|
Control |
consumeLater()
Defer a Controls operations ready to be requested.
|
Control |
consumeOn(Dispatcher dispatcher,
Consumer<? super O> consumer)
|
Control |
consumeOn(Dispatcher dispatcher,
Consumer<? super O> consumer,
Consumer<? super Throwable> errorConsumer)
|
Control |
consumeOn(Dispatcher dispatcher,
Consumer<? super O> consumer,
Consumer<? super Throwable> errorConsumer,
Consumer<Void> completeConsumer)
|
Stream<Long> |
count()
Count accepted events for each batch and pass each accumulated long to the returned Stream.
|
Stream<Long> |
count(long i)
Count accepted events for each batch and pass each accumulated long to the returned Stream.
|
<V> Stream<V> |
decode(Codec<O,V,?> codec)
Transform a sequence of codec source elements into codec input elements through
Codec.decode(Publisher) |
Stream<O> |
defaultIfEmpty(O defaultValue)
Create an operation that returns the passed value if the Stream has completed without any emitted signals.
|
<X> Stream<X> |
dematerialize()
Transform the incoming
Signal values back into the onNext, onError and onComplete signals they represent. |
Stream<O> |
dispatchOn(Dispatcher dispatcher)
Assign a new Dispatcher to the returned Stream.
|
Stream<O> |
dispatchOn(Environment environment)
Assign a new Environment and its default Dispatcher to the returned Stream.
|
Stream<O> |
dispatchOn(Environment environment,
Dispatcher dispatcher)
Assign a new Dispatcher and an Environment to the returned Stream.
|
Stream<O> |
distinct()
Create a new
Stream that filters in only unique values. |
<V> Stream<O> |
distinct(Function<? super O,? extends V> keySelector)
Create a new
Stream that filters in only values having distinct keys computed by function |
Stream<O> |
distinctUntilChanged()
Create a new
Stream that filters out consecutive equals values. |
<V> Stream<O> |
distinctUntilChanged(Function<? super O,? extends V> keySelector)
Create a new
Stream that filters out consecutive values having equal keys computed by function |
PushSubscription<O> |
downstreamSubscription()
Get the current action child subscription
|
Stream<Tuple2<Long,O>> |
elapsed()
|
Stream<O> |
elementAt(int index)
Create a new
Stream that emits an item at a specified index from a source Stream |
Stream<O> |
elementAtOrDefault(int index,
O defaultValue)
Create a new
Stream that emits an item at a specified index from a source Stream
or default value when index is out of bounds |
<V> Stream<V> |
encode(Codec<V,?,O> codec)
Transform a sequence of codec output elements into codec source elements through
Codec.encode(Publisher) |
Stream<O> |
env(Environment environment)
Assign an Environment to be provided to this Stream Subscribers
|
Stream<Boolean> |
exists(Predicate<? super O> predicate)
Create a new
Stream that emits true when any value satisfies a predicate
and false otherwise |
<T,V> Stream<V> |
fanIn(FanInAction<T,?,V,? extends FanInAction.InnerSubscriber<T,?,V>> fanInAction)
lift(Supplier) all the nested Publisher values to a new Stream calling the logic
inside the provided fanInAction for complex merging strategies. |
Stream<Boolean> |
filter()
Evaluate each accepted boolean value.
|
Stream<O> |
filter(Predicate<? super O> p)
Evaluate each accepted value against the given
Predicate . |
Stream<O> |
finallyDo(Consumer<Signal<O>> consumer)
Attach a
Consumer to this Stream that will observe terminal signal complete|error. |
<V> Stream<V> |
flatMap(Function<? super O,? extends org.reactivestreams.Publisher<? extends V>> fn)
Assign the given
Function to transform each incoming value O into a Publisher<? extends V> and merge the values of these publishers into the returned Stream. |
long |
getCapacity()
Return defined element capacity, used to drive new
Subscription
request needs. |
Dispatcher |
getDispatcher()
Get the dispatcher used to execute signals on this Stream instance.
|
Environment |
getEnvironment()
Get the assigned
Environment . |
Timer |
getTimer()
Get the current timer available if any or try returning the shared Environment one (which may cause an exception
if no Environment has been globally initialized)
|
<K> Stream<GroupedStream<K,O>> |
groupBy(Function<? super O,? extends K> keyMapper)
Re-route incoming values into a dynamically created
Stream for each unique key evaluated by the
keyMapper. |
Stream<O> |
ignoreError()
Connect an error-proof action that will transform an incoming error signal into a complete signal.
|
<E> Stream<O> |
ignoreError(Predicate<? super Throwable> ignorePredicate)
Connect an error-proof action based on the given predicate matching the current error.
|
boolean |
isReactivePull(Dispatcher dispatcher,
long producerCapacity)
Get the assigned
Dispatcher . |
<V> Stream<List<V>> |
join()
|
<V> Stream<List<V>> |
joinWith(org.reactivestreams.Publisher<? extends V> publisher)
|
Stream<O> |
keepAlive()
Prevent a
Stream from being cancelled. |
Stream<O> |
last()
Create a new
Stream that will signal the last element observed before complete signal. |
<V> Stream<V> |
lift(Supplier<? extends Action<O,V>> action)
Defer the subscription of an
Action to the actual pipeline. |
Stream<O> |
log()
Attach a
Logger to this Stream that will observe any signal emitted. |
Stream<O> |
log(String name)
Attach a
Logger to this Stream that will observe any signal emitted. |
<V> Stream<V> |
map(Function<? super O,? extends V> fn)
Assign the given
Function to transform the incoming value O into a V and pass it into
another Stream. |
Stream<Signal<O>> |
materialize()
Transform the incoming onSubscribe, onNext, onError and onComplete signals into
Signal . |
<V> Stream<V> |
merge()
|
Stream<O> |
mergeWith(org.reactivestreams.Publisher<? extends O> publisher)
lift(Supplier) all the nested Publisher values from this current upstream and from the
passed publisher. |
Stream<Stream<O>> |
nest()
Create a new
Stream whose only value will be the current instance of the Stream . |
Promise<O> |
next()
Return the promise of the next triggered signal.
|
Stream<O> |
observe(Consumer<? super O> consumer)
|
Stream<O> |
observeCancel(Consumer<Void> consumer)
Attach a
Consumer to this Stream that will observe any cancel signal |
Stream<O> |
observeComplete(Consumer<Void> consumer)
Attach a
Consumer to this Stream that will observe any complete signal |
<E extends Throwable> |
observeError(Class<E> exceptionType,
BiConsumer<Object,? super E> onError)
Assign an error handler that will pass eventual associated values and exceptions of the given type.
|
Stream<O> |
observeStart(Consumer<? super org.reactivestreams.Subscription> consumer)
Attach a
Consumer to this Stream that will observe any onSubscribe signal |
Stream<O> |
observeSubscribe(Consumer<? super org.reactivestreams.Subscriber<? super O>> consumer)
Attach a
Consumer to this Stream that will observe any subscribe signal |
<E extends Throwable> |
onErrorResumeNext(Class<E> exceptionType,
org.reactivestreams.Publisher<? extends O> fallback)
Subscribe to a fallback publisher when exceptions of the given type occur, otherwise propagate the error.
|
Stream<O> |
onErrorResumeNext(org.reactivestreams.Publisher<? extends O> fallback)
Subscribe to a fallback publisher when any exception occurs.
|
<E extends Throwable> |
onErrorReturn(Class<E> exceptionType,
Function<E,? extends O> fallback)
Produce a default value when exceptions of the given type occur, otherwise propagate the error.
|
Stream<O> |
onErrorReturn(Function<Throwable,? extends O> fallback)
Produce a default value if any exception occurs.
|
Stream<O> |
onOverflowBuffer()
Attach a No-Op Action that only serves the purpose of buffering incoming values if not enough demand is signaled
downstream.
|
Stream<O> |
onOverflowBuffer(Supplier<? extends CompletableQueue<O>> queueSupplier)
Attach a No-Op Action that only serves the purpose of buffering incoming values if not enough demand is signaled
downstream.
|
Stream<O> |
onOverflowDrop()
Attach a No-Op Action that only serves the purpose of dropping incoming values if not enough demand is signaled
downstream.
|
Stream<GroupedStream<Integer,O>> |
partition()
Re-route incoming values into a dynamically created
Stream for each Integer partition key. |
Stream<GroupedStream<Integer,O>> |
partition(int buckets)
Re-route incoming values into a dynamically created
Stream for each Integer partition key, distributing them over the given number of buckets. |
<E> Stream<E> |
process(org.reactivestreams.Processor<O,E> processor)
Subscribe the given
Processor to this Stream and return its output as a new Stream. |
Stream<O> |
recover(Class<? extends Throwable> exceptionType,
org.reactivestreams.Subscriber<Object> recoveredValuesSink)
Create a new
Stream which will re-subscribe its oldest parent-child stream pair if the exception is of
the given type. |
<A> Stream<A> |
reduce(A initial,
BiFunction<A,? super O,A> fn)
Reduce the values passing through this
Stream into an object A . |
Stream<O> |
reduce(BiFunction<O,O,O> fn)
Reduce the values passing through this
Stream into an object O. |
Stream<O> |
repeat()
Create a new
Stream which will keep re-subscribing its oldest parent-child stream pair on complete. |
Stream<O> |
repeat(int numRepeat)
Create a new
Stream which will keep re-subscribing its oldest parent-child stream pair on complete. |
Stream<O> |
repeatWhen(Function<? super Stream<? extends Long>,? extends org.reactivestreams.Publisher<?>> backOffStream)
Create a new
Stream which will re-subscribe its oldest parent-child stream pair if the backOff stream
produced by the passed mapper emits any next signal. |
Stream<O> |
requestWhen(Function<? super Stream<? extends Long>,? extends org.reactivestreams.Publisher<? extends Long>> throttleStream)
Request the parent stream every time the passed throttleStream signals a Long request volume.
|
Stream<O> |
retry()
Create a new
Stream which will re-subscribe its oldest parent-child stream pair. |
Stream<O> |
retry(int numRetries)
Create a new
Stream which will re-subscribe its oldest parent-child stream pair. |
Stream<O> |
retry(int numRetries,
Predicate<Throwable> retryMatcher)
Create a new
Stream which will re-subscribe its oldest parent-child stream pair. |
Stream<O> |
retry(Predicate<Throwable> retryMatcher)
Create a new
Stream which will re-subscribe its oldest parent-child stream pair. |
Stream<O> |
retryWhen(Function<? super Stream<? extends Throwable>,? extends org.reactivestreams.Publisher<?>> backOffStream)
Create a new
Stream which will re-subscribe its oldest parent-child stream pair if the backOff stream
produced by the passed mapper emits any next data or complete signal. |
Stream<O> |
sample()
Create a new
Stream whose values will be only the last value of each batch. |
Stream<O> |
sample(int batchSize)
Create a new
Stream whose values will be only the last value of each batch. |
Stream<O> |
sample(int maxSize,
long timespan,
TimeUnit unit)
Create a new
Stream whose values will be only the last value of each batch. |
Stream<O> |
sample(int maxSize,
long timespan,
TimeUnit unit,
Timer timer)
Create a new
Stream whose values will be only the last value of each batch. |
Stream<O> |
sample(long timespan,
TimeUnit unit)
Create a new
Stream whose values will be only the last value of each batch. |
Stream<O> |
sample(long timespan,
TimeUnit unit,
Timer timer)
Create a new
Stream whose values will be only the last value of each batch. |
Stream<O> |
sampleFirst()
Create a new
Stream whose values will be only the first value of each batch. |
Stream<O> |
sampleFirst(int batchSize)
Create a new
Stream whose values will be only the first value of each batch. |
Stream<O> |
sampleFirst(int maxSize,
long timespan,
TimeUnit unit)
Create a new
Stream whose values will be only the first value of each batch. |
Stream<O> |
sampleFirst(int maxSize,
long timespan,
TimeUnit unit,
Timer timer)
Create a new
Stream whose values will be only the first value of each batch. |
Stream<O> |
sampleFirst(long timespan,
TimeUnit unit)
Create a new
Stream whose values will be only the first value of each batch. |
Stream<O> |
sampleFirst(long timespan,
TimeUnit unit,
Timer timer)
Create a new
Stream whose values will be only the first value of each batch. |
<A> Stream<A> |
scan(A initial,
BiFunction<A,? super O,A> fn)
Scan the values passing through this
Stream into an object A . |
Stream<O> |
scan(BiFunction<O,O,O> fn)
Scan the values passing through this
Stream into an object O. |
Stream<O> |
skip(long max)
Create a new
Stream that will NOT signal next elements up to max times. |
Stream<O> |
skip(long time,
TimeUnit unit)
Create a new
Stream that will NOT signal next elements up to the specified time. |
Stream<O> |
skip(long time,
TimeUnit unit,
Timer timer)
Create a new
Stream that will NOT signal next elements up to the specified time. |
Stream<O> |
skipWhile(long max,
Predicate<O> limitMatcher)
Create a new
Stream that will NOT signal next elements while limitMatcher is true or
up to max times. |
Stream<O> |
skipWhile(Predicate<O> limitMatcher)
Create a new
Stream that will NOT signal next elements while limitMatcher is true. |
Stream<O> |
sort()
Stage incoming values into a
PriorityQueue<O> that will be re-ordered and signaled to the
returned fresh Stream . |
Stream<O> |
sort(Comparator<? super O> comparator)
Stage incoming values into a
PriorityQueue<O> that will be re-ordered and signaled to the
returned fresh Stream . |
Stream<O> |
sort(int maxCapacity)
Stage incoming values into a
PriorityQueue<O> that will be re-ordered and signaled to the
returned fresh Stream . |
Stream<O> |
sort(int maxCapacity,
Comparator<? super O> comparator)
Stage incoming values into a
PriorityQueue<O> that will be re-ordered and signaled to the
returned fresh Stream . |
<V> Stream<V> |
split()
Create a new
Stream whose values will be each element E of any IterableStream . |
<V> Stream<V> |
split(long batchSize)
Create a new
Stream whose values will be each element E of any IterableStream . |
Stream<O> |
startWith(Iterable<O> iterable)
Start emitting all items from the passed iterable, then emit from the current stream.
|
Stream<O> |
startWith(O value)
Start by emitting the passed value, then emit from the current stream.
|
Stream<O> |
startWith(org.reactivestreams.Publisher<? extends O> publisher)
Start emitting all items from the passed publisher, then emit from the current stream.
|
<A> void |
subscribe(CompositeAction<O,A> subscriber)
Subscribe the
CompositeAction.input() to this Stream. |
Stream<O> |
subscribeOn(Dispatcher currentDispatcher)
Assign a new Dispatcher to handle upstream request to the returned Stream.
|
void |
subscribeOn(Dispatcher currentDispatcher,
org.reactivestreams.Subscriber<? super O> sub)
Assign a new Dispatcher to handle upstream request to the returned Stream.
|
Stream<O> |
subscribeOn(Environment environment)
Assign a new Dispatcher to handle upstream request to the returned Stream.
|
<V> Stream<V> |
switchMap(Function<? super O,org.reactivestreams.Publisher<? extends V>> fn)
Assign the given
Function to transform each incoming value O into a Publisher<? extends V> and switch the returned Stream to the most recent of these publishers. |
Stream<O> |
take(long max)
Create a new
Stream that will signal next elements up to max times. |
Stream<O> |
take(long time,
TimeUnit unit)
Create a new
Stream that will signal next elements up to the specified time. |
Stream<O> |
take(long time,
TimeUnit unit,
Timer timer)
Create a new
Stream that will signal next elements up to the specified time. |
Stream<O> |
takeWhile(Predicate<O> limitMatcher)
Create a new
Stream that will signal next elements while limitMatcher is true. |
TapAndControls<O> |
tap()
Create a
Tap that maintains a reference to the last value seen by this Stream . |
Stream<O> |
throttle(long period)
Request once the parent stream every period milliseconds.
|
Stream<O> |
throttle(long period,
Timer timer)
Request once the parent stream every period milliseconds, using the passed timer.
|
Stream<O> |
timeout(long timeout)
Signal an error if no data has been emitted for timeout milliseconds.
|
Stream<O> |
timeout(long timeout,
TimeUnit unit)
Signal an error if no data has been emitted for the given timeout, expressed in unit.
|
Stream<O> |
timeout(long timeout,
TimeUnit unit,
org.reactivestreams.Publisher<? extends O> fallback)
Switch to the fallback Publisher if no data has been emitted for the given timeout, expressed in unit.
|
Stream<O> |
timeout(long timeout,
TimeUnit unit,
org.reactivestreams.Publisher<? extends O> fallback,
Timer timer)
Switch to the fallback Publisher if no data has been emitted for the given timeout, expressed in unit, using the passed timer.
|
Stream<Tuple2<Long,O>> |
timestamp()
|
CompletableBlockingQueue<O> |
toBlockingQueue()
Blocking call to pass values from this stream to the queue that can be polled from a consumer.
|
CompletableBlockingQueue<O> |
toBlockingQueue(int maximum)
Blocking call to eagerly fetch values from this stream
|
Promise<List<O>> |
toList()
Fetch all values in a List to the returned Promise
|
Promise<List<O>> |
toList(long maximum)
Return the promise of N signals collected into an array list.
|
String |
toString() |
Stream<O> |
unbounded()
Make this Stream subscribers unbounded
|
<E extends Throwable> |
when(Class<E> exceptionType,
Consumer<E> onError)
Assign an error handler to exceptions of the given type.
|
Stream<Stream<O>> |
window()
Re-route incoming values into a dynamically created
Stream every pre-defined getCapacity()
times. |
Stream<Stream<O>> |
window(int backlog)
Re-route incoming values into a dynamically created
Stream every pre-defined backlog times. |
Stream<Stream<O>> |
window(int maxSize,
int skip)
Re-route incoming values into bucket streams that will be pushed into the returned
Stream every skip items and complete every time maxSize has been reached by any of them. |
Stream<Stream<O>> |
window(int maxSize,
long timespan,
TimeUnit unit)
Re-route incoming values into a dynamically created
Stream every pre-defined timespan OR maxSize items. |
Stream<Stream<O>> |
window(int maxSize,
long timespan,
TimeUnit unit,
Timer timer)
Re-route incoming values into a dynamically created
Stream every pre-defined timespan OR maxSize items. |
Stream<Stream<O>> |
window(long timespan,
long timeshift,
TimeUnit unit)
Re-route incoming values into bucket streams that will be pushed into the returned
Stream every timeshift period. |
Stream<Stream<O>> |
window(long timespan,
long timeshift,
TimeUnit unit,
Timer timer)
Re-route incoming values into bucket streams that will be pushed into the returned
Stream every timeshift period. |
Stream<Stream<O>> |
window(long timespan,
TimeUnit unit)
Re-route incoming values into a dynamically created
Stream every pre-defined timespan. |
Stream<Stream<O>> |
window(long timespan,
TimeUnit unit,
Timer timer)
Re-route incoming values into a dynamically created
Stream every pre-defined timespan. |
Stream<Stream<O>> |
window(org.reactivestreams.Publisher<?> bucketOpening,
Supplier<? extends org.reactivestreams.Publisher<?>> boundarySupplier)
Re-route incoming values into bucket streams that will be pushed into the returned
Stream every time bucketOpening emits an item and
complete every time the boundarySupplier stream emits an item. |
Stream<Stream<O>> |
window(Supplier<? extends org.reactivestreams.Publisher<?>> boundarySupplier)
Re-route incoming values into bucket streams that will be pushed into the returned
Stream and
complete every time the boundarySupplier stream emits an item. |
<V> Stream<V> |
zip(Function<TupleN,? extends V> zipper)
|
<T2,V> Stream<V> |
zipWith(Iterable<? extends T2> iterable,
Function<Tuple2<O,T2>,V> zipper)
|
<T2,V> Stream<V> |
zipWith(org.reactivestreams.Publisher<? extends T2> publisher,
Function<Tuple2<O,T2>,V> zipper)
|
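The buffer(int maxSize, int skip) entry in the summary above is easier to follow with concrete numbers. The plain-Java sketch below assumes that a new bucket is opened every skip items, that every open bucket receives each incoming value, that a bucket is emitted once it holds maxSize items, and that buckets still open on completion are flushed as-is. BufferSkipSketch and its buffer method are hypothetical names for this illustration, not Reactor code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration of overlapping or gapped buckets; NOT Reactor code.
final class BufferSkipSketch {

    // Opens a bucket every `skip` items and emits a bucket once it holds `maxSize` items.
    // Buckets still open at the end are emitted as-is (assumed flush-on-complete behaviour).
    static <T> List<List<T>> buffer(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> emitted = new ArrayList<List<T>>();
        List<List<T>> open = new ArrayList<List<T>>();
        long index = 0;
        for (T value : source) {
            // Every `skip` items, start a new bucket.
            if (index++ % skip == 0) {
                open.add(new ArrayList<T>());
            }
            // Feed the value to every open bucket and emit the ones that are full.
            Iterator<List<T>> it = open.iterator();
            while (it.hasNext()) {
                List<T> bucket = it.next();
                bucket.add(value);
                if (bucket.size() == maxSize) {
                    emitted.add(bucket);
                    it.remove();
                }
            }
        }
        // Completion: flush whatever is still open.
        emitted.addAll(open);
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> in = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(buffer(in, 3, 2)); // prints [[1, 2, 3], [3, 4, 5], [5]]
        System.out.println(buffer(in, 2, 3)); // prints [[1, 2], [4, 5]]
    }
}
```

With skip smaller than maxSize the buckets overlap, and with skip larger than maxSize some values (here 3) fall between buckets and are dropped.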
public final <E> Stream<E> cast(@Nonnull Class<E> stream)
Cast the current Stream flowing data type into a target class type.
E - the Action output type
public <V> Stream<V> lift(@Nonnull Supplier<? extends Action<O,V>> action)
Defer the subscription of an Action to the actual pipeline.
Terminal operations such as consume(reactor.fn.Consumer) will start the subscription chain.
It will listen for current Stream signals and will eventually produce signals as well (subscribe, error, complete, next).
The action is returned for functional-style chaining.
V - the Action output type
action - the function to map a provided dispatcher to a fresh Action to subscribe.
See org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)
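The deferral described for lift can be pictured with a small plain-Java sketch: the supplier is not invoked when the pipeline is assembled, only when a terminal consume call starts the chain, and each subscription gets a fresh action. LiftSketch, Pipeline, and this two-argument Action interface are hypothetical stand-ins chosen for the example; Reactor's own Action class is considerably richer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustration of deferred, per-subscription action creation; NOT Reactor code.
final class LiftSketch {

    // Hypothetical minimal action: receives a value and may push results downstream.
    interface Action<I, R> {
        void onNext(I value, Consumer<? super R> downstream);
    }

    static final class Pipeline<O> {
        private final Consumer<Consumer<? super O>> subscribeFn;

        private Pipeline(Consumer<Consumer<? super O>> subscribeFn) {
            this.subscribeFn = subscribeFn;
        }

        static <T> Pipeline<T> from(Iterable<T> values) {
            return new Pipeline<T>(downstream -> {
                for (T value : values) {
                    downstream.accept(value);
                }
            });
        }

        // Only records the supplier; the action is created when someone consumes.
        <V> Pipeline<V> lift(Supplier<? extends Action<O, V>> supplier) {
            return new Pipeline<V>(downstream -> {
                Action<O, V> action = supplier.get(); // fresh action per subscription
                subscribeFn.accept(value -> action.onNext(value, downstream));
            });
        }

        // Terminal operation: starts the subscription chain.
        void consume(Consumer<? super O> consumer) {
            subscribeFn.accept(consumer);
        }
    }

    static List<String> demo() {
        AtomicInteger created = new AtomicInteger();
        List<String> log = new ArrayList<String>();
        Pipeline<String> indexed = Pipeline.from(Arrays.asList("a", "b"))
                .<String>lift(() -> {
                    created.incrementAndGet();
                    int[] index = {0}; // state private to one subscription
                    return (value, downstream) -> downstream.accept(index[0]++ + ":" + value);
                });
        log.add("created=" + created.get()); // nothing has run yet
        indexed.consume(log::add);
        indexed.consume(log::add);           // second subscription restarts at index 0
        log.add("created=" + created.get());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // prints [created=0, 0:a, 1:b, 0:a, 1:b, created=2]
    }
}
```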
public final <E extends Throwable> Stream<O> when(@Nonnull Class<E> exceptionType, @Nonnull Consumer<E> onError)
Assign an error handler to exceptions of the given type.
E - type of the exception to handle
exceptionType - the type of exceptions to handle
onError - the error handler for each exception
public final <E extends Throwable> Stream<O> observeError(@Nonnull Class<E> exceptionType, @Nonnull BiConsumer<Object,? super E> onError)
Assign an error handler that will pass eventual associated values and exceptions of the given type.
E - type of the exception to handle
exceptionType - the type of exceptions to handle
onError - the error handler for each exception
public final Stream<O> onErrorResumeNext(@Nonnull org.reactivestreams.Publisher<? extends O> fallback)
Subscribe to a fallback publisher when any exception occurs.
fallback - the publisher to subscribe to when an error occurs
public final <E extends Throwable> Stream<O> onErrorResumeNext(@Nonnull Class<E> exceptionType, @Nonnull org.reactivestreams.Publisher<? extends O> fallback)
Subscribe to a fallback publisher when exceptions of the given type occur, otherwise propagate the error.
E - type of the exception to handle
exceptionType - the type of exceptions to handle
fallback - the publisher to subscribe to when a matching error occurs
public final Stream<O> onErrorReturn(@Nonnull Function<Throwable,? extends O> fallback)
Produce a default value if any exception occurs.
fallback - the function producing the default value from the error
public final <E extends Throwable> Stream<O> onErrorReturn(@Nonnull Class<E> exceptionType, @Nonnull Function<E,? extends O> fallback)
Produce a default value when exceptions of the given type occur, otherwise propagate the error.
E - type of the exception to handle
exceptionType - the type of exceptions to handle
fallback - the function producing the default value from a matching error
public final Stream<Void> after()
Only forward onError and onComplete signals into the returned stream.
public final Stream<Signal<O>> materialize()
Transform the incoming onSubscribe, onNext, onError and onComplete signals into Signal.
Since the error is materialized as a Signal, the propagation will be stopped.
Complete signal will first emit a Signal.complete() and then effectively complete the stream.
public final <X> Stream<X> dematerialize()
Transform the incoming Signal values back into the onNext, onError and onComplete signals they represent.
public final Stream<O> broadcast()
Subscribe a new Broadcaster and return it for future subscriber interactions. Effectively it turns any
stream into a hot Stream where subscribers will only see values from the time T when they subscribe to the returned
stream. Complete and Error signals are however retained unless keepAlive() has been called before.
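The hot-stream behaviour described for broadcast can be sketched in plain Java. The sketch assumes that "retained" means a subscriber arriving after completion still receives the complete signal, while values emitted before it subscribed are not replayed. HotBroadcastSketch, Hot, Listener and recorder are hypothetical names for this illustration and do not reflect the Broadcaster implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of hot-stream semantics; NOT the Reactor Broadcaster.
final class HotBroadcastSketch {

    interface Listener<T> {
        void onNext(T value);
        void onComplete();
    }

    static final class Hot<T> {
        private final List<Listener<? super T>> listeners = new ArrayList<Listener<? super T>>();
        private boolean completed;

        void subscribe(Listener<? super T> listener) {
            if (completed) {
                listener.onComplete(); // retained terminal signal (assumed behaviour)
                return;
            }
            listeners.add(listener);   // past values are NOT replayed
        }

        void onNext(T value) {
            if (completed) {
                return;
            }
            for (Listener<? super T> listener : listeners) {
                listener.onNext(value);
            }
        }

        void onComplete() {
            if (completed) {
                return;
            }
            completed = true;
            for (Listener<? super T> listener : listeners) {
                listener.onComplete();
            }
            listeners.clear();
        }
    }

    // Records every signal it sees, prefixed with the listener's name.
    static Listener<Integer> recorder(String name, List<String> log) {
        return new Listener<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add(name + ":" + value);
            }

            @Override
            public void onComplete() {
                log.add(name + ":complete");
            }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<String>();
        Hot<Integer> hot = new Hot<Integer>();
        hot.subscribe(recorder("early", log));
        hot.onNext(1);
        hot.subscribe(recorder("late", log)); // misses the value 1
        hot.onNext(2);
        hot.onComplete();
        hot.subscribe(recorder("afterComplete", log));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // prints [early:1, early:2, late:2, early:complete, late:complete, afterComplete:complete]
    }
}
```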
public final Stream<O> broadcastOn(Dispatcher dispatcher)
Subscribe a new Broadcaster and return it for future subscriber interactions. Effectively it turns any
stream into a hot Stream where subscribers will only see values from the time T when they subscribe to the returned
stream. Complete and Error signals are however retained unless keepAlive() has been called before.
dispatcher - the dispatcher to run the signals
public final <E extends org.reactivestreams.Subscriber<? super O>> E broadcastTo(E subscriber)
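Subscribe the passed subscriber, creating the necessary upstream Subscriptions only once, and return it.
E - the hydrated generic type for the passed argument, allowing for method chaining
subscriber - the subscriber to subscribe to this stream and return
public final TapAndControls<O> tap()
Create a Tap that maintains a reference to the last value seen by this Stream.
public Control consumeLater()
Defer a Controls operations ready to be requested.
public Control consume()
Instruct the stream to request the produced subscription indefinitely.
public Control consume(long n)
Instruct the action to request upstream subscription if any for N elements.
Returns the Control interface to operate on the materialized upstream
public final Control consume(Consumer<? super O> consumer)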
Attach a Consumer to this Stream that will consume any values accepted by this Stream. As such this is a terminal action to be placed on a stream flow.
It will also eagerly prefetch the upstream publisher.
For a passive version that observes and forwards incoming data, see observe(reactor.fn.Consumer).
consumer - the consumer to invoke on each value
Returns the Control interface to operate on the materialized upstream
public final Control consumeOn(Dispatcher dispatcher, Consumer<? super O> consumer)
Attach a Consumer to this Stream that will consume any values accepted by this Stream. As such this is a terminal action to be placed on a stream flow. It will also eagerly prefetch the upstream publisher.
For a passive version that observes and forwards incoming data, see observe(reactor.fn.Consumer).
dispatcher - the dispatcher to run the consumer
consumer - the consumer to invoke on each value
Returns the Control interface to operate on the materialized upstream
public final Control consume(Consumer<? super O> consumer, Consumer<? super Throwable> errorConsumer)
Attach a Consumer to this Stream that will consume any values signaled by this Stream. As such this is a terminal action to be placed on a stream flow.
Any Error signal will be consumed by the error consumer.
It will also eagerly prefetch the upstream publisher.
consumer - the consumer to invoke on each next signal
errorConsumer - the consumer to invoke on each error signal
Returns the Control interface to operate on the materialized upstream
public final Control consumeOn(Dispatcher dispatcher, Consumer<? super O> consumer, Consumer<? super Throwable> errorConsumer)
Attach a Consumer to this Stream that will consume any values signaled by this Stream. As such this is a terminal action to be placed on a stream flow.
Any Error signal will be consumed by the error consumer.
It will also eagerly prefetch the upstream publisher.
consumer - the consumer to invoke on each next signal
errorConsumer - the consumer to invoke on each error signal
dispatcher - the dispatcher to run the consumer
Returns the Control interface to operate on the materialized upstream
public final Control consume(Consumer<? super O> consumer, Consumer<? super Throwable> errorConsumer, Consumer<Void> completeConsumer)
Attach a Consumer to this Stream that will consume any values signaled by this Stream. As such this is a terminal action to be placed on a stream flow.
Any Error signal will be consumed by the error consumer.
The Complete signal will be consumed by the complete consumer.
Only error and complete signals will be signaled downstream. It will also eagerly prefetch the upstream publisher.
consumer - the consumer to invoke on each value
errorConsumer - the consumer to invoke on each error signal
completeConsumer - the consumer to invoke on complete signal
public final Control consumeOn(Dispatcher dispatcher, Consumer<? super O> consumer, Consumer<? super Throwable> errorConsumer, Consumer<Void> completeConsumer)
Attach a Consumer to this Stream that will consume any values signaled by this Stream. As such this is a terminal action to be placed on a stream flow.
Any Error signal will be consumed by the error consumer.
The Complete signal will be consumed by the complete consumer.
Only error and complete signals will be signaled downstream. It will also eagerly prefetch the upstream publisher.
consumer - the consumer to invoke on each value
errorConsumer - the consumer to invoke on each error signal
completeConsumer - the consumer to invoke on complete signal
dispatcher - the dispatcher to run the consumer
public final Control batchConsume(Consumer<? super O> consumer, Function<Long,? extends Long> requestMapper)
Consumer
to this Stream
that will consume any values accepted by this Stream
. As such this a terminal action to be placed on a stream flow. It will also eagerly prefetch upstream
publisher.
The passed {code requestMapper} function will receive the Stream
of the last N requested elements
-starting with the
capacity defined for the stream- when the N elements have been consumed. It will return a Publisher
of
long signals
S that will instruct the consumer to request S more elements, possibly altering the "batch" size if wished.
For a passive version that observe and forward incoming data see observe(reactor.fn.Consumer)
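The request loop described above can be pictured with a small plain-Java simulation. This is only a sketch under stated assumptions: it uses `java.util.function.Function` as a stand-in for `reactor.fn.Function`, and `simulateRequests` is a hypothetical helper, not part of the Reactor API. It shows how a `requestMapper` could change the batch size after each consumed batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BatchRequestSketch {

    // Hypothetical helper (not Reactor API): simulates a consumer that requests
    // `initial` elements, then asks requestMapper for the next request size S
    // each time the previously requested N elements have been consumed.
    public static List<Long> simulateRequests(long initial, long totalElements,
                                              Function<Long, Long> requestMapper) {
        List<Long> requests = new ArrayList<>();
        long remaining = totalElements;
        long n = initial;
        while (remaining > 0 && n > 0) {
            requests.add(n);
            remaining -= Math.min(n, remaining); // consume up to N elements
            n = requestMapper.apply(n);          // S = requestMapper(last N)
        }
        return requests;
    }

    public static void main(String[] args) {
        // Double the batch size after each batch, capped at 8.
        Function<Long, Long> doubling = last -> Math.min(last * 2, 8L);
        System.out.println(simulateRequests(1, 20, doubling)); // prints [1, 2, 4, 8, 8]
    }
}
```

Returning the received value unchanged would keep a fixed batch size; the doubling mapper is just one illustrative policy.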
consumer - the consumer to invoke on each value
Control interface to operate on the materialized upstream
public final Control adaptiveConsume(Consumer<? super O> consumer, Function<Stream<Long>,? extends org.reactivestreams.Publisher<? extends Long>> requestMapper)
Attach a Consumer to this Stream that will consume any values accepted by this Stream. As such this is a terminal action to be placed on a stream flow. It will also eagerly prefetch the upstream publisher.
The passed requestMapper function will receive the Stream of the last N requested elements, starting with the capacity defined for the stream, when the N elements have been consumed. It will return a Publisher of long signals S that will instruct the consumer to request S more elements.
For a passive version that observes and forwards incoming data, see observe(reactor.fn.Consumer)
consumer - the consumer to invoke on each value
Control interface to operate on the materialized upstream
public final Control batchConsumeOn(Dispatcher dispatcher, Consumer<? super O> consumer, Function<Long,? extends Long> requestMapper)
Attach a Consumer to this Stream that will consume any values accepted by this Stream. As such this is a terminal action to be placed on a stream flow. It will also eagerly prefetch the upstream publisher.
The passed requestMapper function will receive the count N of the last requested elements, starting with the capacity defined for the stream, when the N elements have been consumed. It will return a long signal S that will instruct the consumer to request S more elements, possibly altering the "batch" size if wished.
For a passive version that observes and forwards incoming data, see observe(reactor.fn.Consumer)
consumer - the consumer to invoke on each value
Control interface to operate on the materialized upstream
public final Control adaptiveConsumeOn(Dispatcher dispatcher, Consumer<? super O> consumer, Function<Stream<Long>,? extends org.reactivestreams.Publisher<? extends Long>> requestMapper)
Attach a Consumer to this Stream that will consume any values accepted by this Stream. As such this is a terminal action to be placed on a stream flow. It will also eagerly prefetch the upstream publisher.
The passed requestMapper function will receive the Stream of the last N requested elements, starting with the capacity defined for the stream, when the N elements have been consumed. It will return a Publisher of long signals S that will instruct the consumer to request S more elements.
Multiple long signals S can be requested before a given request completes, and therefore an appropriate ordering Dispatcher should be used.
For a passive version that observes and forwards incoming data, see observe(reactor.fn.Consumer)
consumer - the consumer to invoke on each value
Control interface to operate on the materialized upstream
public final Stream<O> dispatchOn(@Nonnull Environment environment)
environment - the environment to get the dispatcher from, using Environment.getDefaultDispatcher()
Stream running on a different Dispatcher
public final Stream<O> subscribeOn(@Nonnull Environment environment)
environment - the environment to get the dispatcher from, using Environment.getDefaultDispatcher()
Stream whose requests are running on a different Dispatcher
public final Stream<O> dispatchOn(@Nonnull Dispatcher dispatcher)
dispatcher - the new dispatcher
Stream running on a different Dispatcher
public final void subscribeOn(@Nonnull Dispatcher currentDispatcher, org.reactivestreams.Subscriber<? super O> sub)
sub - the subscriber to request using the current dispatcher
currentDispatcher - the new dispatcher
public final Stream<O> subscribeOn(@Nonnull Dispatcher currentDispatcher)
currentDispatcher - the new dispatcher
Stream whose requests are running on a different Dispatcher
public Stream<O> dispatchOn(Environment environment, @Nonnull Dispatcher dispatcher)
dispatcher - the new dispatcher
environment - the environment
Stream running on a different Dispatcher
public final Stream<O> observe(@Nonnull Consumer<? super O> consumer)
consumer - the consumer to invoke on each value
public final Stream<O> cache()
Stream and release them on request that will observe any values accepted by this Stream.
public final Stream<O> log()
Attach a Logger to this Stream that will observe any signal emitted.
public final Stream<O> log(String name)
Attach a Logger to this Stream that will observe any signal emitted.
name - the logger name
public final Stream<O> observeComplete(@Nonnull Consumer<Void> consumer)
Attach a Consumer to this Stream that will observe any complete signal.
consumer - the consumer to invoke on complete
public final Stream<O> observeSubscribe(@Nonnull Consumer<? super org.reactivestreams.Subscriber<? super O>> consumer)
Attach a Consumer to this Stream that will observe any subscribe signal.
consumer - the consumer to invoke on subscribe
public final Stream<O> observeStart(@Nonnull Consumer<? super org.reactivestreams.Subscription> consumer)
Attach a Consumer to this Stream that will observe any onSubscribe signal.
consumer - the consumer to invoke on onSubscribe
public final Stream<O> observeCancel(@Nonnull Consumer<Void> consumer)
Attach a Consumer to this Stream that will observe any cancel signal.
consumer - the consumer to invoke on cancel
public Stream<O> ignoreError()
Stream
public <E> Stream<O> ignoreError(Predicate<? super Throwable> ignorePredicate)
ignorePredicate - a predicate to test if an error should be transformed to a complete signal.
Stream
public final Stream<O> finallyDo(Consumer<Signal<O>> consumer)
Attach a Consumer to this Stream that will observe the terminal signal (complete or error).
The consumer will listen for the signal and introspect its state.
consumer - the consumer to invoke on terminal signal
public final Stream<O> defaultIfEmpty(O defaultValue)
defaultValue - the value to forward if the stream is empty
public final <V> Stream<V> map(@Nonnull Function<? super O,? extends V> fn)
Assign the given Function to transform the incoming value O into a V and pass it into another Stream.
V - the type of the return value of the transformation function
fn - the transformation function
Stream containing the transformed values
public final <V> Stream<V> flatMap(@Nonnull Function<? super O,? extends org.reactivestreams.Publisher<? extends V>> fn)
Assign the given Function to transform the incoming value O into a Publisher<V> and pass it into another Stream.
V - the type of the return value of the transformation function
fn - the transformation function
Stream containing the transformed values
public final <V> Stream<V> switchMap(@Nonnull Function<? super O,org.reactivestreams.Publisher<? extends V>> fn)
Assign the given Function to transform the incoming value O into a Publisher<V> and pass it into another Stream. The produced stream will emit the data from the most recent transformed stream.
V - the type of the return value of the transformation function
fn - the transformation function
Stream containing the transformed values
public final <V> Stream<V> concatMap(@Nonnull Function<? super O,org.reactivestreams.Publisher<? extends V>> fn)
Assign the given Function to transform the incoming value O into a Publisher<V> and pass it into another Stream. The produced stream will emit the data from all transformed streams in order.
V - the type of the return value of the transformation function
fn - the transformation function
Stream containing the transformed values
public final <V> Stream<V> decode(Codec<O,V,?> codec)
Codec.decode(Publisher)
V - the type of the Input codec type translated from the current "source" type sequence
codec - the unmarshalling codec
Stream containing the transformed values
public final <V> Stream<V> encode(Codec<V,?,O> codec)
Codec.encode(Publisher)
V - the type of the Source codec type translated from the current "output" sequence
codec - the unmarshalling codec
Stream containing the transformed values
public final <V> Stream<V> merge()
lift(Supplier) all the nested Publisher values to a new Stream.
Dynamic merge requires use of reactive-pull offered by default StreamSubscription. If merge does not have the getCapacity() to take new elements, because its capacity(long) instructed so, the subscription will buffer them.
V - the inner stream flowing data type that will be the produced signal.
public final Stream<O> mergeWith(org.reactivestreams.Publisher<? extends O> publisher)
lift(Supplier) all the nested Publisher values from this current upstream and from the passed publisher.
public final Stream<O> concatWith(org.reactivestreams.Publisher<? extends O> publisher)
lift(Supplier) all the nested Publisher values from this current upstream and then, on complete, consume from the passed publisher.
public final Stream<O> startWith(Iterable<O> iterable)
public final Stream<O> startWith(O value)
public final Stream<O> startWith(org.reactivestreams.Publisher<? extends O> publisher)
public final <V> Stream<List<V>> join()
lift(Supplier) all the nested Publisher values to a new Stream until one of them completes.
The result will be produced with a list of each upstream's most recent emitted data.
public final <V> Stream<List<V>> joinWith(org.reactivestreams.Publisher<? extends V> publisher)
lift(Supplier) all the nested Publisher values to a new Stream until one of them completes.
The result will be produced with a list of each upstream's most recent emitted data.
public final <V> Stream<V> zip(@Nonnull Function<TupleN,? extends V> zipper)
lift(Supplier) all the nested Publisher values to a new Stream until one of them completes.
The result will be produced by the zipper transformation from a tuple of each upstream's most recent emitted data.
public final <T2,V> Stream<V> zipWith(Iterable<? extends T2> iterable, @Nonnull Function<Tuple2<O,T2>,V> zipper)
lift(Supplier) all the nested Publisher values to a new Stream until one of them completes.
The result will be produced by the zipper transformation from a tuple of each upstream's most recent emitted data.
public final <T2,V> Stream<V> zipWith(org.reactivestreams.Publisher<? extends T2> publisher, @Nonnull Function<Tuple2<O,T2>,V> zipper)
lift(Supplier) with the passed Publisher values to a new Stream until one of them completes.
The result will be produced by the zipper transformation from a tuple of each upstream's most recent emitted data.
public <T,V> Stream<V> fanIn(FanInAction<T,?,V,? extends FanInAction.InnerSubscriber<T,?,V>> fanInAction)
lift(Supplier) all the nested Publisher values to a new Stream, calling the logic inside the provided fanInAction for complex merging strategies.
FanInAction provides helpers to create a subscriber for each source, a registry of incoming sources, and overriding doXXX signals as usual to produce the result via reactor.rx.action.Action#broadcastXXX.
A default fanInAction will act like merge(), passing values to doNext. In Java 8 one can then implement stream.fanIn(data -> broadcastNext(data)) or stream.fanIn(System.out::println)
Dynamic merge (moving nested data into the top-level returned stream) requires use of reactive-pull offered by default StreamSubscription. If merge does not have the getCapacity() to take new elements, because its capacity(long) instructed so, the subscription will buffer them.
T - the nested type of flowing upstream Stream.
V - the produced output
public Stream<O> capacity(long elements)
Action will request up to the defined volume upstream.
- An Action will track the pending requests and fire up to when the previous volume has been processed.
- A BatchAction and any other size-bound action will be limited to the defined volume.
A stream capacity can't be greater than the underlying dispatcher capacity: if it overflows the dispatcher backlog size, the capacity will be aligned automatically to fit it. RingBufferDispatcher, for instance, takes a power-of-2 size up to Integer.MAX_VALUE, whereas a Stream can be sized up to Long.MAX_VALUE in-flight data.
When the stream receives more elements than requested, incoming data is eventually staged in a Subscription.
The subscription can react differently according to the implementation in use; the default strategy is as follows:
- The first level of pair compositions Stream->Action will overflow data in a CompletableQueue, ready to be polled when the action fires the pending requests.
- The following pairs of Action->Action will synchronously pass data
- Any pair of Stream->Subscriber or Action->Subscriber will behave as with the root Stream->Action pair rule.
- onOverflowBuffer() forces this staging behavior, with a possibility to pass a PersistentQueue
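The capacity alignment rule stated above (a stream capacity that overflows the dispatcher backlog is aligned to fit it) can be sketched as a simple clamp. This is an illustration only: `alignCapacity` is a hypothetical helper, the backlog value 1024 is an arbitrary example, and the actual alignment inside Reactor may involve further steps such as power-of-2 sizing.

```java
public class CapacityAlignSketch {

    // Hypothetical helper (not Reactor API): a requested capacity larger than
    // the dispatcher backlog is reduced to the backlog size; smaller ones are kept.
    public static long alignCapacity(long requested, long dispatcherBacklog) {
        return Math.min(requested, dispatcherBacklog);
    }

    public static void main(String[] args) {
        System.out.println(alignCapacity(Long.MAX_VALUE, 1024)); // prints 1024
        System.out.println(alignCapacity(16, 1024));             // prints 16
    }
}
```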
elements - maximum number of in-flight data
public final Stream<O> unbounded()
capacity(long)
public final Stream<O> onOverflowBuffer()
public Stream<O> onOverflowBuffer(Supplier<? extends CompletableQueue<O>> queueSupplier)
queueSupplier - A completable queue Supplier to provide support for overflow
public final Stream<O> onOverflowDrop()
public final Stream<O> filter(Predicate<? super O> p)
Evaluate each accepted value against the given Predicate. If the predicate test succeeds, the value is passed into the new Stream. If the predicate test fails, the value is ignored.
public final Stream<Boolean> filter()
Stream. If the predicate test fails, the value is ignored.
Stream containing only values that pass the predicate test
public final Stream<Stream<O>> nest()
Create a new Stream whose only value will be the current instance of the Stream.
public final Stream<O> retry()
Create a new Stream which will re-subscribe its oldest parent-child stream pair. The action will start propagating errors after Integer.MAX_VALUE.
Stream
public final Stream<O> retry(int numRetries)
Create a new Stream which will re-subscribe its oldest parent-child stream pair. The action will start propagating errors after numRetries.
This is generally useful for retry strategies and fault-tolerant streams.
numRetries - the number of times to tolerate an error
Stream
public final Stream<O> retry(Predicate<Throwable> retryMatcher)
Create a new Stream which will re-subscribe its oldest parent-child stream pair. retryMatcher will test an incoming Throwable; if positive, the retry will occur.
This is generally useful for retry strategies and fault-tolerant streams.
retryMatcher - the predicate to evaluate if retry should occur based on a given error signal
Stream
public final Stream<O> retry(int numRetries, Predicate<Throwable> retryMatcher)
Create a new Stream which will re-subscribe its oldest parent-child stream pair. The action will start propagating errors after numRetries. retryMatcher will test an incoming Throwable; if positive, the retry will occur (in conjunction with the numRetries condition).
This is generally useful for retry strategies and fault-tolerant streams.
numRetries - the number of times to tolerate an error
retryMatcher - the predicate to evaluate if retry should occur based on a given error signal
Stream
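The combined retry condition (tolerate up to numRetries errors, and only those accepted by retryMatcher) can be sketched in plain Java. This is a synchronous illustration under stated assumptions, not the Reactor implementation: `retry` here is a hypothetical helper that re-runs a `Supplier` where a Stream would re-subscribe.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

public class RetrySketch {

    // Hypothetical helper (not Reactor API): re-runs `source` after a failure,
    // tolerating at most numRetries errors and only those accepted by retryMatcher.
    // Any other error is propagated to the caller.
    public static <T> T retry(int numRetries, Predicate<Throwable> retryMatcher,
                              Supplier<T> source) {
        int tolerated = 0;
        while (true) {
            try {
                return source.get();
            } catch (RuntimeException e) {
                if (tolerated >= numRetries || !retryMatcher.test(e)) {
                    throw e; // start propagating errors
                }
                tolerated++;
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = retry(3, e -> e instanceof IllegalStateException, () -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok after " + attempts[0] + " attempts";
        });
        System.out.println(result); // prints ok after 3 attempts
    }
}
```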
public final Stream<O> recover(@Nonnull Class<? extends Throwable> exceptionType, org.reactivestreams.Subscriber<Object> recoveredValuesSink)
Create a new Stream which will re-subscribe its oldest parent-child stream pair if the exception is of the given type.
The recoveredValues subscriber will be emitted the associated value, if any. If the error doesn't match the given exception type, the error signal will be propagated downstream but not to the recovered values sink.
recoveredValuesSink - the subscriber to listen for recovered values
exceptionType - the type of exceptions to handle
Stream
public final Stream<O> retryWhen(Function<? super Stream<? extends Throwable>,? extends org.reactivestreams.Publisher<?>> backOffStream)
Create a new Stream which will re-subscribe its oldest parent-child stream pair if the backOff stream produced by the passed mapper emits any next data or complete signal. It will propagate the error if the backOff stream emits an error signal.
backOffStream - the function taking the error stream as an input and returning a new stream that applies some backoff policy, e.g. Streams.timer
Stream
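One possible backoff policy for such a backOff stream is exponential delay with a cap. The sketch below only computes the delay for each attempt in plain Java; `backoffDelayMillis` and its parameters are hypothetical names chosen for illustration, and wiring the delay into a timer-based stream is left out.

```java
public class BackoffSketch {

    // Hypothetical helper (not Reactor API): doubles a positive base delay once per
    // previous attempt and never returns more than capMillis.
    public static long backoffDelayMillis(int attempt, long baseMillis, long capMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < capMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, capMillis);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println(backoffDelayMillis(attempt, 100, 1000));
        }
        // prints 100, 200, 400, 800, 1000 (one value per line)
    }
}
```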
public final Stream<O> repeat()
Create a new Stream which will keep re-subscribing its oldest parent-child stream pair on complete.
Stream
public final Stream<O> repeat(int numRepeat)
Create a new Stream which will keep re-subscribing its oldest parent-child stream pair on complete.
The action will propagate complete after numRepeat.
numRepeat - the number of times to re-subscribe on complete
Stream
public final Stream<O> repeatWhen(Function<? super Stream<? extends Long>,? extends org.reactivestreams.Publisher<?>> backOffStream)
Create a new Stream which will re-subscribe its oldest parent-child stream pair if the backOff stream produced by the passed mapper emits any next signal. It will propagate the complete and error signals if the backOff stream emits the corresponding signals.
backOffStream - the function taking a stream of complete timestamps in millis as an input and returning a new stream that applies some backoff policy, e.g. Streams.timer(long)
Stream
public final Stream<O> last()
Create a new Stream that will signal the last element observed before the complete signal.
Stream
public final Stream<O> take(long max)
Create a new Stream that will signal next elements up to max times.
max - the number of times to broadcast next signals before completing
Stream
public final Stream<O> take(long time, TimeUnit unit)
Create a new Stream that will signal next elements up to the specified time.
time - the time window to broadcast next signals before completing
unit - the time unit to use
Stream
public final Stream<O> take(long time, TimeUnit unit, Timer timer)
Create a new Stream that will signal next elements up to the specified time.
time - the time window to broadcast next signals before completing
unit - the time unit to use
timer - the Timer to use
Stream
public final Stream<O> takeWhile(Predicate<O> limitMatcher)
Create a new Stream that will signal next elements while limitMatcher is true.
limitMatcher - the predicate to evaluate for starting dropping events and completing
Stream
public final Stream<O> skip(long max)
Create a new Stream that will NOT signal next elements up to max times.
max - the number of times to drop next signals before starting
Stream
public final Stream<O> skip(long time, TimeUnit unit)
Create a new Stream that will NOT signal next elements up to the specified time.
time - the time window to drop next signals before starting
unit - the time unit to use
Stream
public final Stream<O> skip(long time, TimeUnit unit, Timer timer)
Create a new Stream that will NOT signal next elements up to the specified time.
time - the time window to drop next signals before starting
unit - the time unit to use
timer - the Timer to use
Stream
public final Stream<O> skipWhile(Predicate<O> limitMatcher)
Create a new Stream that will NOT signal next elements while limitMatcher is true.
limitMatcher - the predicate to evaluate to start broadcasting events
Stream
public final Stream<O> skipWhile(long max, Predicate<O> limitMatcher)
Create a new Stream that will NOT signal next elements while limitMatcher is true or up to max times.
max - the number of times to drop next signals before starting
limitMatcher - the predicate to evaluate for starting dropping events and completing
Stream
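The skip variants mirror take: elements are dropped first and forwarded afterwards. The list-based sketch below uses hypothetical helpers and assumes that, once the predicate has failed, skipWhile forwards every later element even if the predicate would match again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class SkipSketch {

    // Hypothetical list-based analogue of skip(long max): drop the first max elements.
    public static <T> List<T> skip(List<T> source, long max) {
        List<T> out = new ArrayList<>();
        long dropped = 0;
        for (T value : source) {
            if (dropped < max) {
                dropped++;
                continue;
            }
            out.add(value);
        }
        return out;
    }

    // Hypothetical list-based analogue of skipWhile: drop elements until the
    // predicate first fails, then forward everything that follows.
    public static <T> List<T> skipWhile(List<T> source, Predicate<T> limitMatcher) {
        List<T> out = new ArrayList<>();
        boolean dropping = true;
        for (T value : source) {
            if (dropping && limitMatcher.test(value)) {
                continue;
            }
            dropping = false;
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 1, 2);
        System.out.println(skip(data, 2));               // prints [3, 1, 2]
        System.out.println(skipWhile(data, v -> v < 3)); // prints [3, 1, 2]
    }
}
```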
public final Stream<Tuple2<Long,O>> timestamp()
Stream that emits tuples of millis time and matching data
public final Stream<Tuple2<Long,O>> elapsed()
Create a new Stream that accepts a Tuple2 of T1 Long timemillis and T2 the matching data.
Stream that emits tuples of time elapsed in milliseconds and matching data
public final Stream<O> elementAt(int index)
Create a new Stream that emits an item at a specified index from a source Stream
index - index of an item
public final Stream<O> elementAtOrDefault(int index, O defaultValue)
Create a new Stream that emits an item at a specified index from a source Stream, or the default value when the index is out of bounds
index - index of an item
public final Stream<O> sampleFirst()
Create a new Stream whose values will be only the first value of each batch. Requires a getCapacity() to have been set.
When a new batch is triggered, the first value of that next batch will be pushed into this Stream.
Stream whose values are the first value of each batch
public final Stream<O> sampleFirst(int batchSize)
Create a new Stream whose values will be only the first value of each batch.
When a new batch is triggered, the first value of that next batch will be pushed into this Stream.
batchSize - the batch size to use
Stream whose values are the first value of each batch
public final Stream<O> sampleFirst(long timespan, TimeUnit unit)
Create a new Stream whose values will be only the first value of each batch.
timespan - the period in unit to use to release a buffered list
unit - the time unit
Stream whose values are the first value of each batch
public final Stream<O> sampleFirst(long timespan, TimeUnit unit, Timer timer)
Create a new Stream whose values will be only the first value of each batch.
timespan - the period in unit to use to release a buffered list
unit - the time unit
timer - the Timer to run on
Stream whose values are the first value of each batch
public final Stream<O> sampleFirst(int maxSize, long timespan, TimeUnit unit)
Create a new Stream whose values will be only the first value of each batch.
maxSize - the max counted size
timespan - the period in unit to use to release a buffered list
unit - the time unit
Stream whose values are the first value of each batch
public final Stream<O> sampleFirst(int maxSize, long timespan, TimeUnit unit, Timer timer)
Create a new Stream whose values will be only the first value of each batch.
maxSize - the max counted size
timespan - the period in unit to use to release a buffered list
unit - the time unit
timer - the Timer to run on
Stream whose values are the first value of each batch
public final Stream<O> sample()
Create a new Stream whose values will be only the last value of each batch. Requires a getCapacity() to have been set.
Stream whose values are the last value of each batch
public final Stream<O> sample(int batchSize)
Create a new Stream whose values will be only the last value of each batch. Requires a getCapacity() to have been set.
batchSize - the batch size to use
Stream whose values are the last value of each batch
public final Stream<O> sample(long timespan, TimeUnit unit)
Create a new Stream whose values will be only the last value of each batch.
timespan - the period in unit to use to release a buffered list
unit - the time unit
Stream whose values are the last value of each batch
public final Stream<O> sample(long timespan, TimeUnit unit, Timer timer)
Create a new Stream whose values will be only the last value of each batch.
timespan - the period in unit to use to release a buffered list
unit - the time unit
timer - the Timer to run on
Stream whose values are the last value of each batch
public final Stream<O> sample(int maxSize, long timespan, TimeUnit unit)
Create a new Stream whose values will be only the last value of each batch.
maxSize - the max counted size
timespan - the period in unit to use to release a buffered list
unit - the time unit
Stream whose values are the last value of each batch
public final Stream<O> sample(int maxSize, long timespan, TimeUnit unit, Timer timer)
Create a new Stream whose values will be only the last value of each batch.
maxSize - the max counted size
timespan - the period in unit to use to release a buffered list
unit - the time unit
timer - the Timer to run on
Stream whose values are the last value of each batch
public final Stream<O> distinctUntilChanged()
Create a new Stream that filters out consecutive equal values.
Stream whose values are the last value of each batch
public final <V> Stream<O> distinctUntilChanged(Function<? super O,? extends V> keySelector)
Create a new Stream that filters out consecutive values having equal keys computed by the keySelector function
keySelector - function to compute comparison key for each element
Stream whose values are the last value of each batch
public final Stream<O> distinct()
Create a new Stream that filters in only unique values.
Stream with unique values
public final <V> Stream<O> distinct(Function<? super O,? extends V> keySelector)
Create a new Stream that filters in only values having distinct keys computed by the keySelector function
keySelector - function to compute comparison key for each element
Stream with values having distinct keys
public final Stream<Boolean> exists(Predicate<? super O> predicate)
Create a new Stream that emits true when any value satisfies a predicate and false otherwise
predicate - predicate tested upon values
Stream with true if any value satisfies a predicate and false otherwise
public final <V> Stream<V> split()
Create a new Stream whose values will be each element E of any Iterable emitted by this Stream.
Stream whose values result from the iterable input
public final <V> Stream<V> split(long batchSize)
Create a new Stream whose values will be each element E of any Iterable emitted by this Stream.
When a new batch is triggered, the last value of that next batch will be pushed into this Stream.
batchSize - the batch size to use
Stream whose values result from the iterable input
public final Stream<List<O>> buffer()
Collect incoming values into a List that will be pushed into the returned Stream every time getCapacity() has been reached, or flush is triggered.
public final Stream<List<O>> buffer(int maxSize)
Collect incoming values into List buckets that will be pushed into the returned Stream every time getCapacity() has been reached.
public final Stream<List<O>> buffer(org.reactivestreams.Publisher<?> bucketOpening, Supplier<? extends org.reactivestreams.Publisher<?>> boundarySupplier)
Collect incoming values into a List that will be moved into the returned Stream every time the passed boundary publisher emits an item.
Complete will flush any remaining items.
public final Stream<List<O>> buffer(Supplier<? extends org.reactivestreams.Publisher<?>> boundarySupplier)
Collect incoming values into a List that will be moved into the returned Stream every time the passed boundary publisher emits an item.
Complete will flush any remaining items.
public final Stream<List<O>> buffer(int maxSize, int skip)
Collect incoming values into List buckets that will be pushed into the returned Stream every time maxSize has been reached by any of them. Complete signal will flush any remaining buckets.
public final Stream<List<O>> buffer(long timespan, TimeUnit unit)
Collect incoming values into a List that will be pushed into the returned Stream every timespan.
public final Stream<List<O>> buffer(long timespan, TimeUnit unit, Timer timer)
Collect incoming values into a List that will be pushed into the returned Stream every timespan.
public final Stream<List<O>> buffer(long timespan, long timeshift, TimeUnit unit)
Collect incoming values into List buckets created every timeshift that will be pushed into the returned Stream every timespan. Complete signal will flush any remaining buckets.
public final Stream<List<O>> buffer(long timespan, long timeshift, TimeUnit unit, Timer timer)
Collect incoming values into List buckets created every timeshift that will be pushed into the returned Stream every timespan. Complete signal will flush any remaining buckets.
public final Stream<List<O>> buffer(int maxSize, long timespan, TimeUnit unit)
Collect incoming values into a List that will be pushed into the returned Stream every timespan OR maxSize items.
public final Stream<List<O>> buffer(int maxSize, long timespan, TimeUnit unit, Timer timer)
Collect incoming values into a List that will be pushed into the returned Stream every timespan OR maxSize items
public final Stream<O> sort()
Stage incoming values into a PriorityQueue<O> that will be re-ordered and signaled to the returned fresh Stream. Possible flush triggers are: getCapacity(), complete signal or request signal.
PriorityQueue will use the Comparable interface from an incoming data signal.
Stream whose values are re-ordered using a PriorityQueue.
public final Stream<O> sort(int maxCapacity)
Stage incoming values into a PriorityQueue<O> that will be re-ordered and signaled to the returned fresh Stream. Possible flush triggers are: getCapacity(), complete signal or request signal.
PriorityQueue will use the Comparable interface from an incoming data signal.
maxCapacity - a fixed maximum number of elements to re-order at once.
Stream whose values are re-ordered using a PriorityQueue.
public final Stream<O> sort(Comparator<? super O> comparator)
Stage incoming values into a PriorityQueue<O> that will be re-ordered and signaled to the returned fresh Stream. Possible flush triggers are: getCapacity(), complete signal or request signal.
PriorityQueue will use the Comparable interface from an incoming data signal.
comparator - A Comparator to evaluate incoming data
Stream whose values are re-ordered using a PriorityQueue.
public final Stream<O> sort(int maxCapacity, Comparator<? super O> comparator)
Stage incoming values into a PriorityQueue<O> that will be re-ordered and signaled to the returned fresh Stream. Possible flush triggers are: getCapacity(), complete signal or request signal.
PriorityQueue will use the Comparable interface from an incoming data signal.
maxCapacity - a fixed maximum number of elements to re-order at once.
comparator - A Comparator to evaluate incoming data
Stream whose values are re-ordered using a PriorityQueue.
public final Stream<Stream<O>> window()
Re-route incoming values into a dynamically created Stream every pre-defined getCapacity() times. The nested streams will be pushed into the returned Stream.
public final Stream<Stream<O>> window(int backlog)
Re-route incoming values into a dynamically created Stream every pre-defined backlog times.
The nested streams will be pushed into the returned Stream.
public final Stream<Stream<O>> window(int maxSize, int skip)
Re-route incoming values into bucket streams that will be pushed into the returned Stream every skip and complete every time maxSize has been reached by any of them. Complete signal will flush any remaining buckets.
public final Stream<Stream<O>> window(Supplier<? extends org.reactivestreams.Publisher<?>> boundarySupplier)
Re-route incoming values into bucket streams that will be pushed into the returned Stream and complete every time the boundarySupplier stream emits an item.
public final Stream<Stream<O>> window(org.reactivestreams.Publisher<?> bucketOpening, Supplier<? extends org.reactivestreams.Publisher<?>> boundarySupplier)
Re-route incoming values into bucket streams that will be pushed into the returned Stream and complete every time the boundarySupplier stream emits an item. The window starts forwarding when the bucketOpening stream emits an item, then subscribes to the boundary supplied to complete.
public final Stream<Stream<O>> window(long timespan, TimeUnit unit)
Re-route incoming values into a dynamically created Stream every pre-defined timespan.
The nested streams will be pushed into the returned Stream.
public final Stream<Stream<O>> window(long timespan, TimeUnit unit, Timer timer)
Re-route incoming values into a dynamically created Stream every pre-defined timespan.
The nested streams will be pushed into the returned Stream.
public final Stream<Stream<O>> window(int maxSize, long timespan, TimeUnit unit)
Re-route incoming values into a dynamically created Stream every pre-defined timespan OR maxSize items.
The nested streams will be pushed into the returned Stream.
public final Stream<Stream<O>> window(int maxSize, long timespan, TimeUnit unit, Timer timer)
Re-route incoming values into a dynamically created Stream every pre-defined timespan OR maxSize items.
The nested streams will be pushed into the returned Stream.
public final Stream<Stream<O>> window(long timespan, long timeshift, TimeUnit unit)
Re-route incoming values into bucket streams that will be pushed into the returned Stream every timeshift period. These streams will complete every time the timespan period has cycled. Complete signal will flush any remaining buckets.
public final Stream<Stream<O>> window(long timespan, long timeshift, TimeUnit unit, Timer timer)
Re-route incoming values into bucket streams that will be pushed into the returned Stream every timeshift period. These streams will complete every time the timespan period has cycled. Complete signal will flush any remaining buckets.
public final <K> Stream<GroupedStream<K,O>> groupBy(Function<? super O,? extends K> keyMapper)
Re-route incoming values into a dynamically created Stream for each unique key evaluated by the keyMapper.
public final Stream<GroupedStream<Integer,O>> partition()
Re-route incoming values into a dynamically created Stream for each unique key evaluated by the keyMapper. The hashcode of the incoming data will be used for partitioning over Environment.PROCESSORS buckets.
That means that at any point in time at most Environment.PROCESSORS streams will be created and used according to the current hashcode % n result.
public final Stream<GroupedStream<Integer,O>> partition(int buckets)
Re-route incoming values into a dynamically created Stream for each unique key evaluated by the keyMapper. The hashcode of the incoming data will be used for partitioning over the number of buckets passed.
That means that at any point in time at most buckets streams will be created and used according to the positive modulo of the current hashcode with respect to the number of buckets specified.
public final Stream<O> reduce(@Nonnull BiFunction<O,O,O> fn)
Reduces the values passing through this Stream into an object of type O.
This is a simple functional way of accumulating values.
The arguments are the N-1 and N next signals, in this order.
fn - the reduce function
Returns: a Stream whose values contain only the reduced objects

public final <A> Stream<A> reduce(A initial, @Nonnull BiFunction<A,? super O,A> fn)
Reduces the values passing through this Stream into an object A.
The arguments are the N-1 and N next signals, in this order.
A - the type of the reduced object
fn - the reduce function
initial - the initial argument to pass to the reduce function
Returns: a Stream whose values contain only the reduced objects

public final Stream<O> scan(@Nonnull BiFunction<O,O,O> fn)
Scans the values passing through this Stream into an object of type O.
The arguments are the N-1 and N next signals, in this order.
fn - the scan function
Returns: a Stream whose values contain only the reduced objects

public final <A> Stream<A> scan(A initial, @Nonnull BiFunction<A,? super O,A> fn)
Scans the values passing through this Stream into an object A. The given initial object will be passed as the function's first argument. Behaves like reduce but triggers the downstream Stream for every transformation.
A - the type of the reduced object
initial - the initial argument to pass to the scan function
fn - the scan function
Returns: a Stream whose values contain only the reduced objects

public final Stream<Long> count()
public final Stream<Long> count(long i)
Stream
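To contrast the reduce and scan entries above, here is a minimal plain-Java sketch that does not use Reactor at all. The class name ReduceVsScan and both helper methods are hypothetical, and lists stand in for streams. It assumes that reduce emits only the final accumulation, that scan emits one value per transformation (as the scan description states), and that an empty input emits nothing; how Reactor itself treats the first value and batch boundaries is not shown in this reference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceVsScan {

    // reduce: fold every value with fn and emit only the final accumulation.
    public static <T> List<T> reduce(List<T> values, BinaryOperator<T> fn) {
        List<T> emitted = new ArrayList<>();
        if (values.isEmpty()) {
            return emitted; // assumption: nothing to reduce, nothing emitted
        }
        T acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = fn.apply(acc, values.get(i)); // arguments: previous result, then next value
        }
        emitted.add(acc);
        return emitted;
    }

    // scan: the same fold, but every intermediate accumulation is emitted.
    public static <T> List<T> scan(List<T> values, BinaryOperator<T> fn) {
        List<T> emitted = new ArrayList<>();
        if (values.isEmpty()) {
            return emitted;
        }
        T acc = values.get(0);
        emitted.add(acc); // assumption: the first value is emitted unchanged
        for (int i = 1; i < values.size(); i++) {
            acc = fn.apply(acc, values.get(i));
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        System.out.println(reduce(input, Integer::sum)); // prints [10]
        System.out.println(scan(input, Integer::sum));   // prints [1, 3, 6, 10]
    }
}
```

With a sum function, reduce yields a single total while scan yields the running totals, which is the practical reason to pick one over the other.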
public final Stream<O> throttle(long period)
period - the period in milliseconds between two notifications on this stream
Returns: a Stream
public final Stream<O> throttle(long period, Timer timer)
period - the period in milliseconds between two notifications on this stream
timer - the reactor timer to run the timeout on
Returns: a Stream
public final Stream<O> requestWhen(Function<? super Stream<? extends Long>,? extends org.reactivestreams.Publisher<? extends Long>> throttleStream)
throttleStream - a function that takes a broadcasted stream of request signals and must return a stream of valid request signals (long)
Returns: a Stream
public final Stream<O> timeout(long timeout)
A Timeout Exception will be signaled if no data or complete signal has been sent within the given period.
timeout - the timeout in milliseconds between two notifications on this composable
Returns: a Stream
public final Stream<O> timeout(long timeout, TimeUnit unit)
A Timeout Exception will be signaled if no data or complete signal has been sent within the given period.
timeout - the timeout in unit between two notifications on this composable
unit - the time unit
Returns: a Stream
public final Stream<O> timeout(long timeout, TimeUnit unit, org.reactivestreams.Publisher<? extends O> fallback)
The current subscription will be cancelled and the fallback publisher subscribed.
A Timeout Exception will be signaled if no data or complete signal has been sent within the given period.
timeout - the timeout in unit between two notifications on this composable
unit - the time unit
fallback - the fallback Publisher to subscribe to once the timeout has occurred
Returns: a Stream
public final Stream<O> timeout(long timeout, TimeUnit unit, org.reactivestreams.Publisher<? extends O> fallback, Timer timer)
A Timeout Exception will be signaled if no data or complete signal has been sent within the given period.
timeout - the timeout in unit between two notifications on this composable
unit - the time unit
timer - the reactor timer to run the timeout on
Returns: a Stream
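The fallback behaviour described for the timeout overloads above can be pictured with a small plain-Java analogy. This is only a sketch built on java.util.concurrent, not Reactor's implementation: the class TimeoutFallbackSketch and the method nextOrFallback are hypothetical, and blocking queues stand in for the source and fallback publishers. It waits for one value within the timeout and switches to the fallback source if none arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimeoutFallbackSketch {

    // Hypothetical helper: wait up to `timeout` for the next value from `source`;
    // if nothing arrives in time, abandon `source` and take from `fallback` instead.
    public static <T> T nextOrFallback(BlockingQueue<T> source, long timeout, TimeUnit unit,
                                       BlockingQueue<T> fallback) {
        try {
            T value = source.poll(timeout, unit); // returns null when the timeout elapses
            if (value != null) {
                return value;
            }
            return fallback.take(); // blocks until the fallback has a value
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        BlockingQueue<String> fallback = new LinkedBlockingQueue<>();
        fallback.add("from fallback");

        // The source is silent, so the fallback value is used after 50 ms.
        System.out.println(nextOrFallback(source, 50, TimeUnit.MILLISECONDS, fallback));

        // The source has a value ready, so it wins immediately.
        source.add("from source");
        System.out.println(nextOrFallback(source, 50, TimeUnit.MILLISECONDS, fallback));
    }
}
```

The overloads without a fallback would correspond to raising an error instead of calling fallback.take() in this analogy.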
public <E> CompositeAction<E,O> combine()
Combines the most ancient upstream action to act as the Subscriber input component and the current stream to act as the Publisher.
Useful to share and ship a full stream whilst hiding the staging actions in the middle.
The default behavior, e.g. for a single stream, is to raise an IllegalStateException, as there would not be any Subscriber (input) side to combine. Action.combine() is the usual reference implementation used.
E - the type of the most ancient action input

public final Promise<O> next()
Returns the promise of the next signal triggered on this Stream. It is useful to coordinate on single data streams or to await any signal.
Returns: a Promise
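As a loose analogy for awaiting a single signal, the sketch below uses the JDK's java.util.concurrent.CompletableFuture rather than Reactor's Promise, whose API is not shown in this entry. The class FirstSignalSketch and the method firstOf are hypothetical, and the idea that only the first signal is retained is an assumption of the sketch, modelled here by the fact that a CompletableFuture can be completed only once.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

public class FirstSignalSketch {

    // Hypothetical helper: offer every signal to the future; only the first one sticks,
    // because complete() has no effect once the future is already completed.
    public static <T> CompletableFuture<T> firstOf(Iterable<T> signals) {
        CompletableFuture<T> first = new CompletableFuture<>();
        for (T signal : signals) {
            first.complete(signal);
        }
        return first;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = firstOf(Arrays.asList("a", "b", "c"));
        System.out.println(first.join()); // prints a
    }
}
```

A caller can block on join() or chain a callback, which mirrors the coordination use case the entry mentions.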
public final Promise<List<O>> toList()
public final Promise<List<O>> toList(long maximum)
maximum - the list size, and therefore the number of event signals to listen for

public Stream<O> env(Environment environment)
environment - the environment

public final CompletableBlockingQueue<O> toBlockingQueue()
public final CompletableBlockingQueue<O> toBlockingQueue(int maximum)
maximum - the queue capacity; a full queue might block the stream producer

public Stream<O> keepAlive()
Prevents a Stream from being cancelled. Cancel propagation occurs when the last subscriber is cancelled.

public final <A> void subscribe(CompositeAction<O,A> subscriber)
Subscribes the CompositeAction.input() to this Stream. Combining actions through Action.combine() allows for easy distribution of a full flow.
subscriber - the combined actions to subscribe

public long getCapacity()
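Description copied from interface: NonBlocking
Returns the defined element capacity, used to drive new Subscription request needs. This is the maximum in-flight data allowed to transit to this element.
Specified by: getCapacity in interface NonBlocking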
public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity)
NonBlocking
Dispatcher.
Specified by: isReactivePull in interface NonBlocking
public Timer getTimer()
public PushSubscription<O> downstreamSubscription()
PushSubscription
public boolean cancelSubscription(PushSubscription<O> oPushSubscription)
true
if successful.PushSubscription
public Environment getEnvironment()
Environment
.Environment
public Dispatcher getDispatcher()
Copyright © 2017. All rights reserved.