JMS Connector

The JMS connector provides Akka Stream sources and sinks to connect to JMS providers.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "0.11"
libraryDependencies += "javax.jms" % "jms" % "1.1"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-jms_2.12</artifactId>
  <version>0.11</version>
</dependency>
<dependency>
  <groupId>javax.jms</groupId>
  <artifactId>jms</artifactId>
  <version>1.1</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-jms_2.12", version: "0.11"
  compile group: 'javax.jms', name: 'jms', version: '1.1'
}

Usage

The JMS message model supports several types of message body (see @extref(javaee-api:javax.jms.Message)) and Alpakka currently supports messages with a body containing a String object.

Use the case class JmsTextMessage to wrap the messages you want to send and optionally set their properties (see below for an example).

Sending messages to a JMS provider

First define a jms ConnectionFactory depending on the implementation you’re using. Here we’re using Active MQ.

Scala
val connectionFactory = new ActiveMQConnectionFactory(url)
Java
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);

Create a sink, that accepts and forwards Strings to the JMS provider.

Scala
val jmsSink: Sink[String, NotUsed] = JmsSink.textSink(
  JmsSinkSettings(connectionFactory).withQueue("test")
)
Java
Sink<String, NotUsed> jmsSink = JmsSink.textSink(
        JmsSinkSettings
                .create(connectionFactory)
                .withQueue("test")
);

JmsSink contains factory methods to facilitate the creation of sinks.

Last step is to materialize and run the sink(s) we have created.

Scala
val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
Source(in).runWith(jmsSink)
Java
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
Source.from(in).runWith(jmsSink, materializer);

Sending messages with properties to a JMS provider

Create a sink, that accepts and forwards JmsTextMessages to the JMS provider:

Scala
val msgsIn = (1 to 10).toList.map { n =>
  JmsTextMessage(n.toString).add("Number", n).add("IsOdd", n % 2 == 1).add("IsEven", n % 2 == 0)
}
Java
private List<JmsTextMessage> createTestMessageList() {
    List<Integer> intsIn = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    List<JmsTextMessage> msgsIn = new ArrayList<>();
    for(Integer n: intsIn) {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put("Number", n);
        properties.put("IsOdd", n % 2 == 1);
        properties.put("IsEven", n % 2 == 0);

        msgsIn.add(JmsTextMessage.create(n.toString(), properties));
    }

    return msgsIn;
}
List<JmsTextMessage> msgsIn = createTestMessageList();

Receiving String messages from a JMS provider

Create a source:

Scala
val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test")
)
Java
Source<String, NotUsed> jmsSource = JmsSource
        .textSource(JmsSourceSettings
                .create(connectionFactory)
                .withQueue("test")
                .withBufferSize(10)
        );

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.

Run the source and take the same amount of messages as we previously sent to it.

Scala
val result = jmsSource.take(in.size).runWith(Sink.seq)
Java
CompletionStage<List<String>> result = jmsSource
        .take(in.size())
        .runWith(Sink.seq(), materializer);

Receiving javax.jms.Messages from a JMS provider

Create a javax.jms.Message source:

Scala
val jmsSource: Source[Message, NotUsed] = JmsSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("numbers")
)
Java
Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
        .create(connectionFactory)
        .withQueue("test")
        .withBufferSize(10)
);

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.

Run the source and specify the amount of messages to take:

Scala
val result = jmsSource.take(msgsIn.size).runWith(Sink.seq)
Java
CompletionStage<List<Message>> result = jmsSource
        .take(msgsIn.size())
        .runWith(Sink.seq(), materializer);

Receiving javax.jms.Messages from a JMS provider with a selector

Create a javax.jms.Message source specifying a JMS selector expression:

Scala
val jmsSource = JmsSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("numbers").withSelector("IsOdd = TRUE")
)
Java
Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
        .create(connectionFactory)
        .withQueue("test")
        .withBufferSize(10)
        .withSelector("IsOdd = TRUE")
);

Verify that we are only receiving messages according to the selector:

Scala
val oddMsgsIn = msgsIn.filter(msg => msg.body.toInt % 2 == 1)
val result = jmsSource.take(oddMsgsIn.size).runWith(Sink.seq)
// We should have only received the odd numbers in the list
result.futureValue.zip(oddMsgsIn).foreach {
  case (out, in) =>
    out.getIntProperty("Number") shouldEqual in.properties("Number")
    out.getBooleanProperty("IsOdd") shouldEqual in.properties("IsOdd")
    out.getBooleanProperty("IsEven") shouldEqual in.properties("IsEven")
    // Make sure we are only receiving odd numbers
    out.getIntProperty("Number") % 2 shouldEqual 1
}
Java
List<JmsTextMessage> oddMsgsIn = msgsIn.stream()
        .filter(msg -> Integer.valueOf(msg.body()) % 2 == 1)
        .collect(Collectors.toList());
assertEquals(5, oddMsgsIn.size());

CompletionStage<List<Message>> result = jmsSource
        .take(oddMsgsIn.size())
        .runWith(Sink.seq(), materializer);

List<Message> outMessages = result.toCompletableFuture().get(4, TimeUnit.SECONDS);
int msgIdx = 0;
for(Message outMsg: outMessages) {
    assertEquals(outMsg.getIntProperty("Number"), oddMsgsIn.get(msgIdx).properties().get("Number").get());
    assertEquals(outMsg.getBooleanProperty("IsOdd"), oddMsgsIn.get(msgIdx).properties().get("IsOdd").get());
    assertEquals(outMsg.getBooleanProperty("IsEven"), (oddMsgsIn.get(msgIdx).properties().get("IsEven").get()));
    assertEquals(1, outMsg.getIntProperty("Number") % 2);
    msgIdx++;
}

Using Topic with an JMS provider

You can use JMS topic in a very similar way.

For the Sink :

Scala
val jmsTopicSink: Sink[String, NotUsed] = JmsSink.textSink(
  JmsSinkSettings(connectionFactory).withTopic("topic")
)
Java
Sink<String, NotUsed> jmsTopicSink = JmsSink.textSink(
        JmsSinkSettings
                .create(connectionFactory)
                .withTopic("topic")
);

For the source :

Scala
val jmsTopicSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withTopic("topic")
)
Java
Source<String, NotUsed> jmsTopicSource = JmsSource
        .textSource(JmsSourceSettings
                .create(connectionFactory)
                .withTopic("topic")
                .withBufferSize(10)
        );

Such sink and source can be started the same way as in the previous example.

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> jms/testOnly *.JmsConnectorsSpec
Java
sbt
> jms/testOnly *.JmsConnectorsTest
The source code for this page can be found here.