Slick (JDBC) Connector

The Slick connector provides Scala and Java DSLs to create a Source to stream the results of a SQL database query and a Flow/Sink to perform SQL actions (like inserts, updates, and deletes) for each element in a stream. It is built on the Slick library to interact with a long list of supported relational databases.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-slick_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-slick_2.12', version: '0.19'
}

You will also need to add the JDBC driver(s) for the specific relational database(s) to your project. Drivers for some of these databases, notably the commercial ones, are not available from public repositories, so unfortunately some manual steps may be required. The Slick documentation has information on where to download the drivers.
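For open-source databases such as H2 or PostgreSQL, the drivers are available from public repositories and can simply be added to your build. A minimal sketch in sbt (the version numbers here are illustrative, not prescriptive):

sbt
// JDBC drivers for H2 and PostgreSQL, both published to Maven Central
libraryDependencies ++= Seq(
  "com.h2database" % "h2"         % "1.4.196",
  "org.postgresql" % "postgresql" % "42.1.4"
)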

Usage

As always, before we get started we will need an ActorSystem and a Materializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);

You will also always need the following important imports:

Scala
import akka.stream.scaladsl._
import akka.stream.alpakka.slick.scaladsl._
Java
import akka.stream.javadsl.*;
import akka.stream.alpakka.slick.javadsl.*;

The full examples for using the Source, Sink, and Flow (listed further down) also include all required imports.

Starting a Database Session

All functionality provided by this connector requires the user to first create an instance of SlickSession, which is a thin wrapper around Slick’s database connection management and database profile API.

If you are using Slick in your project, you can create a SlickSession instance by sharing the database configuration:

Scala
val databaseConfig = DatabaseConfig.forConfig[JdbcProfile]("slick-h2")
implicit val session = SlickSession.forConfig(databaseConfig)

Otherwise, you can configure your database using typesafe-config by adding a named configuration to your application.conf and then referring to that configuration when starting the session:

Scala
implicit val session = SlickSession.forConfig("slick-h2")
Java
private static final SlickSession session = SlickSession.forConfig("slick-h2");

Here is an example configuration for the H2 database, which is used for the unit tests of the Slick connector itself:

Configuration
# Load using SlickSession.forConfig("slick-h2")
slick-h2 {
  profile = "slick.jdbc.H2Profile$"
  db {
    connectionPool = disabled
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.h2.Driver"
      url = "jdbc:h2:/tmp/alpakka-slick-h2-test"
    }
  }
}

You can specify multiple different database configurations, as long as you use unique names. These can then be loaded by fully qualified configuration name using the SlickSession.forConfig() method described above.
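For example, assuming both a slick-h2 and a slick-postgres configuration block exist in your application.conf (examples of both are shown on this page), two independent sessions could be started side by side:

Scala
// Each session wraps its own connection management, so each
// needs to be closed eventually (see "Closing a Database Session").
val h2Session       = SlickSession.forConfig("slick-h2")
val postgresSession = SlickSession.forConfig("slick-postgres")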

The Slick connector supports all the various ways Slick allows you to configure your JDBC database drivers, connection pools, etc., but we strongly recommend using the so-called “DatabaseConfig” method of configuration, which is the only method explicitly tested to work with the Slick connector.

Below are a few configuration examples for other databases. The Slick connector supports all databases supported by Slick (as of Slick 3.2.x).

Postgres
# Load using SlickSession.forConfig("slick-postgres")
slick-postgres {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://127.0.0.1/slickdemo"
      user = slick
      password = ""
    }
  }
}
MySQL
# Load using SlickSession.forConfig("slick-mysql")
slick-mysql {
  profile = "slick.jdbc.MySQLProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.mysql.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/"
      user = slick
      password = ""
    }
  }
}
DB2
# Load using SlickSession.forConfig("slick-db2")
slick-db2 {
  profile = "slick.jdbc.DB2Profile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.ibm.db2.jcc.DB2Driver"
      url = "jdbc:db2://localhost:50000/sample"
      user = "db2inst1"
      password = "db2-admin-password"
    }
  }
}
Oracle
# Load using SlickSession.forConfig("slick-oracle")
slick-oracle {
  profile = "slick.jdbc.OracleProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "oracle.jdbc.OracleDriver"
      url = "jdbc:oracle:thin:@//localhost:49161/xe"
      user = slick
      password = ""
    }
  }
}
SQL Server
# Load using SlickSession.forConfig("slick-sqlserver")
slick-sqlserver {
  profile = "slick.jdbc.SQLServerProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      url = "jdbc:sqlserver://localhost:1433"
      user = slick
      password = ""
    }
  }
}

Of course these are just examples. Please visit the Slick documentation for DatabaseConfig.fromConfig for the full list of things to configure.

Closing a Database Session

Slick requires you to eventually close your database session to free up connection pool resources. You would usually do this when terminating the ActorSystem, by registering a termination handler like this:

Scala
system.registerOnTermination(session.close())
Java
system.registerOnTermination(session::close);

Using a Slick Source

The Slick connector allows you to perform a SQL query and expose the resulting stream of results as an Akka Streams Source[T], where T is any type that can be constructed from a database row.

Plain SQL queries

Both the Scala and Java DSLs support the use of plain SQL queries.

The Scala DSL expects you to use the special sql"...", sqlu"...", and tsql"..." String interpolators provided by Slick to construct queries.
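As a minimal sketch of the difference between these interpolators (assuming the import session.profile.api._ and the test table shown in the full examples below):

Scala
// sql"..."  yields rows and needs .as[T] to map them to a result type.
// sqlu"..." yields the number of affected rows as an Int.
// tsql"..." is the compile-time-checked variant of sql"...".
val select: DBIO[Seq[(Int, String)]] =
  sql"SELECT ID, NAME FROM ALPAKKA_SLICK_SCALADSL_TEST_USERS".as[(Int, String)]
val delete: DBIO[Int] =
  sqlu"DELETE FROM ALPAKKA_SLICK_SCALADSL_TEST_USERS WHERE ID = 42"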

Unfortunately, String interpolation is a Scala language feature that cannot be directly translated to Java. This means that query strings in the Java DSL will need to be manually prepared using plain Java Strings (or a StringBuilder).

The following examples put it all together to perform a simple streaming query. The full source code for these examples can be found together with the unit tests of the Slick connector on GitHub.

Scala
import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

import akka.stream.scaladsl._
import akka.stream.alpakka.slick.scaladsl._

import slick.jdbc.GetResult

object SlickSourceWithPlainSQLQueryExample extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  implicit val session = SlickSession.forConfig("slick-h2")

  // The example domain
  case class User(id: Int, name: String)

  // We need this to automatically transform result rows
  // into instances of the User class.
  // Please import slick.jdbc.GetResult
  // See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#result-sets"
  implicit val getUserResult = GetResult(r => User(r.nextInt, r.nextString))

  // This import enables the use of the Slick sql"...",
  // sqlu"...", and sqlt"..." String interpolators.
  // See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
  import session.profile.api._

  // Stream the results of a query
  val done: Future[Done] =
    Slick
      .source(sql"SELECT ID, NAME FROM ALPAKKA_SLICK_SCALADSL_TEST_USERS".as[User])
      .log("user")
      .runWith(Sink.ignore)

  done.onComplete {
    case _ =>
      session.close()
      system.terminate()
  }
}
Java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

import akka.stream.javadsl.*;
import akka.stream.alpakka.slick.javadsl.*;

public class DocSnippetSource {
  public static void main(String[] args) throws Exception {
    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);

    final SlickSession session = SlickSession.forConfig("slick-h2");

    final CompletionStage<Done> done =
      Slick
        .source(
          session,
          "SELECT ID, NAME FROM ALPAKKA_SLICK_JAVADSL_TEST_USERS",
          (SlickRow row) -> new User(row.nextInt(), row.nextString())
        )
        .log("user")
        .runWith(Sink.ignore(), materializer);

    done.whenComplete((value, exception) -> {
      session.close();
      system.terminate();
    });
  }
}

Typed Queries

The Scala DSL also supports the use of Slick Scala queries, which are more type-safe than their plain SQL equivalents. The code will look very similar to the plain SQL example.

Scala
import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

import akka.stream.scaladsl._
import akka.stream.alpakka.slick.scaladsl._

object SlickSourceWithTypedQueryExample extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  implicit val session = SlickSession.forConfig("slick-h2")

  // This import brings everything you need into scope
  import session.profile.api._

  // The example domain
  class Users(tag: Tag) extends Table[(Int, String)](tag, "ALPAKKA_SLICK_SCALADSL_TEST_USERS") {
    def id = column[Int]("ID")
    def name = column[String]("NAME")
    def * = (id, name)
  }

  // Stream the results of a query
  val done: Future[Done] =
    Slick
      .source(TableQuery[Users].result)
      .log("user")
      .runWith(Sink.ignore)

  done.onComplete {
    case _ =>
      session.close()
      system.terminate()
  }
}

Using a Slick Flow or Sink

If you want to take a stream of elements and turn them into side-effecting actions in a relational database, the Slick connector allows you to perform any DML or DDL statement using either a Sink or a Flow. This includes the typical insert/update/delete statements but also create table, drop table, etc. The unit tests have a couple of good examples of the latter usage.
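For instance, creating a table can itself be expressed as a one-element stream of a DDL statement. A minimal sketch, assuming the session and imports from the full examples below:

Scala
// The element is already a DBIO[Int], so the statement factory
// passed to Slick.sink is simply the identity function.
val ddl: DBIO[Int] =
  sqlu"CREATE TABLE ALPAKKA_SLICK_SCALADSL_TEST_USERS(ID INTEGER, NAME VARCHAR(52))"

val tableCreated: Future[Done] =
  Source.single(ddl).runWith(Slick.sink(action => action))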

The following examples show the use of a Slick Sink to take a stream of elements and insert them into the database. There is an optional parallelism argument to specify how many statements will be executed in parallel against the database; a short sketch of its use follows the two examples below. The unit tests for the Slick connector also contain examples of performing parallel inserts.

Scala
import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

import akka.stream.scaladsl._
import akka.stream.alpakka.slick.scaladsl._

object SlickSinkExample extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  implicit val session = SlickSession.forConfig("slick-h2")

  // The example domain
  case class User(id: Int, name: String)
  val users = (1 to 42).map(i => User(i, s"Name$i"))

  // This import enables the use of the Slick sql"...",
  // sqlu"...", and sqlt"..." String interpolators.
  // See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
  import session.profile.api._

  // Stream the users into the database as insert statements
  val done: Future[Done] =
    Source(users)
      .runWith(
        // add an optional first argument to specify the parallelism factor (Int)
        Slick.sink(user => sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
      )

  done.onComplete {
    case _ =>
      session.close()
      system.terminate()
  }
}
Java
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

import akka.stream.javadsl.*;
import akka.stream.alpakka.slick.javadsl.*;

public class DocSnippetSink {
  public static void main(String[] args) throws Exception {
    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);

    final SlickSession session = SlickSession.forConfig("slick-h2");

    final List<User> users = IntStream.range(0, 42).boxed().map((i) -> new User(i, "Name"+i)).collect(Collectors.toList());

    final CompletionStage<Done> done =
      Source
        .from(users)
        .runWith(
          Slick.<User>sink(
            session,
            // add an optional second argument to specify the parallelism factor (int)
            (user) -> "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES (" + user.id + ", '" + user.name + "')"
          ),
          materializer
        );

    done.whenComplete((value, exception) -> {
      session.close();
      system.terminate();
    });
  }
}
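As mentioned above, the parallelism factor is an optional first argument in the Scala DSL (and an optional second argument in the Java DSL). A minimal sketch of the Scala variant, assuming the User type and implicit session from the example above:

Scala
// Execute up to 4 insert statements concurrently. Note that with a
// parallelism factor greater than 1, statements may complete out of order.
val parallelInserts: Sink[User, Future[Done]] =
  Slick.sink(4, (user: User) => sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")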

For completeness, the Slick connector also exposes a Flow that has the exact same functionality as the Sink but allows you to continue the stream for further processing. The element type of the Flow's output, i.e. the return value of every executed statement, is the fixed type Int, denoting the number of updated/inserted/deleted rows.

Scala
import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

import akka.stream.scaladsl._
import akka.stream.alpakka.slick.scaladsl._

object SlickFlowExample extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  implicit val session = SlickSession.forConfig("slick-h2")

  // The example domain
  case class User(id: Int, name: String)
  val users = (1 to 42).map(i => User(i, s"Name$i"))

  // This import enables the use of the Slick sql"...",
  // sqlu"...", and sqlt"..." String interpolators.
  // See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
  import session.profile.api._

  // Stream the users into the database as insert statements
  val done: Future[Done] =
    Source(users)
      .via(
        // add an optional first argument to specify the parallelism factor (Int)
        Slick.flow(user => sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
      )
      .log("nr-of-updated-rows")
      .runWith(Sink.ignore)

  done.onComplete {
    case _ =>
      session.close()
      system.terminate()
  }
}
Java
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

import akka.stream.javadsl.*;
import akka.stream.alpakka.slick.javadsl.*;

public class DocSnippetFlow {
  public static void main(String[] args) throws Exception {
    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);

    final SlickSession session = SlickSession.forConfig("slick-h2");

    final List<User> users = IntStream.range(0, 42).boxed().map((i) -> new User(i, "Name"+i)).collect(Collectors.toList());

    final CompletionStage<Done> done =
      Source
        .from(users)
        .via(
          Slick.<User>flow(
            session,
            // add an optional second argument to specify the parallelism factor (int)
            (user) -> "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES (" + user.id + ", '" + user.name + "')"
          )
        )
        .log("nr-of-updated-rows")
        .runWith(Sink.ignore(), materializer);

    done.whenComplete((value, exception) -> {
      session.close();
      system.terminate();
    });
  }
}

To have a different return type, use the flowWithPassThrough function. For example, when consuming Kafka messages this allows you to carry the Kafka committable offset through the flow, so the message can be committed in a later stage.

Scala
import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

import akka.stream.scaladsl._
import akka.stream.alpakka.slick.scaladsl._

// We're going to pretend we got messages from Kafka.
// After we've written them to a db with Slick, we want
// to commit the offset to Kafka
object SlickFlowWithPassThroughExample extends App {

  // mimics a Kafka 'Committable' type
  case class CommittableOffset(offset: Int) {
    def commit: Future[Done] = Future.successful(Done)
  }
  case class KafkaMessage[A](msg: A, offset: CommittableOffset) {
    // map the msg and keep the offset
    def map[B](f: A => B): KafkaMessage[B] = KafkaMessage(f(msg), offset)
  }

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  implicit val session = SlickSession.forConfig("slick-h2")

  // The example domain
  case class User(id: Int, name: String)
  val users = (1 to 42).map(i => User(i, s"Name$i"))
  val messagesFromKafka = users.zipWithIndex.map { case (user, index) => KafkaMessage(user, CommittableOffset(index)) }

  // This import enables the use of the Slick sql"...",
  // sqlu"...", and sqlt"..." String interpolators.
  // See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
  import session.profile.api._

  // Stream the users into the database as insert statements
  val done: Future[Done] =
    Source(messagesFromKafka)
      .via(
        // add an optional first argument to specify the parallelism factor (Int)
        Slick.flowWithPassThrough { kafkaMessage =>
          val user = kafkaMessage.msg
          (sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
            .map { insertCount => // map db result to something else
              // allows us to keep the Kafka message offset so it can be committed in a later stage
              kafkaMessage.map(user => (user, insertCount))
            }
        }
      )
      .log("nr-of-updated-rows")
      .mapAsync(1) { // in correct order
        kafkaMessage =>
          kafkaMessage.offset.commit // commit Kafka messages
      }
      .runWith(Sink.ignore)

  done.onComplete {
    case _ =>
      session.close()
      system.terminate()
  }
}
Java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import akka.stream.alpakka.slick.javadsl.*;

// We're going to pretend we got messages from Kafka.
// After we've written them to a db with Slick, we want
// to commit the offset to Kafka
public class DocSnippetFlowWithPassThrough {
  // mimics a Kafka 'Committable' type
  static class CommittableOffset {
    private Integer offset;

    public CommittableOffset(Integer offset) {
      this.offset = offset;
    }

    public CompletableFuture<Done> commit() {
      return CompletableFuture.completedFuture(Done.getInstance());
    }
  }

  static class KafkaMessage<A> {
    final A msg;
    final CommittableOffset offset;

    public KafkaMessage(A msg, CommittableOffset offset) {
      this.msg = msg;
      this.offset = offset;
    }

    // map the msg and keep the offset
    public <B> KafkaMessage<B> map(Function<A, B> f) {
      return new KafkaMessage<>(f.apply(msg), offset);
    }
  }

  public static void main(String[] args) throws Exception {
    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);

    final SlickSession session = SlickSession.forConfig("slick-h2");

    final List<User> users = IntStream.range(0, 42).boxed()
      .map((i) -> new User(i, "Name" + i))
      .collect(Collectors.toList());
    final List<KafkaMessage<User>> messagesFromKafka = users.stream()
      .map(user -> new KafkaMessage<>(user, new CommittableOffset(users.indexOf(user))))
      .collect(Collectors.toList());

    final CompletionStage<Done> done =
      Source
        .from(messagesFromKafka)
        .via(
          Slick.flowWithPassThrough(
            session,
            system.dispatcher(),
            // add an optional second argument to specify the parallelism factor (int)
            (kafkaMessage) -> "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES (" + kafkaMessage.msg.id + ", '" + kafkaMessage.msg.name + "')",
            // allows us to keep the Kafka message offset so it can be committed in a later stage
            (kafkaMessage, insertCount) -> kafkaMessage.map(user -> new Pair<>(user, insertCount))
          )
        )
        .log("nr-of-updated-rows")
        // commit the Kafka messages in the original order
        .mapAsync(1, kafkaMessage -> kafkaMessage.offset.commit())
        .runWith(Sink.ignore(), materializer);

    done.whenComplete((value, exception) -> {
      session.close();
      system.terminate();
    });
  }
}