JMS Connector

The JMS connector provides Akka Stream sources and sinks to connect to JMS providers.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "0.12"
libraryDependencies += "javax.jms" % "jms" % "1.1"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-jms_2.12</artifactId>
  <version>0.12</version>
</dependency>
<dependency>
  <groupId>javax.jms</groupId>
  <artifactId>jms</artifactId>
  <version>1.1</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-jms_2.12", version: "0.12"
  compile group: 'javax.jms', name: 'jms', version: '1.1'
}

Usage

The JMS message model supports several types of message body (see javax.jms.Message). Alpakka currently supports messages whose body contains a String object.

Use the case class JmsTextMessage to wrap the messages you want to send and optionally set their properties (see below for an example).

Sending messages to a JMS provider

First define a JMS ConnectionFactory for the implementation you’re using. Here we’re using ActiveMQ.

Scala
val connectionFactory = new ActiveMQConnectionFactory(url)
Java
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);

Create a sink that accepts and forwards Strings to the JMS provider.

Scala
val jmsSink: Sink[String, NotUsed] = JmsSink.textSink(
  JmsSinkSettings(connectionFactory).withQueue("test")
)
Java
Sink<String, NotUsed> jmsSink = JmsSink.textSink(
        JmsSinkSettings
                .create(connectionFactory)
                .withQueue("test")
);

JmsSink contains factory methods to facilitate the creation of sinks.

The last step is to materialize and run the sink we have created.

Scala
val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
Source(in).runWith(jmsSink)
Java
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
Source.from(in).runWith(jmsSink, materializer);

Sending messages with properties to a JMS provider

To send messages with properties, use a sink that accepts and forwards JmsTextMessages to the JMS provider, and build each message with the properties it should carry:

Scala
val msgsIn = (1 to 10).toList.map { n =>
  JmsTextMessage(n.toString).add("Number", n).add("IsOdd", n % 2 == 1).add("IsEven", n % 2 == 0)
}
Java
private List<JmsTextMessage> createTestMessageList() {
    List<Integer> intsIn = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    List<JmsTextMessage> msgsIn = new ArrayList<>();
    for(Integer n: intsIn) {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put("Number", n);
        properties.put("IsOdd", n % 2 == 1);
        properties.put("IsEven", n % 2 == 0);

        msgsIn.add(JmsTextMessage.create(n.toString(), properties));
    }

    return msgsIn;
}
List<JmsTextMessage> msgsIn = createTestMessageList();

Receiving String messages from a JMS provider

Create a source:

Scala
val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test")
)
Java
Source<String, NotUsed> jmsSource = JmsSource
        .textSource(JmsSourceSettings
                .create(connectionFactory)
                .withQueue("test")
                .withBufferSize(10)
        );

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.
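
As a rough illustration of what a bounded prefetch buffer means, the following Java sketch uses a plain ArrayBlockingQueue in place of the connector's internal buffer. The class and method names (PrefetchBufferSketch, prefetch) are hypothetical, and this is not how Alpakka implements it; it only shows that, with no consumer draining the buffer, no more than bufferSize messages are accepted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only: a bounded buffer that refuses new
// elements once it is full. This is NOT Alpakka's implementation.
class PrefetchBufferSketch {

    // Tries to prefetch `offered` messages into a buffer with `bufferSize`
    // slots while nothing consumes from it; returns how many were accepted.
    static int prefetch(int bufferSize, int offered) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(bufferSize);
        int accepted = 0;
        for (int i = 0; i < offered; i++) {
            // offer() returns false instead of blocking when the buffer is full
            if (buffer.offer("message-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 25 messages are available, but only 10 fit into the buffer
        System.out.println(prefetch(10, 25)); // prints 10
    }
}
```

In the connector, it is the downstream consumption of messages that frees room in the buffer, so a larger bufferSize trades memory for fewer stalls when the consumer is briefly slow.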

Run the source and take the same number of messages as we previously sent to it.

Scala
val result = jmsSource.take(in.size).runWith(Sink.seq)
Java
CompletionStage<List<String>> result = jmsSource
        .take(in.size())
        .runWith(Sink.seq(), materializer);

Receiving javax.jms.Messages from a JMS provider

Create a javax.jms.Message source:

Scala
val jmsSource: Source[Message, NotUsed] = JmsSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("numbers")
)
Java
Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
        .create(connectionFactory)
        .withQueue("test")
        .withBufferSize(10)
);

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.

Run the source and specify the number of messages to take:

Scala
val result = jmsSource.take(msgsIn.size).runWith(Sink.seq)
Java
CompletionStage<List<Message>> result = jmsSource
        .take(msgsIn.size())
        .runWith(Sink.seq(), materializer);

Receiving javax.jms.Messages from a JMS provider with a selector

Create a javax.jms.Message source specifying a JMS selector expression:

Scala
val jmsSource = JmsSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("numbers").withSelector("IsOdd = TRUE")
)
Java
Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
        .create(connectionFactory)
        .withQueue("test")
        .withBufferSize(10)
        .withSelector("IsOdd = TRUE")
);

Verify that we are only receiving messages according to the selector:

Scala
val oddMsgsIn = msgsIn.filter(msg => msg.body.toInt % 2 == 1)
val result = jmsSource.take(oddMsgsIn.size).runWith(Sink.seq)
// We should have only received the odd numbers in the list
result.futureValue.zip(oddMsgsIn).foreach {
  case (out, in) =>
    out.getIntProperty("Number") shouldEqual in.properties("Number")
    out.getBooleanProperty("IsOdd") shouldEqual in.properties("IsOdd")
    out.getBooleanProperty("IsEven") shouldEqual in.properties("IsEven")
    // Make sure we are only receiving odd numbers
    out.getIntProperty("Number") % 2 shouldEqual 1
}
Java
List<JmsTextMessage> oddMsgsIn = msgsIn.stream()
        .filter(msg -> Integer.valueOf(msg.body()) % 2 == 1)
        .collect(Collectors.toList());
assertEquals(5, oddMsgsIn.size());

CompletionStage<List<Message>> result = jmsSource
        .take(oddMsgsIn.size())
        .runWith(Sink.seq(), materializer);

List<Message> outMessages = result.toCompletableFuture().get(4, TimeUnit.SECONDS);
int msgIdx = 0;
for(Message outMsg: outMessages) {
    assertEquals(outMsg.getIntProperty("Number"), oddMsgsIn.get(msgIdx).properties().get("Number").get());
    assertEquals(outMsg.getBooleanProperty("IsOdd"), oddMsgsIn.get(msgIdx).properties().get("IsOdd").get());
    assertEquals(outMsg.getBooleanProperty("IsEven"), (oddMsgsIn.get(msgIdx).properties().get("IsEven").get()));
    assertEquals(1, outMsg.getIntProperty("Number") % 2);
    msgIdx++;
}
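
To make the effect of the selector concrete, here is a small Java sketch that applies the same condition, IsOdd = TRUE, to plain property maps for the numbers 1 to 10. In a real setup the selector is passed to the JMS provider as shown above; the class SelectorSketch and its methods are hypothetical and only mimic the outcome.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: mimics the result of the selector
// "IsOdd = TRUE" on in-memory property maps, without any JMS provider.
class SelectorSketch {

    // Builds the same three properties used for the messages in this guide.
    static Map<String, Object> propertiesFor(int n) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("Number", n);
        properties.put("IsOdd", n % 2 == 1);
        properties.put("IsEven", n % 2 == 0);
        return properties;
    }

    // Returns the "Number" property of every message in [from, to]
    // whose "IsOdd" property is true.
    static List<Integer> selectOdd(int from, int to) {
        List<Integer> selected = new ArrayList<>();
        for (int n = from; n <= to; n++) {
            Map<String, Object> properties = propertiesFor(n);
            if (Boolean.TRUE.equals(properties.get("IsOdd"))) {
                selected.add((Integer) properties.get("Number"));
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        System.out.println(selectOdd(1, 10)); // prints [1, 3, 5, 7, 9]
    }
}
```

Five of the ten messages match, which is why the test above takes oddMsgsIn.size() elements from the source.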

Using a topic with a JMS provider

You can use a JMS topic in a very similar way.

For the sink:

Scala
val jmsTopicSink: Sink[String, NotUsed] = JmsSink.textSink(
  JmsSinkSettings(connectionFactory).withTopic("topic")
)
Java
Sink<String, NotUsed> jmsTopicSink = JmsSink.textSink(
        JmsSinkSettings
                .create(connectionFactory)
                .withTopic("topic")
);

For the source:

Scala
val jmsTopicSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withTopic("topic")
)
Java
Source<String, NotUsed> jmsTopicSource = JmsSource
        .textSource(JmsSourceSettings
                .create(connectionFactory)
                .withTopic("topic")
                .withBufferSize(10)
        );

Such a sink and source can be started the same way as in the previous examples.

Running the example code

The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> jms/testOnly *.JmsConnectorsSpec
Java
sbt
> jms/testOnly *.JmsConnectorsTest

Using IBM MQ

You can use IBM MQ like any other JMS provider by creating a QueueConnectionFactory or a TopicConnectionFactory and building a JmsSourceSettings or JmsSinkSettings from it. The snippets below have been tested with a default IBM MQ Docker image, which contains queues and topics for testing. The following command starts MQ 9 using Docker:

docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 1414:1414 --publish 9443:9443 ibmcom/mq:9

MQ settings for this image are shown here: https://github.com/ibm-messaging/mq-docker#mq-developer-defaults

Create a JmsSource to an IBM MQ Queue

The MQQueueConnectionFactory needs a queue manager name and a channel name. The docker command from the previous section sets up a QM1 queue manager and a DEV.APP.SVRCONN channel. The IBM MQ client can connect to the MQ server over TCP/IP or natively through JNI (when the client and server run on the same machine). The examples below use TCP/IP, which is selected by setting the transport type to CommonConstants.WMQ_CM_CLIENT.

Scala
import com.ibm.mq.jms.MQQueueConnectionFactory
import com.ibm.msg.client.wmq.common.CommonConstants
val QueueManagerName = "QM1"
val TestChannelName = "DEV.APP.SVRCONN"
// Create the IBM MQ QueueConnectionFactory
val queueConnectionFactory = new MQQueueConnectionFactory()
queueConnectionFactory.setQueueManager(QueueManagerName)
queueConnectionFactory.setChannel(TestChannelName)
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val TestQueueName = "DEV.QUEUE.1"
val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(queueConnectionFactory).withQueue(TestQueueName)
)
Java
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.common.CommonConstants;
String queueManagerName = "QM1";
String testChannelName = "DEV.APP.SVRCONN";
// Create the IBM MQ QueueConnectionFactory
MQQueueConnectionFactory queueConnectionFactory = new MQQueueConnectionFactory();
queueConnectionFactory.setQueueManager(queueManagerName);
queueConnectionFactory.setChannel(testChannelName);
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testQueueName = "DEV.QUEUE.1";
Source<String, NotUsed> jmsSource = JmsSource.textSource(
  JmsSourceSettings
    .create(queueConnectionFactory)
    .withQueue(testQueueName)
);

Create a JmsSink to an IBM MQ Topic

The IBM MQ docker container sets up a dev/ topic, which is used in the example below.

Scala
import com.ibm.mq.jms.MQTopicConnectionFactory
import com.ibm.msg.client.wmq.common.CommonConstants
val QueueManagerName = "QM1"
val TestChannelName = "DEV.APP.SVRCONN"
// Create the IBM MQ TopicConnectionFactory
val topicConnectionFactory = new MQTopicConnectionFactory()
topicConnectionFactory.setQueueManager(QueueManagerName)
topicConnectionFactory.setChannel(TestChannelName)
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val TestTopicName = "dev/"
val jmsTopicSink: Sink[String, NotUsed] = JmsSink.textSink(
  JmsSinkSettings(topicConnectionFactory).withTopic(TestTopicName)
)
Java
import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.common.CommonConstants;
String queueManagerName = "QM1";
String testChannelName = "DEV.APP.SVRCONN";
// Create the IBM MQ TopicConnectionFactory
MQTopicConnectionFactory topicConnectionFactory = new MQTopicConnectionFactory();
topicConnectionFactory.setQueueManager(queueManagerName);
topicConnectionFactory.setChannel(testChannelName);
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";
Sink<String, NotUsed> jmsTopicSink = JmsSink.textSink(
  JmsSinkSettings
    .create(topicConnectionFactory)
    .withTopic(testTopicName)
);