p

akka.stream

contrib

package contrib

Ordering
  1. Alphabetic
Visibility
  1. Public
  2. All

Type Members

  1. final class AccumulateWhileUnchanged[Element, Property] extends GraphStage[FlowShape[Element, Seq[Element]]]

    Accumulates elements of type Element while a property extracted with propertyExtractor remains unchanged, emits an accumulated sequence when the property changes, maxElements is reached or maxDuration has passed.

    Accumulates elements of type Element while a property extracted with propertyExtractor remains unchanged, emits an accumulated sequence when the property changes, maxElements is reached or maxDuration has passed.

    Element

    type of accumulated elements

    Property

    type of the observed property

  2. final class DelayFlow[T] extends SimpleLinearGraphStage[T]

    Flow stage for universal delay management, allows to manage delay through DelayStrategy.

    Flow stage for universal delay management, allows to manage delay through DelayStrategy. It determines delay for each ongoing element invoking DelayStrategy.nextDelay(elem: T): FiniteDuration. Implementing DelayStrategy with your own gives you flexible ability to manage delay value depending on coming elements. It is important notice that DelayStrategy can be stateful. There are also predefined strategies, see DelayStrategy companion object's methods.

    See also

    DelayStrategy

  3. final case class KeepAliveConcat[T](keepAliveFailoverSize: Int, interval: FiniteDuration, extrapolate: (T) ⇒ Seq[T]) extends GraphStage[FlowShape[T, T]] with Product with Serializable

    Sends elements from buffer if upstream does not emit for a configured amount of time.

    Sends elements from buffer if upstream does not emit for a configured amount of time. In other words, this stage attempts to maintains a base rate of emitted elements towards the downstream using elements from upstream.

    If upstream emits new elements until the accumulated elements in the buffer exceed the specified minimum size used as the keep alive elements, then the base rate is no longer maintained until we reach another period without elements form upstream.

    The keep alive period is the keep alive failover size times the interval.

    Emits when upstream emits an element or if the upstream was idle for the configured period

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

    See also

    akka.stream.scaladsl.FlowOps#keepAlive

    akka.stream.scaladsl.FlowOps#expand

  4. final class LastElement[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[Option[A]]]

    This stage materializes to the last element pushed before upstream completion, if any, thereby recovering from any failure.

    This stage materializes to the last element pushed before upstream completion, if any, thereby recovering from any failure. Pushed elements are just passed along.

    A

    input and output type

  5. final class PartitionWith[In, Out0, Out1] extends GraphStage[FanOutShape2[In, Out0, Out1]]

    This stage partitions input to 2 different outlets, applying different transformations on the elements, according to the received partition function.

    This stage partitions input to 2 different outlets, applying different transformations on the elements, according to the received partition function.

    In

    input type

    Out0

    left output type

    Out1

    right output type

  6. final class Pulse[T] extends SimpleLinearGraphStage[T]

    Signals demand only once every interval (pulse) and then back-pressures.

    Signals demand only once every interval (pulse) and then back-pressures. Requested element is emitted downstream if there is demand.

    It can be used to implement simple time-window processing where data is aggregated for predefined amount of time and the computed aggregate is emitted once per this time. See TimeWindow

    T

    type of element

  7. case class Sample[T](next: () ⇒ Int) extends GraphStage[FlowShape[T, T]] with Product with Serializable

    supports sampling on stream

    supports sampling on stream

    next

    a lambda returns next sample position

  8. trait SwitchMode extends AnyRef
  9. trait TimedIntervalBetweenOps extends AnyRef

    Provides operations needed to implement the timedIntervalBetween DSL

  10. trait TimedOps extends AnyRef

    Provides operations needed to implement the timed DSL

  11. final class Valve[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[ValveSwitch]]

    Materializes into a Future of ValveSwitch which provides a the method flip that stops or restarts the flow of elements passing through the stage.

    Materializes into a Future of ValveSwitch which provides a the method flip that stops or restarts the flow of elements passing through the stage. As long as the valve is closed it will backpressure.

    Note that closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost.

  12. sealed trait ValveSwitch extends AnyRef

    Pause/ Resume a Flow

  13. final class ZipInputStreamSource extends GraphStageWithMaterializedValue[SourceShape[(ZipEntryData, ByteString)], Future[Long]]

    A stage that works as a Source of data chunks extracted from zip files.

    A stage that works as a Source of data chunks extracted from zip files. In addition to regular files, the zip file might contain directories and other zip files. Every chunk is a tuple of ZipEntryData and ByteString, where the former carries basic info about the file from which the bytes come and the latter carries those bytes. This stage materializes to the total amount of read bytes.

  14. final class Accumulate[A, B] extends GraphStage[FlowShape[A, B]]

    This stage emits folded values like scan, but the first element emitted is not the zero value but the result of applying the given function to the given zero value and the first pushed element.

    This stage emits folded values like scan, but the first element emitted is not the zero value but the result of applying the given function to the given zero value and the first pushed element.

    A

    input type

    B

    output type

    Annotations
    @deprecated
    Deprecated

    (Since version 0.5) Use scan and drop(1) instead

  15. final class DirectoryChanges extends GraphStage[SourceShape[Pair[Path, Change]]]

    Watches a file system directory and streams change events from it.

    Watches a file system directory and streams change events from it.

    Note that the JDK watcher is notoriously slow on some platform (up to 1s after event actually happened on OSX for example)

    Deprecated

    since 0.10, use Alpakka's implementation instead https://developer.lightbend.com/docs/alpakka/current/file.html#listing-directory-contents

  16. final class FileTailSource extends GraphStage[SourceShape[ByteString]]

    Read the entire contents of a file, and then when the end is reached, keep reading newly appended data.

    Read the entire contents of a file, and then when the end is reached, keep reading newly appended data. Like the unix command tail -f.

    Aborting the stage can be done by combining with a akka.stream.KillSwitch

    Deprecated

    since 0.10, use Alpakka's implementation instead https://developer.lightbend.com/docs/alpakka/current/file.html#tailing-a-file-into-a-stream

Value Members

  1. object Accumulate

    This companion defines a factory for Accumulate instances, see Accumulate.apply.

  2. object AccumulateWhileUnchanged
  3. object DelayFlow
  4. object FeedbackLoop
  5. object Implicits

    Additional akka.stream.scaladsl.Flow and akka.stream.scaladsl.Flow operators.

  6. object IntervalBasedRateLimiter
  7. object LastElement

    This companion defines a factory for LastElement instances, see LastElement.apply.

  8. object PagedSource

    Defines a factory for PagedSource.

  9. object PartitionWith

    This companion defines a factory for PartitionWith instances, see PartitionWith.apply.

  10. object PassThroughFlow

    Given Flow[In, Out] and f: (In, Out) => Result, transforms original flow into a Flow[In, Result], i.e.

    Given Flow[In, Out] and f: (In, Out) => Result, transforms original flow into a Flow[In, Result], i.e. for every element e signalled from upstream, emits f(e, flow(e)).

    Has an overloaded factory that fixes f to be a tuple constructor, i.e. for every element e signalled from upstream, emits a tuple (e, flow(e)).

    IMPORTANT! This flow combinator is guaranteed to work correctly on flows that have behavior of classic total functions, meaning that they should be a one-t-one functions that don't reorder, drop, inject etc new elements. In the future these restrictions may be lifted, for now please refer to the following resources for more: - https://github.com/akka/akka-stream-contrib/pull/142#discussion_r228875614 - https://github.com/akka/akka/issues/15957 and linked issues - https://discuss.lightbend.com/t/passing-stream-elements-into-and-over-flow/2536 and linked resources

    Applies no internal buffering / flow control etc.

    Emits when upstream emits an element

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

    Examples:

    1. Consuming from Kafka: Allows for busines logic to stay unaware of commits:

    val logic: Flow[CommittableMessage[String, Array[Byte]], ProcessingResult] =
      Flow[CommittableMessage[String, Array[Byte]]]
        .map(m => process(m.record))
    
    // Used like this:
    Consumer
      .committableSource(settings, Subscriptions.topics("my-topic"))
      .via(PassThroughFlow(logic))
      .map { case (committableMessage, processingResult) =>
        // decide to commit or not based on `processingResult`
      }
    
    // or:
    
    Consumer
      .committableSource(settings, Subscriptions.topics("my-topic"))
      .via(PassThroughFlow(logic, Keep.left)) // process messages but return original elements
      .mapAsync(1)(_.commitScalaDsl())

    2. Logging HTTP request-response based on some rule

    // assuming Akka HTTP entities:
    val route: Route = ???
    
    // Route has an implicit conversion to Flow[HttpRequest, HttpResponse]
    
    Flow[HttpRequest]
      .map { r =>
        // don't log web crawlers' requests
        if (userAgent(r) != "google-bot") {
          logRequest(r)
        }
        r
      }
      .via(PassThroughFlow(route)) // req => (req, resp)
      .map { case (req, resp) =>
        // don't log responses to web crawlers
        if (userAgent(req) != "google-bot") {
          logResponse(resp)
        }
        resp
      }
  11. object Retry

    This object defines methods for retry operations.

  12. object Sample extends Serializable
  13. object SourceGen

    Source factory methods are placed here

  14. object SourceRepeatEval

    Create a Source that will output elements of type A given a "producer" function

    Create a Source that will output elements of type A given a "producer" function

    Examples:

    stream of current times:

    SourceRepeatEval(() => System.currentTimeMillis)

    stream of random numbers:

    SourceRepeatEval(() => Random.nextInt)

    Behavior is the same as in

    Source.repeat(()).map(_ => x)

    Supports cancellation via materialized Cancellable.

  15. object SwitchMode
  16. object TimeWindow
  17. object Timed extends TimedOps with TimedIntervalBetweenOps
  18. object Valve
  19. object ZipInputStreamSource

    This companion defines a factory for ZipInputStreamSource instances, see ZipInputStreamSource.apply.

Ungrouped