JMS

Example: Read text messages from JMS queue and append to file

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (3),
  • and appends the data to the file target/out (2).
Scala
import java.nio.file.Paths

import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.stream.{IOResult, KillSwitch}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

val jmsSource: Source[String, KillSwitch] =        // (1)
  JmsConsumer.textSource(
    JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
  )

val fileSink: Sink[ByteString, Future[IOResult]] = // (2)
  FileIO.toPath(Paths.get("target/out.txt"))

val (runningSource, finished): (KillSwitch, Future[IOResult]) =
                                                   // stream element type
  jmsSource                                        //: String
    .map(ByteString(_))                            //: ByteString    (3)
    .toMat(fileSink)(Keep.both)
    .run()
Full source at GitHub
Java
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import java.nio.file.Paths;
import java.util.concurrent.CompletionStage;


Source<String, KillSwitch> jmsSource = // (1)
    JmsConsumer.textSource(
        JmsConsumerSettings.create(connectionFactory).withBufferSize(10).withQueue("test"));

Sink<ByteString, CompletionStage<IOResult>> fileSink =
    FileIO.toPath(Paths.get("target/out.txt")); // (2)

Pair<KillSwitch, CompletionStage<IOResult>> pair =
    jmsSource // : String
        .map(ByteString::fromString) // : ByteString    (3)
        .toMat(fileSink, Keep.both())
        .run(materializer);
Full source at GitHub

Example: Read text messages from JMS queue and create one file per message

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (2),
  • combines the incoming data with a counter (3),
  • creates an intermediary stream writing the incoming data to a file using the counter value to create unique file names (4).
Scala
import java.nio.file.Paths

import akka.stream.KillSwitch
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration.DurationInt

val jmsSource: Source[String, KillSwitch] =                                   // (1)
  JmsConsumer.textSource(
    JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
  )
                                                          // stream element type
val runningSource = jmsSource                             //: String
  .map(ByteString(_))                                     //: ByteString         (2)
  .zipWithIndex                                           //: (ByteString, Long) (3)
  .mapAsyncUnordered(parallelism = 5) { case (byteStr, number) =>
    Source                                                //                     (4)
      .single(byteStr)
      .runWith(FileIO.toPath(Paths.get(s"target/out-$number.txt")))
  }                                                       //: IoResult
  .toMat(Sink.ignore)(Keep.left)
  .run()
Full source at GitHub
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;


Source<String, KillSwitch> jmsConsumer = // (1)
    JmsConsumer.textSource(
        JmsConsumerSettings.create(connectionFactory).withBufferSize(10).withQueue("test"));

int parallelism = 5;
Pair<KillSwitch, CompletionStage<Done>> pair =
    jmsConsumer // : String
        .map(ByteString::fromString) // : ByteString             (2)
        .zipWithIndex() // : Pair<ByteString, Long> (3)
        .mapAsyncUnordered(
            parallelism,
            (in) -> {
              ByteString byteString = in.first();
              Long number = in.second();
              return Source // (4)
                  .single(byteString)
                  .runWith(
                      FileIO.toPath(Paths.get("target/out-" + number + ".txt")), materializer);
            }) // : IoResult
        .toMat(Sink.ignore(), Keep.both())
        .run(materializer);
Full source at GitHub

Example: Read text messages from JMS queue and send to web server

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (2),
  • puts the received text into an HttpRequest (3),
  • sends the created request via Akka Http (4),
  • prints the HttpResponse to standard out (5).
Scala
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.KillSwitch
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

val jmsSource: Source[String, KillSwitch] =                                 // (1)
  JmsConsumer.textSource(
    JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
  )

val (runningSource, finished): (KillSwitch, Future[Done]) =
  jmsSource                                                   //: String
    .map(ByteString(_))                                       //: ByteString   (2)
    .map { bs =>
      HttpRequest(uri = Uri("http://localhost:8080/hello"),   //: HttpRequest  (3)
        entity = HttpEntity(bs))
    }
    .mapAsyncUnordered(4)(Http().singleRequest(_))            //: HttpResponse (4)
    .toMat(Sink.foreach(println))(Keep.both)                  //               (5)
    .run()
Full source at GitHub
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;


final Http http = Http.get(system);

Source<String, KillSwitch> jmsSource = // (1)
    JmsConsumer.textSource(
        JmsConsumerSettings.create(connectionFactory).withBufferSize(10).withQueue("test"));

int parallelism = 4;
Pair<KillSwitch, CompletionStage<Done>> pair =
    jmsSource // : String
        .map(ByteString::fromString) // : ByteString   (2)
        .map(
            bs ->
                HttpRequest.create("http://localhost:8080/hello")
                    .withEntity(bs)) // : HttpRequest  (3)
        .mapAsyncUnordered(parallelism, http::singleRequest) // : HttpResponse (4)
        .toMat(Sink.foreach(System.out::println), Keep.both()) //               (5)
        .run(materializer);
Full source at GitHub

Example: Read text messages from JMS queue and send to web socket

  • listens to the JMS queue “test” receiving Strings (1),
  • configures a web socket flow to localhost (2),
  • converts incoming data to a ws.TextMessageakka.http.javadsl.model.ws.TextMessage,
  • pass the message via the web socket flow (4),
  • convert the (potentially chunked) web socket reply to a String (5),
  • prefix the String (6),
  • end the stream by writing the values to standard out (7).
Scala
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse}

import akka.stream.KillSwitch
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer

import scala.concurrent.Future

val jmsSource: Source[String, KillSwitch] =
  JmsConsumer.textSource(                                                           // (1)
    JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
  )

val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] = // (2)
  Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/webSocket/ping"))

val ((runningSource, wsUpgradeResponse), streamCompletion): ((KillSwitch, Future[WebSocketUpgradeResponse]), Future[Done]) =
                                                   // stream element type
  jmsSource                                        //: String
    .map(ws.TextMessage(_))                        //: ws.TextMessage                  (3)
    .viaMat(webSocketFlow)(Keep.both)              //: ws.TextMessage                  (4)
    .mapAsync(1)(wsMessageToString)                //: String                          (5)
    .map("client received: " + _)                  //: String                          (6)
    .toMat(Sink.foreach(println))(Keep.both)       //                                  (7)
    .run()

/**
 * Convert potentially chunked WebSocket Message to a string.
 */
def wsMessageToString: ws.Message => Future[String] = {
  case message: ws.TextMessage.Strict =>
    Future.successful(message.text)

  case message: ws.TextMessage.Streamed =>
    val seq = message.textStream.runWith(Sink.seq)
    seq.map(seq => seq.mkString)

  case message =>
    Future.successful(message.toString)
}
Full source at GitHub
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;

import akka.http.javadsl.Http;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebSocketRequest;
import akka.http.javadsl.model.ws.WebSocketUpgradeResponse;

import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsProducer;

import java.util.concurrent.CompletionStage;

final Http http = Http.get(system);

Source<String, KillSwitch> jmsSource = // (1)
    JmsConsumer.textSource(
        JmsConsumerSettings.create(connectionFactory).withBufferSize(10).withQueue("test"));

Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> webSocketFlow = // (2)
    http.webSocketClientFlow(WebSocketRequest.create("ws://localhost:8080/webSocket/ping"));

int parallelism = 4;
Pair<Pair<KillSwitch, CompletionStage<WebSocketUpgradeResponse>>, CompletionStage<Done>> pair =
    jmsSource // : String
        .map(
            s -> {
              Message msg = TextMessage.create(s);
              return msg;
            }) // : Message           (3)
        .viaMat(webSocketFlow, Keep.both()) // : Message           (4)
        .mapAsync(parallelism, this::wsMessageToString) // : String            (5)
        .map(s -> "client received: " + s) // : String            (6)
        .toMat(Sink.foreach(System.out::println), Keep.both()) //                    (7)
        .run(materializer);

/** Convert potentially chunked WebSocket Message to a string. */
private CompletionStage<String> wsMessageToString(Message msg) {
  if (msg.isText()) {
    TextMessage tMsg = msg.asTextMessage();
    if (tMsg.isStrict()) {
      return CompletableFuture.completedFuture(tMsg.getStrictText());
    } else {
      CompletionStage<List<String>> strings =
          tMsg.getStreamedText().runWith(Sink.seq(), materializer);
      return strings.thenApply(list -> String.join("", list));
    }
  } else {
    return CompletableFuture.completedFuture(msg.toString());
  }
}
Full source at GitHub

Running the example code

This example is contained in a stand-alone runnable main, it can be run from sbt like this:

Scala
sbt
> doc-examples/run
The source code for this page can be found here.