MQTT v5

MQTT

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimise network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal for the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.

Further information is available at mqtt.org.

The Alpakka MQTT v5 connector provides an Akka Stream source, sink and flow to connect to MQTT brokers. It is based on the Eclipse Paho Java client.

Project Info: Alpakka MQTT v5
Artifact: com.lightbend.akka : akka-stream-alpakka-mqttv5 : 2.0.0
JDK versions: Adopt OpenJDK 11
Scala versions: 2.13.11
JPMS module name: akka.stream.alpakka.mqttv5
Readiness level: Since 2.0, 2023-09-04
Home page: https://doc.akka.io/docs/akka-enhancements/current/index.html
Issues: Issue tracker
Sources: https://github.com/lightbend/akka-enhancements

Artifacts

These dependencies are available on request from Lightbend; please contact your sales representative.

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.8.4"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-mqttv5" % "2.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.8.4</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-mqttv5_${scala.binary.version}</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.8.4",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-mqttv5_${versions.ScalaBinary}:2.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows the direct dependencies of this module, and the dependency tree below it lists all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.typesafe.akka    akka-stream_2.13    2.8.4
org.eclipse.paho    org.eclipse.paho.mqttv5.client    1.2.5
org.scala-lang    scala-library    2.13.11
Dependency tree
com.typesafe.akka    akka-stream_2.13    2.8.4    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.8.4    BUSL-1.1
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
            org.scala-lang    scala-library    2.13.11    Apache-2.0
        org.scala-lang    scala-library    2.13.11    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.8.4    BUSL-1.1
    com.typesafe    ssl-config-core_2.13    0.6.1    Apache-2.0
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang    scala-library    2.13.11    Apache-2.0
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.11    Apache-2.0
org.eclipse.paho    org.eclipse.paho.mqttv5.client    1.2.5
org.scala-lang    scala-library    2.13.11    Apache-2.0

Settings

The required MqttConnectionSettings (API) settings to connect to an MQTT server are

  1. the MQTT broker address
  2. a unique ID for the client (setting it to the empty string should let the MQTT broker assign one, but not all brokers do, so you may want to generate it yourself)
  3. the MQTT client persistence to use (e.g. org.eclipse.paho.mqttv5.client.persist.MemoryPersistence), which controls the reliability guarantees
Scala
import akka.stream.alpakka.mqttv5._
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence

val connectionSettings = MqttConnectionSettings(
  "tcp://localhost:1883", // (1)
  "test-scala-client", // (2)
  new MemoryPersistence // (3)
)
Java
import akka.stream.alpakka.mqttv5.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

MqttConnectionSettings connectionSettings =
    MqttConnectionSettings.create(
        "tcp://localhost:1883", // (1)
        "test-java-client", // (2)
        new MemoryPersistence() // (3)
        );

Most settings are passed on to Paho’s org.eclipse.paho.mqttv5.client.MqttConnectionOptions.

Use delayed stream restarts

Note that the following examples do not provide any connection management and are designed to get you going quickly. Consider using empty client IDs to let unique identifiers be auto-generated, and wrap the stream in a delayed restart, as in the sketch below. The underlying Paho library’s auto-reconnect feature does not handle initial connections by design.
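A minimal sketch of such a delayed restart, assuming an implicit ActorSystem is in scope, that MqttSource is located in the scaladsl package (by analogy with MqttPublisher shown later), and a hypothetical topic name:

Scala
import scala.concurrent.duration._

import akka.stream.RestartSettings
import akka.stream.alpakka.mqttv5._
import akka.stream.alpakka.mqttv5.scaladsl.MqttSource // assumed location, by analogy with scaladsl.MqttPublisher
import akka.stream.scaladsl.{RestartSource, Sink}

// Restart the MQTT source with exponential backoff whenever it fails or completes.
val restartSettings =
  RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)

val restartingSource =
  RestartSource.withBackoff(restartSettings) { () =>
    MqttSource.atMostOnce(
      connectionSettings.withClientId(""), // empty client ID: the broker may assign a unique one
      MqttSubscriptions("some/topic", MqttQoS.AtLeastOnce), // hypothetical topic
      bufferSize = 8)
  }

restartingSource.runWith(Sink.foreach(message => println(message.payload.utf8String)))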

Configure encrypted connections

To connect with transport-level security, configure the address with the ssl:// scheme, set authentication details, and pass in a socket factory.

Scala
val connectionSettings = MqttConnectionSettings("ssl://localhost:1885", "ssl-client", new MemoryPersistence)
  .withAuth("mqttUser", ByteString("mqttPassword"))
  .withSocketFactory(SSLContext.getDefault.getSocketFactory)
Java
MqttConnectionSettings connectionSettings =
    MqttConnectionSettings.create("ssl://localhost:1885", "ssl-client", new MemoryPersistence())
        .withAuth("mqttUser", ByteString.fromString("mqttPassword"))
        .withSocketFactory(SSLContext.getDefault().getSocketFactory());

Reading from MQTT

At most once

Let’s create a source that connects to the MQTT broker and receives messages from the subscribed topics.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttSource: Source[MqttMessage, Future[Done]] =
  MqttSource.atMostOnce(
    connectionSettings.withClientId(clientId = "source-spec/source"),
    MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)),
    bufferSize = 8)

val (subscribed, received) = mqttSource.take(messages.size).toMat(Sink.seq)(Keep.both).run()
Java
MqttSubscriptions subscriptions =
    MqttSubscriptions.create(topic1, MqttQoS.atMostOnce())
        .addSubscription(topic2, MqttQoS.atMostOnce());

Source<MqttMessage, CompletionStage<Done>> mqttSource =
    MqttSource.atMostOnce(
        connectionSettings.withClientId("source-test/source"), subscriptions, bufferSize);

Pair<CompletionStage<Done>, CompletionStage<List<String>>> materialized =
    mqttSource
        .map(m -> m.topic() + "-" + m.payload().utf8String())
        .take(messageCount * 2)
        .toMat(Sink.seq(), Keep.both())
        .run(system);

CompletionStage<Done> subscribed = materialized.first();
CompletionStage<List<String>> streamResult = materialized.second();

This source has a materialized value (Future[Done] in Scala, CompletionStage<Done> in Java) which is completed when the subscription to the MQTT broker has been established.

MQTT atMostOnce automatically acknowledges messages back to the server when they are passed downstream.

At least once

The atLeastOnce source allows users to acknowledge the messages anywhere downstream. Please note that for manual acks to work, CleanStart must be set to false and the MqttQoS must be AtLeastOnce.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttSource: Source[MqttMessageWithAck, Future[Done]] =
  MqttSource.atLeastOnce(
    connectionSettings.withCleanStart(false),
    MqttSubscriptions(topic2, MqttQoS.AtLeastOnce),
    bufferSize = 8)
Java
Source<MqttMessageWithAck, CompletionStage<Done>> mqttSource =
    MqttSource.atLeastOnce(
        connectionSettings
            .withClientId("source-test/source-withoutAutoAck")
            .withCleanStart(false),
        MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
        bufferSize);

The atLeastOnce source returns MqttMessageWithAck elements so you can acknowledge them by calling ack().

Scala
val (subscribed, received) = mqttSource
  .via(businessLogic)
  .mapAsync(1)(messageWithAck => messageWithAck.ack().map(_ => messageWithAck.message))
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
final CompletionStage<List<MqttMessage>> result =
    mqttSource
        .via(businessLogic)
        .mapAsync(
            1,
            messageWithAck ->
                messageWithAck.ack().thenApply(unused2 -> messageWithAck.message()))
        .take(input.size())
        .runWith(Sink.seq(), system);

Publishing to MQTT

To publish messages to the MQTT server, create a sink by specifying MqttConnectionSettings (API) and a default Quality of Service level.

Scala
val sink: Sink[MqttMessage, Future[Done]] =
  MqttSink(connectionSettings, MqttQoS.AtLeastOnce)
Source(messages)
  .runWith(sink)
Java
Sink<MqttMessage, CompletionStage<Done>> mqttSink =
    MqttSink.create(connectionSettings.withClientId("source-test/sink"), MqttQoS.atLeastOnce());
Source.from(messages).runWith(mqttSink, system);

The Quality of Service level and the retained flag can be configured on a per-message basis; see also the publishing sketch after the examples below.

Scala
val lastWill = MqttMessage(willTopic, ByteString("ohi")).withQos(MqttQoS.AtLeastOnce).withRetained(true)
Java
MqttMessage lastWill =
    MqttMessage.create(willTopic, ByteString.fromString("ohi"))
        .withQos(MqttQoS.atLeastOnce())
        .withRetained(true);
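A minimal sketch of publishing such a per-message configuration through the sink shown above; the topic name and payload are hypothetical, and MqttSink is assumed to live in the scaladsl package like MqttPublisher:

Scala
import akka.stream.alpakka.mqttv5._
import akka.stream.alpakka.mqttv5.scaladsl.MqttSink // assumed location, by analogy with scaladsl.MqttPublisher
import akka.stream.scaladsl.Source
import akka.util.ByteString

// "device/42/status" is a hypothetical topic used for illustration only.
val retainedStatus: MqttMessage =
  MqttMessage("device/42/status", ByteString("online"))
    .withQos(MqttQoS.ExactlyOnce)
    .withRetained(true)

// The explicit QoS and retained flag set on the message take precedence over
// the sink's default Quality of Service.
Source
  .single(retainedStatus)
  .runWith(MqttSink(connectionSettings, MqttQoS.AtLeastOnce))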

Publishing with a pass-through

It is additionally possible to create a publishing-only flow. The flow will emit a “pass-through” element for each successful publish; the pass-through could, for instance, be a Kafka offset to be committed or an ActorRef to which an acknowledgement should be sent after successfully publishing. To associate a pass-through element (for this example, the offset) with a message to be published, the message and pass-through are wrapped in a PublishWithPassThrough:

Scala
import akka.stream.alpakka.mqttv5.scaladsl.PublishWithPassThrough

PublishWithPassThrough(mqttMessage, offset)
Java
PublishWithPassThrough<CommittableOffset> toPublish = PublishWithPassThrough.create(message, offset);

Publishing-only flows with pass-through support are available through factories on MqttPublisher. The flows optionally take a default Quality of Service level; if no default is provided, every message must have a Quality of Service set explicitly (or the flow will fail). The flows also require declaring a publishing concurrency: they will back-pressure once that many messages are pending.

For messages where the effective Quality of Service (set explicitly on the message, or taken from the default if one has been provided) is AtMostOnce, the pass-through will not be emitted before the underlying Paho client reports that it has sent the message over the network. An effective Quality of Service of AtLeastOnce will result in no pass-through being emitted before the Paho client reports that the server has accepted the message. The ExactlyOnce Quality of Service will result in no pass-through until the server has acknowledged our acknowledgement of its acceptance of the message.

The orderedFlow will preserve the ordering of incoming messages in its emissions. This makes it suitable for pass-throughs which convey a “high-water mark” in processing, such as Kafka or Akka Persistence offsets or Event Hub checkpoints. This flow is susceptible to head-of-line blocking if an earlier message takes longer to publish than later messages. Additionally, if a message fails to publish, some pass-throughs from messages that did publish successfully might not be seen downstream. It is therefore especially suited to applications which are built from the ground up on at-least-once processing.

Scala
import akka.stream.alpakka.mqttv5.scaladsl.MqttPublisher

MqttPublisher.orderedFlow[CommittableOffset](connectionSettings, Some(MqttQoS.AtMostOnce), 10)
Java
Flow<PublishWithPassThrough<CommittableOffset>, CommittableOffset, MqttControl> publishingFlow =
    MqttPublisher.orderedFlow(connectionSettings, MqttQoS.atMostOnce(), 10);

The unorderedFlow, conversely, does not promise to preserve ordering, which means it is not susceptible to head-of-line blocking. If the pass-through elements can be considered completely independently of each other, as might be the case when the pass-through is an ActorRef and a message to send to that actor, this may be the most suitable choice.

Scala
import akka.stream.alpakka.mqttv5.scaladsl.MqttPublisher

MqttPublisher.unorderedFlow[CommittableOffset](connectionSettings, Some(MqttQoS.AtLeastOnce), 10)
Java
Flow<PublishWithPassThrough<CommittableOffset>, CommittableOffset, MqttControl> publishingFlow =
    MqttPublisher.unorderedFlow(connectionSettings, MqttQoS.atLeastOnce(), 10);

Akka Streams supports propagating a context alongside stream elements in streams that may drop, but not reorder, elements. The “withContext” flows take advantage of this support. For example, in situations where the only message-specific information needed downstream is contained in the context (e.g. a Kafka or projection offset from a SourceWithContext), doneFlowWithContext will emit a Done as the pass-through for every message while propagating the context.

Scala
import akka.stream.alpakka.mqttv5.scaladsl.MqttPublisher

MqttPublisher.doneFlowWithContext[CommittableOffset](connectionSettings, Some(MqttQoS.AtLeastOnce), 10)
Java
FlowWithContext<MqttMessage, CommittableOffset, Done, CommittableOffset, MqttControl> publishingFlow =
    MqttPublisher.doneFlowWithContext(connectionSettings, MqttQoS.atLeastOnce(), 10);

Because FlowWithContext is not allowed to reorder elements, the withContext variants always have the semantics of the orderedFlow.
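A minimal sketch of wiring doneFlowWithContext into a stream with context, assuming the context type parameter is generic and using a plain Long offset and a hypothetical topic in place of a real Kafka committable offset:

Scala
import akka.Done
import akka.stream.alpakka.mqttv5._
import akka.stream.alpakka.mqttv5.scaladsl.MqttPublisher
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

// A plain Long "offset" stands in for e.g. a Kafka committable offset; the
// topic name is hypothetical.
Source(1L to 100L)
  .map(offset => MqttMessage("sensor/readings", ByteString(s"reading-$offset")) -> offset)
  .asSourceWithContext { case (_, offset) => offset }
  .map { case (message, _) => message }
  .via(MqttPublisher.doneFlowWithContext[Long](connectionSettings, Some(MqttQoS.AtLeastOnce), 10))
  .asSource
  .runWith(Sink.foreach[(Done, Long)] { case (_, offset) =>
    println(s"published the message associated with offset $offset")
  })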

The publishing-only flows have an MqttControl as their materialized value. The materialized control object allows one to request that the flow drain() before it shuts down. When draining, the flow will cease demanding new messages to publish but will wait for previously demanded messages to be published (or fail to publish). The Future (Scala) / CompletionStage (Java) returned from drain() will complete when the flow starts to drain, not when the flow eventually completes.

Scala
import system.dispatcher

val control: MqttControl =
  source
    .map { incomingMessage =>
      PublishWithPassThrough(incomingMessage.messageToPublish, incomingMessage.offset)
    }
    .viaMat(MqttPublisher.orderedFlow(connectionSettings, None, 10))(Keep.right)
    .wireTap { offset => system.log.info("Successfully published to MQTT based on offset {}", offset) }
    .to(offsetCommittingSink)
    .run()

val drainingStarted = control.drain()
drainingStarted.foreach { _ => system.log.info("Stream is draining") }
Java
MqttControl control = source
    .map(incomingMessage ->
            PublishWithPassThrough.create(
                incomingMessage.getMessageToPublish(),
                incomingMessage.getOffset()))
    .viaMat(MqttPublisher.orderedFlow(connectionSettings, 10), Keep.right())
    .wireTap(
            Sink.foreach(offset ->
                log.info("Successfully published to MQTT based on offset {}", offset)))
    .to(offsetCommittingSink)
    .run(system);

CompletionStage<Done> drainingStarted = control.drain();
drainingStarted.thenAcceptAsync(done -> log.info("Stream is draining"), system.dispatcher());

It is also possible to call shutdown() on the control. This will cause the flow to cease demand and complete as soon as possible, ignoring whether in-flight publishing attempts succeed or fail. The Future (Scala) / CompletionStage (Java) returned will complete when the flow disconnects from the MQTT server.
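A minimal sketch, assuming shutdown() returns a Future[Done] analogous to drain():

Scala
import scala.concurrent.Future
import akka.Done
import system.dispatcher

// Assumption: shutdown() returns a Future[Done], completing once the flow has
// disconnected from the MQTT server.
val disconnected: Future[Done] = control.shutdown()
disconnected.foreach { _ => system.log.info("Disconnected from the MQTT server") }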

Publish and subscribe in a single flow

It is also possible to connect to the MQTT server in a bidirectional fashion, using a single underlying connection (and client ID). To do that, create an MQTT flow that combines the functionality of an MQTT source and an MQTT sink.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttFlow: Flow[MqttMessage, MqttMessage, Future[Done]] =
  MqttFlow.atMostOnce(
    connectionSettings.withClientId("flow-spec/flow"),
    MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
    bufferSize = 8,
    MqttQoS.AtLeastOnce)
Java
final Flow<MqttMessage, MqttMessage, CompletionStage<Done>> mqttFlow =
    MqttFlow.atMostOnce(
        connectionSettings,
        MqttSubscriptions.create("flow-test/topic", MqttQoS.atMostOnce()),
        bufferSize,
        MqttQoS.atLeastOnce());

Run the flow by connecting a source of messages to be published and a sink for received messages.

Scala
val ((mqttMessagePromise, subscribed), result) =
  source.viaMat(mqttFlow)(Keep.both).toMat(Sink.seq)(Keep.both).run()
Java
final Pair<
        Pair<CompletableFuture<Optional<MqttMessage>>, CompletionStage<Done>>,
        CompletionStage<List<MqttMessage>>>
    materialized =
        source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);

CompletableFuture<Optional<MqttMessage>> mqttMessagePromise = materialized.first().first();
CompletionStage<Done> subscribedToMqtt = materialized.first().second();
CompletionStage<List<MqttMessage>> streamResult = materialized.second();

Using a flow with acknowledgement on message sent

It is possible to create a flow that accepts MqttMessageWithAck instead of MqttMessage. In this case, an ack is sent once the message has been successfully published to the broker. This flow can be used when the source must be acknowledged only after the message is successfully sent to the destination topic, providing at-least-once semantics.

The flow emits MqttMessageWithAck elements whose message is replaced with the received content, while keeping the ack function from the original source.

Scala
val mqttFlow: Flow[MqttMessageWithAck, MqttMessageWithAck, Future[Done]] =
  MqttFlow.atLeastOnceWithAck(
    connectionSettings,
    MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
    bufferSize = 8,
    MqttQoS.AtLeastOnce)
Java
final Flow<MqttMessageWithAck, MqttMessageWithAck, CompletionStage<Done>> mqttFlow =
    MqttFlow.atLeastOnceWithAck(
        connectionSettings,
        MqttSubscriptions.create("flow-test/topic-ack", MqttQoS.atMostOnce()),
        bufferSize,
        MqttQoS.atLeastOnce());

Run the flow by connecting a source of messages to be published and a sink for received messages. When the messages are sent, ack is called.

Scala
val (subscribed, result) = source.viaMat(mqttFlow)(Keep.right).toMat(Sink.seq)(Keep.both).run()
Java
final Pair<Pair<NotUsed, CompletionStage<Done>>, CompletionStage<List<MqttMessageWithAck>>>
    materialized =
        source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);

Capturing MQTT client logging

The Paho library uses its own logging adapter and ships a default implementation that uses java.util.logging. See Paho/Log and Debug.
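A minimal sketch of enabling that logging programmatically via java.util.logging; the logger (package) name below is an assumption based on the client’s package and may need adjusting to the loggers you actually observe:

Scala
import java.util.logging.{ConsoleHandler, Level, Logger}

// The logger name is an assumption based on the client's package
// (org.eclipse.paho.mqttv5). Keep a reference to the logger, as
// java.util.logging only holds loggers weakly.
val pahoLogger: Logger = Logger.getLogger("org.eclipse.paho.mqttv5")
pahoLogger.setLevel(Level.FINE)

val handler = new ConsoleHandler
handler.setLevel(Level.FINE)
pahoLogger.addHandler(handler)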
