IronMq Connector

The IronMq connector provides an Akka Stream source and sink to connect to IronMQ queues.

IronMq is a simple point-to-point queue, but it is possible to implement fan-out semantics by configuring the queue as a push queue and setting other queues as its subscribers. More information about this can be found in the IronMq documentation.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-ironmq" % "0.11"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-ironmq_2.12</artifactId>
  <version>0.11</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-ironmq_2.12", version: "0.11"
}

Usage

IronMq can be used either in the cloud or on-premises. Either way you need an authentication token and a project ID. These can be set in the Typesafe config:

akka.stream.alpakka.ironmq {

  // The IronMq endpoint. It may vary depending on the availability zone and region.
  endpoint = "https://mq-aws-eu-west-1-1.iron.io"

  credentials {

    // The IronMq project id
    // project-id =

    // The IronMq auth token
    // token =
  }

  consumer {

    // This is the max number of messages to fetch from IronMq.
    buffer-max-size = 100

    // This is the threshold below which more messages are fetched from IronMq
    buffer-min-size = 25

    // This is the time interval between each poll loop
    fetch-interval = 250 milliseconds

    // This is the amount of time the IronMq client will wait for a message to be available in the queue
    poll-timeout = 0

    // This is the amount of time a fetched message will not be available to other consumers
    reservation-timeout = 30 seconds

  }
}
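
For instance, the credentials can be supplied in your application.conf. The values below are placeholders, to be replaced with your own project ID and token:

akka.stream.alpakka.ironmq {
  credentials {
    // placeholder values: use the project id and auth token from your IronMq account
    project-id = "0123456789abcdef01234567"
    token = "my-ironmq-auth-token"
  }
}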

Consumer

The consumer is poll based. It polls the queue at a fixed interval, waiting a configurable amount of time for new messages, and pushes them downstream. All of these parameters are configurable via the Typesafe config.

It supports both at-most-once and at-least-once semantics. In the first case the messages are deleted straight away after being fetched. In the latter case the messages carry a Committable object that should be used to commit the message. Committing the message causes it to be deleted from the queue.

The consumer can be instantiated using the IronMqConsumer. It provides methods to obtain either a Source[Message, NotUsed] or a Source[CommittableMessage, NotUsed]. The first is for at-most-once semantics, the latter for at-least-once semantics.
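
A minimal sketch of both variants might look like the following. The factory method names (atMostOnceConsumerSource, atLeastOnceConsumerSource), the exact package layout, the queue name and the process helper are assumptions for illustration; check the IronMqConsumer API of the version you use for the exact signatures.

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.alpakka.ironmq._

import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

// Stand-in for your own processing logic.
def process(message: Message): Future[Done] = Future.successful(Done)

// At-most-once: messages are deleted from the queue as soon as they are fetched.
IronMqConsumer
  .atMostOnceConsumerSource("my-queue") // assumed factory method name
  .runWith(Sink.foreach(message => println(message.body)))

// At-least-once: commit the message once it has been processed successfully.
IronMqConsumer
  .atLeastOnceConsumerSource("my-queue") // assumed factory method name
  .mapAsync(1) { committable =>
    process(committable.message).flatMap(_ => committable.commit()) // committing deletes the message
  }
  .runWith(Sink.ignore)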

Producer

The producer is very simple at this time: it does not provide any batching mechanism, but sends messages to IronMq as soon as they arrive at the stage.

The producer can be instantiated using the IronMqProducer. It provides methods to obtain either a Flow[PushMessage, Message.Id, NotUsed] or a Sink[PushMessage, NotUsed].

The PushMessage allows you to specify a delay per individual message. The message expiration is set at queue level. The IronMqClient does not yet have all the features needed to create and update these settings on a queue.
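
A minimal sketch of pushing messages, reusing the implicit ActorSystem and materializer from the consumer sketch above, might look like this. The factory method names (producerFlow, producerSink), the queue name and the delay parameter name are assumptions for illustration; consult the IronMqProducer and PushMessage API of the version you use.

import akka.stream.scaladsl.{Sink, Source}
import akka.stream.alpakka.ironmq._

import scala.concurrent.duration._

// Push two messages and print the Message.Id generated by IronMq for each of them.
Source(List(
  PushMessage("important payload"),
  PushMessage("less urgent payload", delay = 30.seconds) // per-message delay (parameter name assumed)
))
  .via(IronMqProducer.producerFlow("my-queue")) // assumed factory method name
  .runWith(Sink.foreach(id => println(s"pushed message $id")))

// When the ids are not needed, the Sink variant can be used instead.
Source.single(PushMessage("fire and forget"))
  .runWith(IronMqProducer.producerSink("my-queue")) // assumed factory method name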

If you are using the producer Flow, the returned Message.Id is the ID of the pushed message and can be used to manipulate it.

For each PushMessage from upstream you will get exactly one Message.Id downstream, in the same order, regardless of whether the producer implements a batching mechanism in the future.

The producer also provides a Committable-aware Flow/Sink as Flow[(PushMessage, ToCommit), (Message.Id, CommitResult), CommitMat]. It can be used to consume a Flow from an IronMq consumer, or from any other source that provides a commit mechanism.
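
For example, forwarding messages from one queue to another so that the consumed message is only committed after it has been pushed might look like the sketch below. The factory method names (atLeastOnceConsumerSource, atLeastOnceProducerFlow) are assumptions for illustration; check the IronMqProducer API of the version you use.

import akka.stream.scaladsl.Sink
import akka.stream.alpakka.ironmq._

// Forward messages from one queue to another; the Committable-aware producer flow
// commits (and thereby deletes) the consumed message once it has been pushed.
IronMqConsumer
  .atLeastOnceConsumerSource("source-queue") // assumed factory method name
  .map(committable => (PushMessage(committable.message.body), committable))
  .via(IronMqProducer.atLeastOnceProducerFlow("target-queue")) // assumed factory method name
  .runWith(Sink.ignore)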
