Cassandra Connector

The Cassandra connector allows you to read from and write to Cassandra. You can query a stream of rows with CassandraSource, or use prepared statements to insert or update rows with CassandraFlow or CassandraSink.

Unlogged batches are also supported.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-cassandra_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-cassandra_2.12', version: '0.19'
}

Usage

Sources provided by this connector need a prepared session to communicate with the Cassandra cluster. First, let’s initialize a Cassandra session.

Scala
import com.datastax.driver.core.Cluster

implicit val session = Cluster.builder
  .addContactPoint("127.0.0.1")
  .withPort(9042)
  .build
  .connect()
Java
final Session session = Cluster.builder()
  .addContactPoint("127.0.0.1").withPort(9042)
  .build().connect();

We will also need an ActorSystem and an ActorMaterializer.

Scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);

That is all the preparation we need.

Source Usage

Let’s create a Cassandra statement with a query that we want to execute.

Scala
val stmt = new SimpleStatement(s"SELECT * FROM $keyspaceName.test").setFetchSize(20)
Java
final Statement stmt = new SimpleStatement("SELECT * FROM akka_stream_java_test.test").setFetchSize(20);

Finally, create the source using any method from the CassandraSource factory and run it.

Scala
val rows = CassandraSource(stmt).runWith(Sink.seq)
Java
final CompletionStage<List<Row>> rows = CassandraSource.create(stmt, session)
  .runWith(Sink.seq(), materializer);

Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
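The same source can feed a larger pipeline. Below is a minimal sketch, assuming the implicit session and materializer above are in scope; the column access, filter and throttle rate are illustrative, not prescribed by the connector:

Scala
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.stream.ThrottleMode

// Transform each row, rate-limit downstream via backpressure, collect results.
val evenIds: Future[Seq[Int]] =
  CassandraSource(new SimpleStatement(s"SELECT * FROM $keyspaceName.test"))
    .map(row => row.getInt("id"))                     // extract one column per row
    .filter(_ % 2 == 0)                               // keep only even ids
    .throttle(10, 1.second, 10, ThrottleMode.shaping) // backpressure-aware rate limit
    .runWith(Sink.seq)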

Flow with passthrough Usage

Let’s create a Cassandra Prepared statement with a query that we want to execute.

Scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test(id) VALUES (?)")
Java
final PreparedStatement preparedStatement = session.prepare("insert into akka_stream_java_test.test (id) values (?)");

Now we need to create a ‘statement binder’; this is simply a function that binds values to the prepared statement. It can take any type or data structure that fits your query values. Here we use a single Integer, but it could just as easily be a (case) class.

Scala
val statementBinder = (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
Java
BiFunction<Integer, PreparedStatement, BoundStatement> statementBinder = (myInteger, statement) -> statement.bind(myInteger);

We run the stream, persisting the elements to C* and finally collecting the results with Sink.seq.

Scala
// source is any Source[Integer, _], e.g. Source(1 to 10).map(Int.box)
val flow = CassandraFlow.createWithPassThrough[Integer](parallelism = 2, preparedStatement, statementBinder)

val result = source.via(flow).runWith(Sink.seq)
Java
final Flow<Integer, Integer, NotUsed> flow = CassandraFlow.createWithPassThrough(2, preparedStatement, statementBinder, session, ec);

CompletionStage<List<Integer>> result = source.via(flow).runWith(Sink.seq(), materializer);

Flow with passthrough and unlogged batching Usage

Use this when most of the elements in the stream share the same partition key.

Cassandra unlogged batches that share the same partition key will only resolve to one write internally in Cassandra, boosting write performance.

Be aware that this stage does not preserve the upstream order!
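The batching optimization only applies when grouped elements share a partition key. As a hedged illustration (the schema below is an assumption, not taken from the project’s test setup), a table whose partition key is id could be created like this:

Scala
// Illustrative only: with PRIMARY KEY (id, cc), `id` is the partition key
// and `cc` a clustering column, so elements sharing an id can be combined
// into a single unlogged batch write.
session.execute(
  s"""CREATE TABLE IF NOT EXISTS $keyspaceName.test_batch (
     |  id int,
     |  cc int,
     |  PRIMARY KEY (id, cc)
     |)""".stripMargin
)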

For this example we will define a class that models the data to be inserted:

Scala
case class ToInsert(id: Integer, cc: Integer)
Java
private class ToInsert {
  Integer id;
  Integer cc;

  public ToInsert(Integer id, Integer cc) {
    this.id = id;
    this.cc = cc;
  }
}

Let’s create a Cassandra Prepared statement with a query that we want to execute.

Scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test_batch(id, cc) VALUES (?, ?)")
Java
final PreparedStatement preparedStatement = session.prepare("insert into akka_stream_java_test.test_batch(id, cc) values (?, ?)");

Now we need to create a ‘statement binder’; this is simply a function that binds values to the prepared statement. In this example we are using a class.

Scala
val statementBinder =
  (elemToInsert: ToInsert, statement: PreparedStatement) => statement.bind(elemToInsert.id, elemToInsert.cc)
Java
BiFunction<ToInsert, PreparedStatement, BoundStatement> statementBinder = (toInsert, statement) -> statement.bind(toInsert.id, toInsert.cc);

You can configure how many elements are grouped into a batch; in this case we will use the defaults:

Scala
val settings: CassandraBatchSettings = CassandraBatchSettings.Defaults
Java
CassandraBatchSettings defaultSettings = CassandraBatchSettings.Defaults();
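If the defaults do not fit your workload, the settings can be constructed directly. A hedged sketch, assuming CassandraBatchSettings takes a maximum group size and a maximum wait before an incomplete group is flushed (as in Alpakka 0.19; check the CassandraBatchSettings sources for the exact signature):

Scala
import scala.concurrent.duration._

// Assumption: at most 100 statements per batch; flush an incomplete
// group after 200 milliseconds.
val customSettings = CassandraBatchSettings(100, 200.millis)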

We run the stream, persisting the elements to C* and finally collecting the results with Sink.seq. The function T => K has to extract the Cassandra partition key from your class.

Scala
// source is any Source[ToInsert, _]
val flow = CassandraFlow.createUnloggedBatchWithPassThrough[ToInsert, Integer](parallelism = 2,
                                                                               preparedStatement,
                                                                               statementBinder,
                                                                               ti => ti.id,
                                                                               settings)

val result = source.via(flow).runWith(Sink.seq)
Java
final Flow<ToInsert, ToInsert, NotUsed> flow = CassandraFlow.createUnloggedBatchWithPassThrough(2, preparedStatement,
        statementBinder, (ti) -> ti.id, defaultSettings, session, ec);

CompletionStage<List<ToInsert>> result = source.via(flow).runWith(Sink.seq(), materializer);

Sink Usage

Let’s create a Cassandra Prepared statement with a query that we want to execute.

Scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test(id) VALUES (?)")
Java
final PreparedStatement preparedStatement = session.prepare("insert into akka_stream_java_test.test (id) values (?)");

Now we need to create a ‘statement binder’; this is simply a function that binds values to the prepared statement. It can take any type or data structure that fits your query values. Here we use a single Integer, but it could just as easily be a (case) class.

Scala
val statementBinder = (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
Java
BiFunction<Integer, PreparedStatement, BoundStatement> statementBinder = (myInteger, statement) -> statement.bind(myInteger);

Finally, we run the sink from any source.

Scala
val sink = CassandraSink[Integer](parallelism = 2, preparedStatement, statementBinder)

val result = source.runWith(sink)
Java
final Sink<Integer, CompletionStage<Done>> sink = CassandraSink.create(2, preparedStatement, statementBinder, session);

CompletionStage<Done> result = source.runWith(sink, materializer);
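For example, a hedged, self-contained upstream (the integer range is purely illustrative):

Scala
import akka.Done
import akka.stream.scaladsl.Source
import scala.concurrent.Future

// A hypothetical upstream: persist the integers 1 to 10 through the sink.
val done: Future[Done] = Source(1 to 10).map(Int.box).runWith(sink)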

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Test code requires Cassandra running in the background. You can start it quickly using Docker:

docker-compose up cassandra

Scala
sbt
> cassandra/testOnly *.CassandraSourceSpec
Java
sbt
> cassandra/testOnly *.CassandraSourceTest