JMS examples

Example: Read text messages from JMS queue and append to file

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (3),
  • and appends the data to the file target/out.txt (2).
Scala
import java.nio.file.Paths

import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.stream.{IOResult, KillSwitch}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

  val jmsSource: Source[String, KillSwitch] =        // (1)
    JmsConsumer.textSource(
      JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
    )

  val fileSink: Sink[ByteString, Future[IOResult]] = // (2)
    FileIO.toPath(Paths.get("target/out.txt"))

  val (runningSource, finished): (KillSwitch, Future[IOResult]) =
                                                     // stream element type
    jmsSource                                        //: String
      .map(ByteString(_))                            //: ByteString    (3)
      .toMat(fileSink)(Keep.both)
      .run()

Full source
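
The materialized KillSwitch (runningSource above) is what stops the consumer, and the Future completes once the sink has finished. The following is a minimal sketch of that shutdown sequence, not part of the original example: an in-memory tick source stands in for JmsConsumer.textSource so the sketch runs without a broker, Sink.seq replaces the file sink, and KillSwitchSketch is a hypothetical name. It assumes Akka 2.6 or later, where the implicit ActorSystem also provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.{KillSwitch, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object; the tick source is a stand-in for the JMS source.
object KillSwitchSketch {
  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, so no explicit materializer is needed.
    implicit val system: ActorSystem = ActorSystem("kill-switch-sketch")
    try {
      // Endless source that exposes a KillSwitch as its materialized value,
      // mirroring the shape Source[String, KillSwitch] of the JMS source.
      val source: Source[String, KillSwitch] =
        Source
          .tick(0.millis, 10.millis, "message")
          .viaMat(KillSwitches.single)(Keep.right)

      val (runningSource, finished): (KillSwitch, Future[Seq[String]]) =
        source.toMat(Sink.seq)(Keep.both).run()

      Thread.sleep(200)        // let a few elements flow through
      runningSource.shutdown() // complete the stream gracefully
      Await.result(finished, 5.seconds)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Calling runningSource.abort(ex) instead of shutdown() would fail the stream with the given exception rather than completing it.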

Example: Read text messages from JMS queue and create one file per message

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (2),
  • combines the incoming data with a counter (3),
  • runs a small inner stream per message that writes the data to a file, using the counter value to make each file name unique (4).
Scala
import java.nio.file.Paths

import akka.stream.KillSwitch
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration.DurationInt

  val jmsSource: Source[String, KillSwitch] =                                  // (1)
    JmsConsumer.textSource(
      JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
    )
                                                            // stream element type
  val runningSource = jmsSource                             //: String
    .map(ByteString(_))                                     //: ByteString        (2)
    .zip(Source.fromIterator(() => Iterator.from(0)))       //: (ByteString, Int) (3)
    .mapAsyncUnordered(parallelism = 5) { case (byteStr, number) =>
      Source                                                //                    (4)
        .single(byteStr)
        .runWith(FileIO.toPath(Paths.get(s"target/out-$number.txt")))
    }                                                       //: IOResult
    .toMat(Sink.ignore)(Keep.left)
    .run()

Full source
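
The counter in step (3) can also be obtained with the built-in zipWithIndex operator, which pairs each element with a Long index. The sketch below is a variation under stated assumptions, not the example's actual code: an in-memory list replaces the JMS source, files go to a temporary directory instead of target/, and FilePerMessageSketch is a hypothetical name. It assumes Akka 2.6 or later.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; the list is a stand-in for the JMS source.
object FilePerMessageSketch {
  // Writes each message to its own file and returns file name -> file content.
  def run(messages: List[String]): Map[String, String] = {
    implicit val system: ActorSystem = ActorSystem("file-per-message-sketch")
    val dir: Path = Files.createTempDirectory("out")
    try {
      val done = Source(messages)
        .map(ByteString(_))                 //: ByteString
        .zipWithIndex                       //: (ByteString, Long)
        .mapAsyncUnordered(parallelism = 5) { case (bytes, number) =>
          // Inner stream: one element in, one file out.
          Source.single(bytes).runWith(FileIO.toPath(dir.resolve(s"out-$number.txt")))
        }                                   //: IOResult
        .runWith(Sink.ignore)
      Await.result(done, 10.seconds)

      // Read the files back so the result can be inspected.
      messages.indices.map { i =>
        val name = s"out-$i.txt"
        name -> new String(Files.readAllBytes(dir.resolve(name)), StandardCharsets.UTF_8)
      }.toMap
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Because mapAsyncUnordered does not preserve order, the index has to be attached before that stage, as it is here and in the example above.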

Example: Read text messages from JMS queue and send to web server

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (2),
  • puts the received text into an HttpRequest (3),
  • sends the created request via Akka HTTP (4),
  • prints the HttpResponse to standard out (5).
Scala
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.KillSwitch
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

  val jmsSource: Source[String, KillSwitch] =                                 // (1)
    JmsConsumer.textSource(
      JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
    )

  val (runningSource, finished): (KillSwitch, Future[Done]) =
    jmsSource                                                   //: String
      .map(ByteString(_))                                       //: ByteString   (2)
      .map { bs =>
        HttpRequest(uri = Uri("http://localhost:8080/hello"),   //: HttpRequest  (3)
          entity = HttpEntity(bs))
      }
      .mapAsyncUnordered(4)(Http().singleRequest(_))            //: HttpResponse (4)
      .toMat(Sink.foreach(println))(Keep.both)                  //               (5)
      .run()

Full source
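
Printing an HttpResponse shows its status and headers but does not read the entity stream, and Akka HTTP's documentation asks clients to consume or discard every response entity. One way to do that is to read the body into memory before printing. The helper below is a minimal sketch: the name bodyAsString, the object ResponseBodySketch, and the 3-second timeout are assumptions made for illustration, and it assumes Akka 2.6 or later.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpResponse

import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical helper, not part of the original example.
object ResponseBodySketch {
  // Loads the complete response entity into memory and decodes it as UTF-8.
  // The Future fails if the entity is not fully received within the timeout.
  def bodyAsString(response: HttpResponse)(implicit system: ActorSystem): Future[String] = {
    import system.dispatcher
    response.entity.toStrict(3.seconds).map(_.data.utf8String)
  }
}
```

In the stream above, such a helper could sit between steps (4) and (5) as .mapAsync(1)(ResponseBodySketch.bodyAsString(_)). When the body is not needed at all, response.discardEntityBytes() drains it without buffering.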

Example: Read text messages from JMS queue and send to web socket

  • listens to the JMS queue “test” receiving Strings (1),
  • configures a web socket flow to localhost (2),
  • converts incoming data to a ws.TextMessage (3),
  • passes the message through the web socket flow (4),
  • converts the (potentially chunked) web socket reply to a String (5),
  • prefixes the String (6),
  • ends the stream by writing the values to standard out (7).
Scala
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.KillSwitch
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.JmsConsumer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

  val jmsSource: Source[String, KillSwitch] =
    JmsConsumer.textSource(                                                           // (1)
      JmsConsumerSettings(connectionFactory).withBufferSize(10).withQueue("test")
    )

  val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] = // (2)
    Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/webSocket/ping"))

  val (runningSource, wsUpgradeResponse): (KillSwitch, Future[WebSocketUpgradeResponse]) =
                                                     // stream element type
    jmsSource                                        //: String
      .map(ws.TextMessage(_))                        //: ws.TextMessage                  (3)
      .viaMat(webSocketFlow)(Keep.both)              //: ws.Message                      (4)
      .mapAsync(1)(wsMessageToString)                //: String                          (5)
      .map("client received: " + _)                  //: String                          (6)
      .toMat(Sink.foreach(println))(Keep.left)       //                                  (7)
      .run()

Full source
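
The snippet calls wsMessageToString in step (5) without showing it. The following is a minimal sketch of what such a helper might look like, assuming Akka 2.6 or later. The body is an illustration rather than the page's actual implementation, and WsMessageSketch is a hypothetical name. A strict text message already holds its text, a streamed one is concatenated chunk by chunk, and binary messages are drained and replaced by a placeholder.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

// Hypothetical helper; the real one lives in the full source of the example.
object WsMessageSketch {
  def wsMessageToString(message: ws.Message)(implicit system: ActorSystem): Future[String] =
    message match {
      case ws.TextMessage.Strict(text) =>
        // Entire text is already available.
        Future.successful(text)
      case streamed: ws.TextMessage =>
        // Chunked message: concatenate the chunks in arrival order.
        streamed.textStream.runFold("")(_ + _)
      case binary: ws.BinaryMessage =>
        // Not expected here; drain the data so the connection is not stalled.
        binary.dataStream.runWith(Sink.ignore)
        Future.successful("<binary message>")
    }
}
```

With an implicit ActorSystem in scope, the stage in step (5) could then read .mapAsync(1)(WsMessageSketch.wsMessageToString(_)).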

Running the example code

The example code is contained in a stand-alone runnable main class and can be run from sbt like this:

Scala
sbt
> doc-examples/run
The source code for this page can be found here.