Consumer
The Alpakka JMS connector offers consuming JMS messages from topics or queues:
- Read
javax.jms.Message
s from an Akka Streams source - Allow for client acknowledgement to the JMS broker
- Allow for JMS transactions
- Read raw JVM types from an Akka Streams Source
The JMS message model supports several types of message bodies in (see javax.jms.Message
), which may be created directly from the Akka Stream elements, or in wrappers to access more advanced features.
Receiving messages
JmsConsumer
JmsConsumer
offers factory methods to consume JMS messages in a number of ways.
This examples shows how to listen to a JMS queue and emit `javax.jms.Message`
elements into the stream.
The materialized value JmsConsumerControl
is used to shut down the consumer (it is a Killswitch
) and offers the possibility to inspect the connectivity state of the consumer.
- Scala
-
val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer( JmsConsumerSettings(system, connectionFactory).withQueue("numbers") ) val (control, result): (JmsConsumerControl, Future[immutable.Seq[String]]) = jmsSource .take(msgsIn.size) .map { case t: javax.jms.TextMessage => t.getText case other => sys.error(s"unexpected message type ${other.getClass}") } .toMat(Sink.seq)(Keep.both) .run() control.shutdown()
- Java
-
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<javax.jms.Message, JmsConsumerControl> jmsSource = JmsConsumer.create( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); Pair<JmsConsumerControl, CompletionStage<List<String>>> controlAndResult = jmsSource .take(expectedMessages) .map( msg -> { if (msg instanceof TextMessage) { TextMessage t = (TextMessage) msg; return t.getText(); } else throw new RuntimeException("unexpected message type " + msg.getClass()); }) .toMat(Sink.seq(), Keep.both()) .run(materializer); JmsConsumerControl control = controlAndResult.first(); control.shutdown();
Configure JMS consumers
To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory
. The Alpakka tests and all examples use Active MQ.
- Scala
-
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
- Java
-
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
The created ConnectionFactory
is then used for the creation of the different JMS sources.
The JmsConsumerSettings
factories allow for passing the actor system to read from the default alpakka.jms.consumer
section, or you may pass a Config
instance which is resolved to a section of the same structure.
- Scala
-
val consumerConfig: Config = system.settings.config.getConfig(JmsConsumerSettings.configPath) // reiterating defaults from reference.conf val settings = JmsConsumerSettings(consumerConfig, connectionFactory) .withQueue("target-queue") .withCredentials(Credentials("username", "password")) .withConnectionRetrySettings(retrySettings) .withSessionCount(1) .withBufferSize(100) .withAckTimeout(1.second)
- Java
-
Config consumerConfig = config.getConfig(JmsConsumerSettings.configPath()); JmsConsumerSettings settings = JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory("broker-url")) .withTopic("message-topic") .withCredential(Credentials.create("username", "password")) .withConnectionRetrySettings(retrySettings) .withSessionCount(10) .withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge()) .withSelector("Important = TRUE");
The Alpakka JMS consumer is configured via default settings in the HOCON config file section alpakka.jms.consumer
in your application.conf
, and settings may be tweaked in the code using the withXyz
methods. On the second tab the section from reference.conf
shows the structure to use for configuring multiple set-ups.
- Table
-
Setting Description Default Value connectionFactory Factory to use for creating JMS connections Must be set in code destination Destination (queue or topic) to send JMS messages to Must be set in code credentials JMS broker credentials Empty connectionRetrySettings Retry characteristics if the connection failed to be established or is taking a long time. See Connection Retries sessionCount Number of parallel sessions to use for receiving JMS messages. defaults to 1
bufferSize Maximum number of messages to prefetch before applying backpressure. 100 ackTimeout For use with JMS transactions, only: maximum time given to a message to be committed or rolled back. 1 second selector JMS selector expression (see below) Empty - reference.conf
-
# Jms Consumer Settings # sets default values consumer { # Configure connection retrying by providing settings for ConnectionRetrySettings. connection-retry = ${alpakka.jms.connection-retry} # Credentials to connect to the JMS broker. # credentials { # username = "some text" # password = "some text" # } # "off" to not use any credentials. credentials = off # Number of parallel sessions to use for receiving JMS messages. session-count = 1 # Buffer size for maximum number for messages read from JMS when there is no demand # (or acks are pending for acknowledged consumers). buffer-size = 100 # JMS selector expression. # See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html # empty string for unset selector = "" # optional # Set an explicit acknowledge mode. # (Consumers have specific defaults.) # See eg. javax.jms.Session.AUTO_ACKNOWLEDGE # Allowed values: "off", "auto", "client", "duplicates-ok", "session", integer value acknowledge-mode = off # Timeout for acknowledge. # (Used by TX consumers.) ack-timeout = 1 second # For use with transactions, if true the stream fails if Alpakka rolls back the transaction # when `ack-timeout` is hit. fail-stream-on-ack-timeout = false }
Broker specific destinations
To reach out to special features of the JMS broker, destinations can be created as CustomDestination
which takes a factory method for creating destinations.
- Scala
-
def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) => val amqSession = session.asInstanceOf[ActiveMQSession] amqSession.createQueue(s"my-$destinationName") } val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer( JmsConsumerSettings(consumerConfig, connectionFactory) .withDestination(CustomDestination("custom", createQueue("custom"))) )
- Java
-
Function<javax.jms.Session, javax.jms.Destination> createQueue(String destinationName) { return (session) -> { ActiveMQSession amqSession = (ActiveMQSession) session; try { return amqSession.createQueue("my-" + destinationName); } catch (JMSException e) { throw new RuntimeException(e); } }; } Source<Message, JmsConsumerControl> jmsSource = JmsConsumer.create( JmsConsumerSettings.create(system, connectionFactory) .withDestination(new CustomDestination("custom", createQueue("custom"))));
Using JMS client acknowledgement
Client acknowledgement ensures a message is successfully received by the consumer and notifies the JMS broker for every message. Due to the threading details in JMS brokers, this special source is required (see the explanation below).
- Scala
-
val jmsSource: Source[AckEnvelope, JmsConsumerControl] = JmsConsumer.ackSource( JmsConsumerSettings(consumerConfig, connectionFactory) .withSessionCount(5) .withQueue("numbers") ) val result: Future[immutable.Seq[javax.jms.Message]] = jmsSource .take(msgsIn.size) .map { ackEnvelope => ackEnvelope.acknowledge() ackEnvelope.message } .runWith(Sink.seq)
- Java
-
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<akka.stream.alpakka.jms.AckEnvelope, JmsConsumerControl> jmsSource = JmsConsumer.ackSource( JmsConsumerSettings.create(system, connectionFactory) .withSessionCount(5) .withQueue("test")); CompletionStage<List<javax.jms.Message>> result = jmsSource .take(msgsIn.size()) .map( envelope -> { envelope.acknowledge(); return envelope.message(); }) .runWith(Sink.seq(), materializer);
The sessionCount
parameter controls the number of JMS sessions to run in parallel.
Notes:
- Using multiple sessions increases throughput, especially if acknowledging message by message is desired.
- Messages may arrive out of order if
sessionCount
is larger than 1. - Message-by-message acknowledgement can be achieved by setting
bufferSize
to 0, thus disabling buffering. The outstanding messages before backpressure will be thesessionCount
. - The default
AcknowledgeMode
isClientAcknowledge
but can be overridden to customAcknowledgeMode
s, even implementation-specific ones by setting theAcknowledgeMode
in theJmsConsumerSettings
when creating the stream.
Using a regular JmsConsumer
with AcknowledgeMode.ClientAcknowledge
and using message.acknowledge()
from the stream is not compliant with the JMS specification and can cause issues for some message brokers. message.acknowledge()
in many cases acknowledges the session and not the message itself, contrary to what the API makes you believe.
Use this JmsConsumer.ackSource
as shown above instead.
Using JMS transactions
JMS transactions may be used with this connector. Be aware that transactions are a heavy-weight tool and may not perform very good.
- Scala
-
val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource( JmsConsumerSettings(consumerConfig, connectionFactory) .withSessionCount(5) .withAckTimeout(1.second) .withQueue("numbers") ) val result: Future[immutable.Seq[javax.jms.Message]] = jmsSource .take(msgsIn.size) .map { txEnvelope => txEnvelope.commit() txEnvelope.message } .runWith(Sink.seq)
- Java
-
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<akka.stream.alpakka.jms.TxEnvelope, JmsConsumerControl> jmsSource = JmsConsumer.txSource( JmsConsumerSettings.create(system, connectionFactory) .withSessionCount(5) .withAckTimeout(Duration.ofSeconds(1)) .withQueue("test")); CompletionStage<List<javax.jms.Message>> result = jmsSource .take(msgsIn.size()) .map( txEnvelope -> { txEnvelope.commit(); return txEnvelope.message(); }) .runWith(Sink.seq(), materializer);
The sessionCount
parameter controls the number of JMS sessions to run in parallel.
The ackTimeout
parameter controls the maximum time given to a message to be committed or rolled back. If the message times out it will automatically be rolled back. This is to prevent stream from starvation if the application fails to commit or rollback a message, or if the message errors out and the stream is resumed by a decider
.
Notes:
- Higher throughput is achieved by increasing the
sessionCount
. - Messages will arrive out of order if
sessionCount
is larger than 1. - Buffering is not supported in transaction mode. The
bufferSize
is ignored. - The default
AcknowledgeMode
isSessionTransacted
but can be overridden to customAcknowledgeMode
s, even implementation-specific ones by setting theAcknowledgeMode
in theJmsConsumerSettings
when creating the stream.
Using JMS selectors
Create a javax.jms.Message
source specifying a JMS selector expression: Verify that we are only receiving messages according to the selector:
- Scala
-
val jmsSource = JmsConsumer( JmsConsumerSettings(consumerConfig, connectionFactory) .withQueue("numbers") .withSelector("IsOdd = TRUE") )
- Java
-
Source<Message, JmsConsumerControl> jmsSource = JmsConsumer.create( JmsConsumerSettings.create(system, connectionFactory) .withQueue("test") .withSelector("IsOdd = TRUE"));
Raw JVM type sources
Stream element type | Alpakka source factory |
---|---|
String | JmsConsumer.textSource |
Array[Byte]byte[] | JmsConsumer.bytesSource |
Map[String, AnyRef]Map<String, Object> | JmsConsumer.mapSource |
Object (java.io.Serializable ) |
JmsConsumer.objectSource |
Text sources
The textSource
emits the received message body as String:
- Scala
-
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url) val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource( JmsConsumerSettings(system, connectionFactory).withQueue("test") ) val result: Future[immutable.Seq[String]] = jmsSource.take(in.size).runWith(Sink.seq)
- Java
-
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<String, JmsConsumerControl> jmsSource = JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); CompletionStage<List<String>> result = jmsSource.take(in.size()).runWith(Sink.seq(), materializer);
Byte array sources
The bytesSource
emits the received message body as byte array:
- Scala
-
val jmsSource: Source[Array[Byte], JmsConsumerControl] = JmsConsumer.bytesSource( JmsConsumerSettings(system, connectionFactory).withQueue("test") ) val result: Future[Array[Byte]] = jmsSource .take(1) .runWith(Sink.head)
- Java
-
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<byte[], JmsConsumerControl> jmsSource = JmsConsumer.bytesSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); CompletionStage<byte[]> result = jmsSource.take(1).runWith(Sink.head(), materializer);
Map sources
The mapSource
emits the received message body as Map[String, Object]Map<String, Object>:
- Scala
-
val jmsSource: Source[Map[String, Any], JmsConsumerControl] = JmsConsumer.mapSource( JmsConsumerSettings(system, connectionFactory).withQueue("test") ) val result: Future[immutable.Seq[Map[String, Any]]] = jmsSource .take(1) .runWith(Sink.seq)
- Java
-
ConnectionFactory connectionFactory = server.createConnectionFactory(); Source<Map<String, Object>, JmsConsumerControl> jmsSource = JmsConsumer.mapSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); CompletionStage<Map<String, Object>> resultStage = jmsSource.take(1).runWith(Sink.head(), materializer);
Object sources
The objectSource
emits the received message body as deserialized JVM instance. As serialization may be a security concern, JMS clients require special configuration to allow this. The example shows how to configure ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this.
- Scala
-
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory] connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava) val jmsSource: Source[java.io.Serializable, JmsConsumerControl] = JmsConsumer.objectSource( JmsConsumerSettings(system, connectionFactory).withQueue("test") ) val result: Future[java.io.Serializable] = jmsSource .take(1) .runWith(Sink.head)
- Java
-
ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) server.createConnectionFactory(); connectionFactory.setTrustedPackages( Arrays.asList(DummyJavaTests.class.getPackage().getName())); Source<java.io.Serializable, JmsConsumerControl> jmsSource = JmsConsumer.objectSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); CompletionStage<java.io.Serializable> result = jmsSource.take(1).runWith(Sink.head(), materializer);
Request / Reply
The request / reply pattern can be implemented by streaming a JmsConsumer
JmsConsumer
to a JmsProducer
JmsProducer
, with a stage in between that extracts the ReplyTo
and CorrelationID
from the original message and adds them to the response.
- Scala
-
val respondStreamControl: JmsConsumerControl = JmsConsumer(JmsConsumerSettings(system, connectionFactory).withQueue("test")) .collect { case message: TextMessage => JmsTextMessage(message) } .map { textMessage => textMessage.headers.foldLeft(JmsTextMessage(textMessage.body.reverse)) { case (acc, rt: JmsReplyTo) => acc.to(rt.jmsDestination) case (acc, cId: JmsCorrelationId) => acc.withHeader(cId) case (acc, _) => acc } } .via { JmsProducer.flow( JmsProducerSettings(system, connectionFactory).withQueue("ignored") ) } .to(Sink.ignore) .run()
- Java
-
JmsConsumerControl respondStreamControl = JmsConsumer.create( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")) .map(JmsMessageFactory::create) .collectType(JmsTextMessage.class) .map( textMessage -> { JmsTextMessage m = JmsTextMessage.create(reverse.apply(textMessage.body())); for (JmsHeader h : textMessage.getHeaders()) if (h.getClass().equals(JmsReplyTo.class)) m = m.to(((JmsReplyTo) h).jmsDestination()); else if (h.getClass().equals(JmsCorrelationId.class)) m = m.withHeader(h); return m; }) .via( JmsProducer.flow( JmsProducerSettings.create(system, connectionFactory) .withQueue("ignored"))) .to(Sink.ignore()) .run(materializer);