MongoDB Connector

The MongoDB connector allows you to read and save documents. You can query as a stream of documents from MongoSource or update documents in a collection with MongoSink.

This connector is based off the mongo-scala-driver and does not have a java interface. It supports driver macros and codec allowing to read or write scala case class objects.

Alternative connector

Another MongoDB connector is available.

ReactiveMongo is a Scala driver that provides fully non-blocking and asynchronous I/O operations.

Please read more about it in the ReactiveMongo documentation.

Reported issues

Tagged issues at Github

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-mongodb_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-mongodb_2.12', version: '0.19'
}

Usage

Sources provided by this connector need a prepared session to communicate with MongoDB server.

For codec and macros support, you first need to provide a case class and a codecRegistry.

Scala
case class Number(_id: Int)
val codecRegistry = fromRegistries(fromProviders(classOf[Number]), DEFAULT_CODEC_REGISTRY)

Then, lets initialize a MongoDB connection.

Scala
private val client = MongoClient(s"mongodb://localhost:27017")
private val db = client.getDatabase("alpakka-mongo")
private val numbersColl = db.getCollection("numbers")

For codec support, add the registry to the database or the collection.

Scala
private val numbersObjectColl = db.getCollection("numbers").withCodecRegistry(codecRegistry)

We will also need an ActorSystem and an ActorMaterializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

This is all preparation that we are going to need.

Source Usage

Let’s create a source from a MongoDB collection observable, which can optionally take a filter.

Scala
val source: Source[Document, NotUsed] =
  MongoSource(numbersColl.find())

With codec support, adapt the type of the source.

Scala
val source: Source[Number, NotUsed] =
  MongoSource[Number](numbersObjectColl.find())

And finally we can run it.

Scala
val rows: Future[Seq[Document]] = source.runWith(Sink.seq)

With codec support

Scala
val rows: Future[Seq[Number]] = source.runWith(Sink.seq)

Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.

Flow and Sink Usage

Each of these sink factory methods have a corresponding factory in insertOne which will emit the written document or result of the operation downstream.

For codec support, the type must be specified in the database or collection declaration.

Scala
private val numbersObjectColl: MongoCollection[Number] = db.getCollection("numbersSink")

Insert

We can use a Source of documents to save them to a mongo collection using insertOne or insertMany.

Scala
val source: Source[Document, NotUsed] = ???
source.runWith(MongoSink.insertOne(parallelism = 2, collection = numbersColl))

With codec support

Scala
val source: Source[Number, NotUsed] = ???
source.runWith(MongoSink.insertOne[Number](parallelism = 2, collection = numbersObjectColl))

Insert Many

Insert many can be used if you have a collection of documents to insert at once.

Scala
val source: Source[Seq[Document], NotUsed] = ???
source.runWith(MongoSink.insertMany(parallelism = 2, collection = numbersColl))

With codec support

Scala
val source: Source[Seq[Number], NotUsed] = ???
source.runWith(MongoSink.insertMany[Number](parallelism = 2, collection = numbersObjectColl))

Update

We can update documents with a Source of DocumentUpdate which is a filter and a update definition. Use either updateOne or updateMany if the filter should target one or many documents.

Scala
import org.mongodb.scala.model.{Filters, Updates}

val source: Source[DocumentUpdate, NotUsed] = Source
  .single(DocumentUpdate(filter = Filters.eq("id", 1), update = Updates.set("updateValue", 0)))

source.runWith(MongoSink.updateOne(2, numbersColl))

Delete

We can delete documents with a Source of filters. Use either deleteOne or deleteMany if the filter should target one or many documents.

Scala
val source: Source[Bson, NotUsed] = Source.single(Filters.eq("id", 1))
source.runWith(MongoSink.deleteOne(2, numbersColl))

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Test code requires a MongoDB server running in the background. You can start one quickly using docker:

docker-compose up mongo

Scala
sbt
> mongodb/test
The source code for this page can be found here.