JMS Connector

The JMS connector provides Akka Stream sources and sinks to connect to JMS providers.

Reported issues

Tagged issues at Github

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "0.19"
libraryDependencies += "javax.jms" % "jms" % "1.1"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-jms_2.12</artifactId>
  <version>0.19</version>
</dependency>
<dependency>
  <groupId>javax.jms</groupId>
  <artifactId>jms</artifactId>
  <version>1.1</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-jms_2.12", version: "0.19"
  compile group: 'javax.jms', name: 'jms', version: '1.1'
}

Usage

The JMS message model supports several types of message body (see javax.jms.Message) and Alpakka currently supports messages with a body containing a String, a Serializable object, a Map or a byte array.

First define a javax.jms.ConnectionFactory depending on the implementation you’re using. Here we’re using Active MQ.

Scala
val connectionFactory: javax.jms.ConnectionFactory = new ActiveMQConnectionFactory(ctx.url)
Java
javax.jms.ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);

The created ConnectionFactory is then used for the creation of the different jms sinks or sources (see below).

Sending messages to a JMS provider

Use a case class with the subtype of JmsMessage to wrap the messages you want to send and optionally set their properties. JmsProducerJmsProducer contains factory methods to facilitate the creation of sinks according to the message type (see below for an example).

Sending text messages to a JMS provider

Create a sink, that accepts and forwards JmsTextMessages to the JMS provider:

Scala
val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings(connectionFactory).withQueue("test")
)
Java
Sink<String, CompletionStage<Done>> jmsSink = JmsProducer
    .textSink(
        JmsProducerSettings
            .create(connectionFactory)
            .withQueue("test")
    );

Last step is to materialize and run the sink(s) we have created.

Scala
val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
Source(in).runWith(jmsSink)
Java
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
CompletionStage<Done> finished = Source
    .from(in)
    .runWith(jmsSink, materializer);

Sending byte messages to a JMS provider

Create a sink, that accepts and forwards JmsByteMessages to the JMS provider.

Scala
val jmsSink: Sink[Array[Byte], Future[Done]] = JmsProducer.bytesSink(
  JmsProducerSettings(connectionFactory).withQueue("test")
)
Java
Sink<byte[], CompletionStage<Done>> jmsSink = JmsProducer
    .bytesSink(
        JmsProducerSettings
            .create(connectionFactory)
            .withQueue("test")
    );

Last step is to materialize and run the sink(s) we have created.

Scala
val in = "ThisIsATest".getBytes(Charset.forName("UTF-8"))
Source.single(in).runWith(jmsSink)
Java
byte[] in = "ThisIsATest".getBytes(Charset.forName("UTF-8"));
CompletionStage<Done> finished = Source
        .single(in)
        .runWith(jmsSink, materializer);

Sending map messages to a JMS provider

Create a sink, that accepts and forwards JmsMapMessages to the JMS provider:

Scala
val jmsSink: Sink[Map[String, Any], Future[Done]] = JmsProducer.mapSink(
  JmsProducerSettings(connectionFactory).withQueue("test")
)
Java
Sink<Map<String, Object>, CompletionStage<Done>> jmsSink = JmsProducer
    .mapSink(
        JmsProducerSettings
            .create(connectionFactory)
            .withQueue("test")
    );

Last step is to materialize and run the sink(s) we have created.

Scala
val input = List(
  Map[String, Any](
    "string" -> "value",
    "int value" -> 42,
    "double value" -> 43.toDouble,
    "short value" -> 7.toShort,
    "boolean value" -> true,
    "long value" -> 7.toLong,
    "bytearray" -> "AStringAsByteArray".getBytes(Charset.forName("UTF-8")),
    "byte" -> 1.toByte
  )
)

Source(input).runWith(jmsSink)
Java
Map<String, Object> in = new HashMap<>();
in.put("string value", "value");
in.put("int value", 42);
in.put("double value", 43.0);
in.put("short value", (short) 7);
in.put("boolean value", true);
in.put("long value", 7L);
in.put("bytearray", "AStringAsByteArray".getBytes(Charset.forName("UTF-8")));
in.put("byte", (byte) 1);

CompletionStage<Done> finished = Source
    .single(in)
    .runWith(jmsSink, materializer);

Sending object messages to a JMS provider

Create and configure ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this.

Scala
val connectionFactory = new ActiveMQConnectionFactory(ctx.url)
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)
Java
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);
connectionFactory.setTrustedPackages(
    Arrays.asList(
        DummyJavaTests.class.getPackage().getName()
    ));

Create a sink, that accepts and forwards JmsObjectMessages to the JMS provider:

Scala
val jmsSink: Sink[Serializable, Future[Done]] = JmsProducer.objectSink(
  JmsProducerSettings(connectionFactory).withQueue("test")
)
Java
Sink<java.io.Serializable, CompletionStage<Done>> jmsSink = JmsProducer
    .objectSink(
        JmsProducerSettings
            .create(connectionFactory)
            .withQueue("test")
    );

Last step is to materialize and run the sink(s) we have created.

Scala

val in = DummyObject("ThisIsATest") Source.single(in).runWith(jmsSink)
Java
java.io.Serializable in = new DummyJavaTests("javaTest");
CompletionStage<Done> finished = Source
    .single(in)
    .runWith(jmsSink, materializer);

Sending messages with properties to a JMS provider

For every JmsMessage you can set jms properties.

Scala
val msgsIn = (1 to 10).toList.map { n =>
  JmsTextMessage(n.toString)
    .withProperty("Number", n)
    .withProperty("IsOdd", n % 2 == 1)
    .withProperty("IsEven", n % 2 == 0)
}
Java
JmsTextMessage message = JmsTextMessage
    .create(n.toString())
        .withProperty("Number", n)
        .withProperty("IsOdd", n % 2 == 1)
        .withProperty("IsEven", n % 2 == 0);

Sending messages with header to a JMS provider

For every JmsMessage you can set also jms headers.

Scala
val msgsIn = (1 to 10).toList.map { n =>
  JmsTextMessage(n.toString)
    .withHeader(JmsType("type"))
    .withHeader(JmsCorrelationId("correlationId"))
    .withHeader(JmsReplyTo.queue("test-reply"))
    .withHeader(JmsTimeToLive(FiniteDuration(999, TimeUnit.SECONDS)))
    .withHeader(JmsPriority(2))
    .withHeader(JmsDeliveryMode(DeliveryMode.NON_PERSISTENT))
}
Java
List<JmsTextMessage> msgsIn = createTestMessageList().stream()
    .map(jmsTextMessage -> jmsTextMessage
        .withHeader(JmsType.create("type"))
        .withHeader(JmsCorrelationId.create("correlationId"))
        .withHeader(JmsReplyTo.queue("test-reply"))
        .withHeader(JmsTimeToLive.create(999, TimeUnit.SECONDS))
        .withHeader(JmsPriority.create(2))
        .withHeader(JmsDeliveryMode.create(DeliveryMode.NON_PERSISTENT)))
    .collect(Collectors.toList());

Sending messages as a Flow

The producer can also act as a flow, in order to publish messages in the middle of stream processing. For example, you can ensure that a message is persisted to the queue before subsequent processing.

Create a flow:

Scala
val flowSink: Flow[JmsMessage, JmsMessage, NotUsed] = JmsProducer.flow(
  JmsProducerSettings(connectionFactory).withQueue("test")
)
Java
Flow<JmsTextMessage, JmsTextMessage, NotUsed> flowSink = JmsProducer.flow(
        JmsProducerSettings.create(connectionFactory).withQueue("test")
);

Run the flow:

Scala
val input = (1 to 100).map(i => JmsTextMessage(i.toString))

val result = Source(input)
  .via(flowSink)
  .runWith(Sink.seq)
Java
List<JmsTextMessage> input = createTestMessageList();

CompletionStage<List<JmsTextMessage>> result = Source.from(input)
        .via(flowSink)
        .runWith(Sink.seq(), materializer);

Receiving messages from a JMS provider

JmsConsumerJmsConsumer contains factory methods to facilitate the creation of sinks according to the message type (see below for an example).

Receiving String messages from a JMS provider

Create a source:

Scala
val jmsSource: Source[String, KillSwitch] = JmsConsumer.textSource(
  JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
)
Java
Source<String, KillSwitch> jmsSource = JmsConsumer
    .textSource(JmsConsumerSettings
        .create(connectionFactory)
        .withQueue("test")
        .withBufferSize(10)
    );

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.

Run the source and take the same amount of messages as we previously sent to it.

Scala
val result = jmsSource.take(in.size).runWith(Sink.seq)
Java
CompletionStage<List<String>> result = jmsSource
    .take(in.size())
    .runWith(Sink.seq(), materializer);

Receiving byte array messages from a JMS provider

Create a source:

Scala
val jmsSource: Source[Array[Byte], KillSwitch] = JmsConsumer.bytesSource(
  JmsConsumerSettings(connectionFactory).withQueue("test")
)
Java
Source<byte[], KillSwitch> jmsSource = JmsConsumer
    .bytesSource(
        JmsConsumerSettings
            .create(connectionFactory)
            .withQueue("test")
    );

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.

Run the source and take the same amount of messages as we previously sent to it.

Scala
val result = jmsSource.take(1).runWith(Sink.head)
Java
CompletionStage<byte[]> result = jmsSource
    .take(1)
    .runWith(Sink.head(), materializer);

Receiving Serializable object messages from a JMS provider

Create and configure ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this.

Scala
val connectionFactory = new ActiveMQConnectionFactory(ctx.url)
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)
Java
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ctx.url);
connectionFactory.setTrustedPackages(
    Arrays.asList(
        DummyJavaTests.class.getPackage().getName()
    ));

Create a source:

Scala
val jmsSource: Source[java.io.Serializable, KillSwitch] = JmsConsumer.objectSource(
  JmsConsumerSettings(connectionFactory).withQueue("test")
)
Java
Source<java.io.Serializable, KillSwitch> jmsSource = JmsConsumer
    .objectSource(JmsConsumerSettings
        .create(connectionFactory)
        .withQueue("test")
    );

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.

Run the source and take the same amount of messages as we previously sent to it.

Scala
val result = jmsSource.take(1).runWith(Sink.head)
Java
CompletionStage<java.io.Serializable> result = jmsSource
    .take(1)
    .runWith(Sink.head(), materializer);

Receiving Map messages from a JMS provider

Create a source:

Scala
val jmsSource: Source[Map[String, Any], KillSwitch] = JmsConsumer.mapSource(
  JmsConsumerSettings(connectionFactory).withQueue("test")
)
Java
Source<Map<String, Object>, KillSwitch> jmsSource = JmsConsumer
    .mapSource(
        JmsConsumerSettings
            .create(connectionFactory)
            .withQueue("test")
    );

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.

Run the source and take the same amount of messages as we previously sent to it.

Scala
val result = jmsSource.take(1).runWith(Sink.seq)
Java
CompletionStage<Map<String, Object>> resultStage = jmsSource
    .take(1)
    .runWith(Sink.head(), materializer);

Receiving javax.jms.Messages from a JMS provider

Create a source:

Scala
val jmsSource: Source[Message, KillSwitch] = JmsConsumer(
  JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("numbers")
)
Java
Source<Message, KillSwitch> jmsSource = JmsConsumer
    .create(
        JmsConsumerSettings
            .create(connectionFactory)
            .withQueue("test")
            .withBufferSize(10)
    );

The bufferSize parameter controls the maximum number of messages to prefetch before applying backpressure.

Run the source and specify the amount of messages to take:

Scala
val result: Future[Seq[Message]] = jmsSource.take(msgsIn.size).runWith(Sink.seq)
Java
CompletionStage<List<Message>> result = jmsSource
    .take(msgsIn.size())
    .runWith(Sink.seq(), materializer);

Notes:

  • The default AcknowledgeMode is AutoAcknowledge but can be overridden to custom AcknowledgeModes, even implementation-specific ones by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.

Receiving javax.jms.Messages messages from a JMS provider with Client Acknowledgement

Create a javax.jms.Message source:

Scala
val jmsSource: Source[Message, KillSwitch] = JmsConsumer(
  JmsConsumerSettings(connectionFactory)
    .withQueue("numbers")
    .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
)
Java
Source<Message, KillSwitch> jmsSource = JmsConsumer
    .create(
        JmsConsumerSettings
            .create(connectionFactory)
            .withQueue("test")
            .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge())
    );

The acknowledgeMode (AcknowledgeMode) parameter controls the JMS acknowledge mode parameter, see javax.jms.Connection.createSession.

Run the source and take the same amount of messages as we previously sent to it acknowledging them.

Scala
val result = jmsSource
  .map {
    case textMessage: TextMessage =>
      val text = textMessage.getText
      textMessage.acknowledge()
      text
  }
  .take(msgsIn.size)
  .runWith(Sink.seq)
Java
CompletionStage<List<String>> result = jmsSource
    .take(msgsIn.size())
    .map(message -> {
        String text = ((ActiveMQTextMessage)message).getText();
        message.acknowledge();
        return text;
    })
    .runWith(Sink.seq(), materializer);

Receiving javax.jms.Messages from a JMS provider with a selector

Create a javax.jms.Message source specifying a JMS selector expression:

Scala
val jmsSource = JmsConsumer(
  JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("numbers").withSelector("IsOdd = TRUE")
)
Java
Source<Message, KillSwitch> jmsSource = JmsConsumer
    .create(
        JmsConsumerSettings
            .create(connectionFactory)
            .withQueue("test")
            .withBufferSize(10)
            .withSelector("IsOdd = TRUE")
    );

Verify that we are only receiving messages according to the selector:

Scala
val oddMsgsIn = msgsIn.filter(msg => msg.body.toInt % 2 == 1)
val result = jmsSource.take(oddMsgsIn.size).runWith(Sink.seq)
// We should have only received the odd numbers in the list
result.futureValue.zip(oddMsgsIn).foreach {
  case (out, in) =>
    out.getIntProperty("Number") shouldEqual in.properties("Number")
    out.getBooleanProperty("IsOdd") shouldEqual in.properties("IsOdd")
    out.getBooleanProperty("IsEven") shouldEqual in.properties("IsEven")
    // Make sure we are only receiving odd numbers
    out.getIntProperty("Number") % 2 shouldEqual 1
}
Java
List<JmsTextMessage> oddMsgsIn = msgsIn.stream()
        .filter(msg -> Integer.valueOf(msg.body()) % 2 == 1)
        .collect(Collectors.toList());
assertEquals(5, oddMsgsIn.size());

CompletionStage<List<Message>> result = jmsSource
    .take(oddMsgsIn.size())
    .runWith(Sink.seq(), materializer);

List<Message> outMessages = result.toCompletableFuture().get(4, TimeUnit.SECONDS);
int msgIdx = 0;
for (Message outMsg : outMessages) {
    assertEquals(outMsg.getIntProperty("Number"), oddMsgsIn.get(msgIdx).properties().get("Number").get());
    assertEquals(outMsg.getBooleanProperty("IsOdd"), oddMsgsIn.get(msgIdx).properties().get("IsOdd").get());
    assertEquals(outMsg.getBooleanProperty("IsEven"), (oddMsgsIn.get(msgIdx).properties().get("IsEven").get()));
    assertEquals(1, outMsg.getIntProperty("Number") % 2);
    msgIdx++;
}

Receiving and explicitly acknowledging javax.jms.Messages from a JMS provider

Create a javax.jms.Message source:

Scala
val jmsSource: Source[AckEnvelope, KillSwitch] = JmsConsumer.ackSource(
  JmsConsumerSettings(connectionFactory).withSessionCount(5).withBufferSize(5).withQueue("numbers")
)
Java
Source<AckEnvelope, KillSwitch> jmsSource = JmsConsumer
    .ackSource(JmsConsumerSettings
        .create(connectionFactory)
        .withSessionCount(5)
        .withBufferSize(5)
        .withQueue("test")
    );

The sessionCount parameter controls the number of JMS sessions to run in parallel.

The bufferSize parameter controls the maximum number of messages each JMS session will prefetch and awaiting acknowledgement before applying backpressure.

Run the source and specify the amount of messages to take:

Scala
val result = jmsSource
  .take(msgsIn.size)
  .map { env =>
    env.acknowledge()
    env.message
  }
  .runWith(Sink.seq)
Java
CompletionStage<List<Message>> result = jmsSource
    .take(msgsIn.size())
    .map(env -> {
        env.acknowledge();
        return env.message();
    })
    .runWith(Sink.seq(), materializer);

Notes:

  • Using multiple sessions increases throughput, especially if a acknowledging message by message is desired.
  • Messages may arrive out of order if sessionCount is larger than 1.
  • Message-by-message acknowledgement can be achieved by setting bufferSize to 0, thus disabling buffering. The outstanding messages before backpressure will be the sessionCount.
  • The default AcknowledgeMode is ClientAcknowledge but can be overridden to custom AcknowledgeModes, even implementation-specific ones by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.

Transactionally receiving javax.jms.Messages from a JMS provider

Create a javax.jms.Message source:

Scala
val jmsSource: Source[TxEnvelope, KillSwitch] = JmsConsumer.txSource(
  JmsConsumerSettings(connectionFactory).withSessionCount(5).withQueue("numbers")
)
Java
Source<TxEnvelope, KillSwitch> jmsSource = JmsConsumer.txSource(JmsConsumerSettings
        .create(connectionFactory)
        .withSessionCount(5)
        .withQueue("test")
);
Source<TxEnvelope, KillSwitch> jmsSource = JmsConsumer.txSource(JmsConsumerSettings
        .create(connectionFactory)
        .withSessionCount(5)
        .withQueue("test")
);

The sessionCount parameter controls the number of JMS sessions to run in parallel.

The bufferSize parameter controls the maximum number of messages each JMS session will prefetch and awaiting acknowledgement before applying backpressure.

Run the source and specify the amount of messages to take:

Scala
val result = jmsSource
  .take(msgsIn.size)
  .map { env =>
    env.commit()
    env.message
  }
  .runWith(Sink.seq)
Java
CompletionStage<List<Message>> result = jmsSource
        .take(msgsIn.size())
        .map(env -> { env.commit(); return env.message(); })
        .runWith(Sink.seq(), materializer);
CompletionStage<List<Message>> result = jmsSource
        .take(msgsIn.size())
        .map(env -> { env.commit(); return env.message(); })
        .runWith(Sink.seq(), materializer);

Notes:

  • Higher throughput is achieved by increasing the sessionCount.
  • Messages will arrive out of order if sessionCount is larger than 1.
  • Buffering is not supported in transaction mode. The bufferSize is ignored.
  • The default AcknowledgeMode is SessionTransacted but can be overridden to custom AcknowledgeModes, even implementation-specific ones by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.

Browsing messages from a JMS provider

The browse source will stream the messages in a queue without consuming them.

Create a source:

Scala
val browseSource: Source[Message, NotUsed] = JmsConsumer.browse(
  JmsBrowseSettings(connectionFactory).withQueue("test")
)
Java
Source<Message, NotUsed> browseSource = JmsConsumer.browse(
        JmsBrowseSettings
                .create(connectionFactory)
                .withQueue("test")
);

The messageSelector parameter can be used to filter the messages. Otherwise it will browse the entire content of the queue.

Unlike the other sources, the browse source will complete after browsing all the messages:

Scala
val result = browseSource.runWith(Sink.seq)
Java
CompletionStage<List<Message>> result = browseSource
        .runWith(Sink.seq(), materializer);

Notes:

  • Messages may be arriving and expiring while the scan is done.
  • The JMS API does not require the content of an enumeration to be a static snapshot of queue content. Whether these changes are visible or not depends on the JMS provider.
  • A message must not be returned by a QueueBrowser before its delivery time has been reached.

Using Topic with an JMS provider

You can use JMS topic in a very similar way.

For the Sink :

Scala
val jmsTopicSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings(connectionFactory).withTopic("topic")
)
Java
Sink<String, CompletionStage<Done>> jmsTopicSink = JmsProducer
    .textSink(
        JmsProducerSettings
            .create(connectionFactory)
            .withTopic("topic")
    );

For the source :

Scala
val jmsTopicSource: Source[String, KillSwitch] = JmsConsumer.textSource(
  JmsConsumerSettings(connectionFactory).withBufferSize(10).withTopic("topic")
)
Java
Source<String, KillSwitch> jmsTopicSource = JmsConsumer
    .textSource(
        JmsConsumerSettings
            .create(connectionFactory)
            .withTopic("topic")
            .withBufferSize(10)
    );

Such sink and source can be started the same way as in the previous example.

Notes:

  • Explicit acknowledgement sources and transactional sources work with topics the same way they work with queues.
  • DO NOT set the sessionCount greater than 1 for topics. Doing so will result in duplicate messages being delivered. Each topic message is delivered to each JMS session and all the messages feed to the same Source. JMS 2.0 created shared consumers to solve this problem and multiple sessions without duplication may be supported in the future.

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> jms/testOnly *.JmsConnectorsSpec
Java
sbt
> jms/testOnly *.JmsConnectorsTest

Stopping a JMS Source

All JMS sources materialize to a KillSwitch to allow safely stopping consumption without message loss for transactional and acknowledged messages, and with minimal message loss for the simple JMS source.

To stop consumption safely, call shutdown() on the KillSwitch that is the materialized value of the source. To abruptly abort consumption (without concerns for message loss), call abort(Throwable) on the KillSwitch.

Using IBM MQ

You can use IBM MQ like any other JMS Provider by creating a QueueConnectionFactory or a TopicConnectionFactory and creating a JmsConsumerSettings or JmsProducerSettings from it. The below snippets have been tested with a default IBM MQ docker image which contains queues and topics for testing. The following command starts MQ 9 using docker:

docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 1414:1414 --publish 9443:9443 ibmcom/mq:9

MQ settings for this image are shown here: https://github.com/ibm-messaging/mq-docker#mq-developer-defaults

Create a JmsConsumer to an IBM MQ Queue

The MQQueueConnectionFactory needs a queue manager name and a channel name, the docker command used in the previous section sets up a QM1 queue manager and a DEV.APP.SVRCONN channel. The IBM MQ client makes it possible to connect to the MQ server over TCP/IP or natively through JNI (when the client and server run on the same machine). In the examples below we have chosen to use TCP/IP, which is done by setting the transport type to CommonConstants.WMQ_CM_CLIENT.

Scala
``` import com.ibm.mq.jms.MQQueueConnectionFactory import com.ibm.msg.client.wmq.common.CommonConstants val QueueManagerName = “QM1” val TestChannelName = “DEV.APP.SVRCONN” // Create the IBM MQ QueueConnectionFactory val queueConnectionFactory = new MQQueueConnectionFactory() queueConnectionFactory.setQueueManager(QueueManagerName) queueConnectionFactory.setChannel(TestChannelName) // Connect to IBM MQ over TCP/IP queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) val TestQueueName = “DEV.QUEUE.1” // Option1: create Source using default factory with just name val jmsSource: Source[String, NotUsed] = JmsConsumer.textSource( JmsConsumerSettings(queueConnectionFactory).withQueue(TestQueueName) ) // Option2: create Source using custom factory private def createMqQueue(destinationName: String): Session => MQQueue = { session => … }
val jmsSource: Source[String, NotUsed] = JmsConsumer.textSource( JmsConsumerSettings(queueConnectionFactory) .withDestination(CustomDestination(TestQueueName, createMqQueue(TestQueueName))) )

```

Java
``` import com.ibm.mq.jms.MQQueueConnectionFactory; import com.ibm.msg.client.wmq.common.CommonConstants; String queueManagerName = “QM1”; String testChannelName = “DEV.APP.SVRCONN”; // Create the IBM MQ QueueConnectionFactory MQQueueConnectionFactory queueConnectionFactory = new MQQueueConnectionFactory(); queueConnectionFactory.setQueueManager(queueManagerName); queueConnectionFactory.setChannel(testChannelName); // Connect to IBM MQ over TCP/IP queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT); String testQueueName = “DEV.QUEUE.1”; // Option1: create Source using default factory with just name Source<String, NotUsed> jmsSource = JmsConsumer.textSource( JmsConsumerSettings .create(queueConnectionFactory) .withQueue(testQueueName) ); // Option2: create Source using custom factory private Function1<Session, Destination> createMqQueue(String destinationName) { return (session) -> { … }; }
Source<String, NotUsed> jmsSource = JmsConsumer.textSource( JmsConsumerSettings .create(queueConnectionFactory) .withDestination(new CustomDestination(testQueueName,createMqQueue(testQueueName))) );

```

Create a JmsProducer to an IBM MQ Topic

The IBM MQ docker container sets up a dev/ topic, which is used in the example below.

Scala
import com.ibm.mq.jms.MQTopicConnectionFactory
import com.ibm.msg.client.wmq.common.CommonConstants
val QueueManagerName = "QM1"
val TestChannelName = "DEV.APP.SVRCONN"
// Create the IBM MQ TopicConnectionFactory
val topicConnectionFactory = new MQTopicConnectionFactory()
topicConnectionFactory.setQueueManager(QueueManagerName)
topicConnectionFactory.setChannel(TestChannelName)
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val TestTopicName = "dev/"
// Option1: create Sink using default factory with just name
val jmsTopicSink: Sink[String, NotUsed] = JmsProducer(
  JmsProducerSettings(topicConnectionFactory).withTopic(TestTopicName)
)
// Option2: create Sink using custom factory
private def createMqTopic(destinationName: String): Session => MQTopic = { session =>
    ...
}    
val jmsTopicSink: Sink[String, NotUsed] = JmsProducer(
  JmsProducerSettings(topicConnectionFactory)
    .withDestination(CustomDestination(TestTopicName, createMqTopic(TestTopicName)))
)    
Java
import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.common.CommonConstants;
String queueManagerName = "QM1";
String testChannelName = "DEV.APP.SVRCONN";
// Create the IBM MQ TopicConnectionFactory
val topicConnectionFactory = new MQTopicConnectionFactory();
topicConnectionFactory.setQueueManager(queueManagerName);
topicConnectionFactory.setChannel(testChannelName);
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";
 // Option1: create Source using default factory with just name
Sink<String, NotUsed> jmsTopicSink = JmsProducer.textSink(
  JmsProducerSettings
    .create(topicConnectionFactory)
    .withTopic(testTopicName)
);
// Option2: create Source using custom factory 
private Function1<Session, Destination> createMqTopic(String destinationName) {
    return (session) -> {
        ...
    };
}    
Sink<String, NotUsed> jmsTopicSink = JmsProducer.textSink(
  JmsProducerSettings
    .create(queueConnectionFactory)
    .withDestination(new CustomDestination(testTopicName, createMqTopic(testTopicName)))
);    
The source code for this page can be found here.