MongoDB Connector

The MongoDB connector allows you to read and save documents. You can query a collection as a stream of documents with MongoSource, or insert, update, and delete documents in a collection with MongoSink.

This connector is based on the mongo-scala-driver and does not have a Java interface. It supports the driver's macros and codecs, which allow reading and writing Scala case class objects.

Alternative connector

Another MongoDB connector is available.

ReactiveMongo is a Scala driver that provides fully non-blocking and asynchronous I/O operations.

Please read more about it in the ReactiveMongo documentation.

Reported issues

Tagged issues at GitHub


libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.19"

dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-mongodb_2.12', version: '0.19'
}


Sources provided by this connector need a prepared collection, obtained from a MongoClient, to communicate with the MongoDB server.

For codec and macros support, you first need to provide a case class and a codecRegistry.

case class Number(_id: Int)
val codecRegistry = fromRegistries(fromProviders(classOf[Number]), DEFAULT_CODEC_REGISTRY)

Then, let's initialize a MongoDB connection.

private val client = MongoClient("mongodb://localhost:27017")
private val db = client.getDatabase("alpakka-mongo")
private val numbersColl = db.getCollection("numbers")

For codec support, add the registry to the database or the collection.

private val numbersObjectColl = db.getCollection("numbers").withCodecRegistry(codecRegistry)

We will also need an ActorSystem and an ActorMaterializer.

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

This is all the preparation we need.

Source Usage

Let’s create a source from a MongoDB collection observable, which can optionally take a filter.

val source: Source[Document, NotUsed] =
  MongoSource(numbersColl.find())

With codec support, adapt the type of the source.

val source: Source[Number, NotUsed] =
  MongoSource(numbersObjectColl.find[Number]())
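The optional filter mentioned above can be sketched as follows. This is a minimal sketch, not the connector's own example: the object name FilteredSourceSketch, the method numbersFrom, and the minId parameter are hypothetical, and it assumes the documents carry an integer _id as in the Number case class.

```scala
import akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.Source
import org.mongodb.scala.{Document, MongoCollection}
import org.mongodb.scala.model.Filters

// Hypothetical helper, for illustration only.
object FilteredSourceSketch {
  // Emits only documents whose _id is greater than or equal to minId.
  // The filter is handed to find, so MongoDB evaluates it, not the stream.
  def numbersFrom(collection: MongoCollection[Document], minId: Int): Source[Document, NotUsed] =
    MongoSource(collection.find[Document](Filters.gte("_id", minId)))
}
```

Calling FilteredSourceSketch.numbersFrom(numbersColl, 5) would give a source that can be run exactly like the unfiltered one in the next step.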

And finally we can run it.

val rows: Future[Seq[Document]] = source.runWith(Sink.seq)

With codec support

val rows: Future[Seq[Number]] = source.runWith(Sink.seq)

Here we used a basic sink to complete the stream by collecting all of the stream elements into a sequence. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
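As an illustration of a slightly larger pipeline, the sketch below chains a reusable Flow between a source and a folding sink. It uses only Akka Streams: the numbers source is a hypothetical in-memory stand-in for a MongoSource so that it runs without a database, and the names PipelineSketch and evenSquares are made up for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PipelineSketch {
  // Same shape as the case class used in this guide.
  case class Number(_id: Int)

  // Hypothetical stand-in for a MongoSource: emits Number(1) to Number(10).
  val numbers: Source[Number, NotUsed] = Source(1 to 10).map(Number(_))

  // Reusable stage: extract the id, keep even ids, square them.
  val evenSquares: Flow[Number, Int, NotUsed] =
    Flow[Number].map(_._id).filter(_ % 2 == 0).map(id => id * id)

  // Runs the pipeline and sums the results; elements are pulled on demand,
  // so the source never produces faster than the fold consumes.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("pipeline-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(numbers.via(evenSquares).runFold(0)(_ + _), 5.seconds)
    finally system.terminate()
  }
}
```

Because evenSquares is a plain Flow, the same stage could be attached to any Source[Number, NotUsed], including one backed by a collection.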

Flow and Sink Usage

Each of the sink factory methods below has a corresponding flow factory in MongoFlow, which emits the written document or the result of the operation downstream.
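A flow variant can be useful when later stages need to see what was written. The following is a sketch under assumptions: it assumes MongoFlow.insertOne takes the same parallelism and collection parameters as MongoSink.insertOne plus an implicit ExecutionContext, and that it emits each inserted document. The object and method names are hypothetical.

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.mongodb.scaladsl.MongoFlow
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.mongodb.scala.{Document, MongoCollection}
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, for illustration only.
object MongoFlowSketch {
  // Assumption: MongoFlow.insertOne mirrors MongoSink.insertOne and
  // passes each inserted document downstream.
  def insertFlow(collection: MongoCollection[Document])(
      implicit ec: ExecutionContext
  ): Flow[Document, Document, NotUsed] =
    MongoFlow.insertOne(parallelism = 2, collection = collection)

  // Inserts every document, then collects the documents that were written.
  def insertAndCollect(docs: Source[Document, NotUsed], collection: MongoCollection[Document])(
      implicit mat: Materializer,
      ec: ExecutionContext
  ): Future[immutable.Seq[Document]] =
    docs.via(insertFlow(collection)).runWith(Sink.seq)
}
```

With the materializer from the preparation steps and an execution context such as system.dispatcher in implicit scope, MongoFlowSketch.insertAndCollect(source, numbersColl) would complete once all documents have been inserted.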

For codec support, the type must be specified in the database or collection declaration.

private val numbersObjectColl: MongoCollection[Number] = db.getCollection("numbersSink")


Insert

We can use a Source of documents to save them to a MongoDB collection using insertOne or insertMany.

val source: Source[Document, NotUsed] = ???
source.runWith(MongoSink.insertOne(parallelism = 2, collection = numbersColl))

With codec support

val source: Source[Number, NotUsed] = ???
source.runWith(MongoSink.insertOne[Number](parallelism = 2, collection = numbersObjectColl))

Insert Many

Insert many can be used if you have a collection of documents to insert at once.

val source: Source[Seq[Document], NotUsed] = ???
source.runWith(MongoSink.insertMany(parallelism = 2, collection = numbersColl))

With codec support

val source: Source[Seq[Number], NotUsed] = ???
source.runWith(MongoSink.insertMany[Number](parallelism = 2, collection = numbersObjectColl))
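When the upstream emits single documents rather than sequences, one possible approach is to batch them with the Akka Streams grouped operator before handing them to insertMany. This is only a sketch: BatchedInsertSketch, batched, insertBatched, and the batchSize parameter are hypothetical names, and it assumes MongoSink.insertMany needs an implicit ExecutionContext and materializes a Future[Done].

```scala
import akka.{Done, NotUsed}
import akka.stream.Materializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSink
import akka.stream.scaladsl.Source
import org.mongodb.scala.{Document, MongoCollection}
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, for illustration only.
object BatchedInsertSketch {
  // Groups single elements into batches of at most batchSize elements;
  // the last batch may be smaller.
  def batched[T](elements: Source[T, NotUsed], batchSize: Int): Source[immutable.Seq[T], NotUsed] =
    elements.grouped(batchSize)

  // Writes each batch with a single insertMany call.
  // Assumption: insertMany takes an implicit ExecutionContext.
  def insertBatched(docs: Source[Document, NotUsed], collection: MongoCollection[Document], batchSize: Int)(
      implicit mat: Materializer,
      ec: ExecutionContext
  ): Future[Done] =
    batched(docs, batchSize).runWith(MongoSink.insertMany(parallelism = 2, collection = collection))
}
```

Larger batches mean fewer round trips to the server, at the cost of holding more documents in memory per batch.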


Update

We can update documents with a Source of DocumentUpdate, which combines a filter and an update definition. Use updateOne or updateMany depending on whether the filter should target one or many documents.

import org.mongodb.scala.model.{Filters, Updates}

val source: Source[DocumentUpdate, NotUsed] = Source
  .single(DocumentUpdate(filter = Filters.eq("_id", 1), update = Updates.set("updateValue", 0)))

source.runWith(MongoSink.updateOne(2, numbersColl))


Delete

We can delete documents with a Source of filters. Use deleteOne or deleteMany depending on whether the filter should target one or many documents.

import org.mongodb.scala.bson.conversions.Bson

val source: Source[Bson, NotUsed] = Source.single(Filters.eq("_id", 1))
source.runWith(MongoSink.deleteOne(2, numbersColl))

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Test code requires a MongoDB server running in the background. You can start one quickly using Docker:

docker-compose up mongo

Then run the tests from the sbt shell:

> mongodb/test