MQTT Connector

The MQTT connector provides an Akka Stream source, sink and flow to connect to MQTT servers. It is based on Eclipse Paho.

Reported issues

Tagged issues at Github

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-mqtt_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-mqtt_2.12', version: '0.19'
}

Setup

Use delayed stream restarts

Note that the following examples do not provide any connection management and are designed to get you going quickly. Consider empty client IDs to auto-generate unique identifiers and the use of delayed stream restarts. The underlying Paho library’s auto-reconnect feature does not handle initial connections by design.

First we need to define various settings, that are required when connecting to an MQTT server.

Scala
val connectionSettings = MqttConnectionSettings(
  "tcp://localhost:1883",
  "test-scala-client",
  new MemoryPersistence
)
Java
final MqttConnectionSettings connectionSettings = MqttConnectionSettings.create(
  "tcp://localhost:1883",
  "test-java-client",
  new MemoryPersistence()
);

Here we used MqttConnectionSettings factory to set the address of the server, client ID, which needs to be unique for every client, and client persistence implementation (MemoryPersistence) which allows to control reliability guarantees.

Most settings are passed on to Paho’s MqttConnectOptions and documented there.

Reading from MQTT

Then let’s create a source that is going to connect to the MQTT server upon materialization and receive messages that are sent to the subscribed topics.

Scala
val settings = MqttSourceSettings(
  sourceSettings,
  Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)
)

val mqttSource = MqttSource.atMostOnce(settings, bufferSize = 8)
Java
final MqttSourceSettings settings = MqttSourceSettings
  .create(connectionSettings.withClientId("source-test/source"))
  .withSubscriptions(
    Pair.create(topic1, MqttQoS.atMostOnce()),
    Pair.create(topic2, MqttQoS.atMostOnce())
  );

final Integer bufferSize = 8;
final Source<MqttMessage, CompletionStage<Done>> mqttSource = MqttSource.create(settings, bufferSize);

And finally run the source.

Scala
val (subscribed, result) = mqttSource
  .take(messages.size)
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
final Pair<CompletionStage<Done>, CompletionStage<List<String>>> result = mqttSource
  .map(m -> m.topic() + "-" + m.payload().utf8String())
  .take(messageCount * 2)
  .toMat(Sink.seq(), Keep.both())
  .run(materializer);

This source has a materialized value (Future in Scala API and CompletionStage in Java API) which is completed when the subscription to the MQTT broker has been completed.

MQTT automatically acknowledges messages back to the server once they are passed downstream. The atLeastOnce source allow users to acknowledge the messages anywhere downstream. Please note that for manual acks to work CleanSession should be set to false and MqttQoS should be AtLeastOnce.

Scala
val sourceConnectionSettings = connectionSettings
  .withClientId(clientId = "source-spec/source1")
  .withCleanSession(false)
val mqttSourceSettings = MqttSourceSettings(sourceConnectionSettings, Map(topic -> MqttQoS.AtLeastOnce))

val mqttSource: Source[MqttCommittableMessage, Future[Done]] =
  MqttSource.atLeastOnce(mqttSourceSettings, 8)
val connectionSettings = sourceSettings.withCleanSession(false)
val mqttSourceSettings = MqttSourceSettings(connectionSettings, Map(topic -> MqttQoS.AtLeastOnce))
val mqttSource = MqttSource.atLeastOnce(mqttSourceSettings, 8)
Java
MqttConnectionSettings connectionSettings = sourceSettings.withCleanSession(false);
@SuppressWarnings("unchecked")
MqttSourceSettings mqttSourceSettings = MqttSourceSettings.create(connectionSettings)
  .withSubscriptions(Pair.create(topic, MqttQoS.atLeastOnce()));
final Source<MqttCommittableMessage, CompletionStage<Done>> mqttSource = MqttSource.atLeastOnce(mqttSourceSettings, 8);

The atLeastOnce source returns MqttCommittableMessage so you can acknowledge them by calling messageArrivedComplete.

Scala
val result = mqttSource
  .mapAsync(1)(cm => cm.messageArrivedComplete().map(_ => cm.message))
  .take(input.size)
  .runWith(Sink.seq)
Java
final CompletionStage<List<MqttMessage>> result = mqttSource
  .mapAsync(1, cm -> cm.messageArrivedComplete().thenApply(unused2 -> cm.message()))
  .take(input.size())
  .runWith(Sink.seq(), materializer);

Publishing to MQTT

To publish messages to the MQTT server create a sink and run it.

Scala
Source(messages).runWith(MqttSink(connectionSettings, MqttQoS.AtLeastOnce))
Java
Sink<MqttMessage, CompletionStage<Done>> mqttSink = MqttSink.create(
  connectionSettings.withClientId("source-test/sink"),
  MqttQoS.atLeastOnce()
);
Source.from(messages).runWith(mqttSink, materializer);

The QoS and the retained flag can be configured on a per-message basis.

Scala
val lastWill = MqttMessage(willTopic, ByteString("ohi"), Some(MqttQoS.AtLeastOnce), retained = true)
Java
MqttMessage lastWill = MqttMessage.create(
        willTopic,
        ByteString.fromString("ohi"),
        MqttQoS.atLeastOnce(),
        true);

It is also possible to connect to the MQTT server in bidirectional fashion, using a single underlying connection (and client ID). To do that create an MQTT flow that combines the functionalities of an MQTT source and an MQTT sink.

Scala
val mqttFlow = MqttFlow(settings, 8, MqttQoS.atLeastOnce)
Java
final Flow<MqttMessage, MqttMessage, CompletionStage<Done>> mqttFlow =
  MqttFlow.create(settings, 8, MqttQoS.atLeastOnce());

Run the flow by connecting a source of messages to be published and a sink for received messages.

Scala
val ((mqttMessagePromise, subscribed), result) = source
  .viaMat(mqttFlow)(Keep.both)
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
final Pair<Pair<CompletableFuture<Optional<MqttMessage>>, CompletionStage<Done>>, CompletionStage<List<MqttMessage>>> result =
  source
    .viaMat(mqttFlow, Keep.both())
    .toMat(Sink.seq(), Keep.both())
    .run(materializer);

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Test code requires a MQTT server running in the background. You can start one quickly using docker:

docker-compose up mqtt

Scala
sbt
> mqtt/testOnly *.MqttSourceSpec
Java
sbt
> mqtt/testOnly *.MqttSourceTest
The source code for this page can be found here.