Cassandra Connector

The Cassandra connector provides a source to stream the results of a Cassandra query as rows, as well as a sink for writing rows to Cassandra.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.3"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-cassandra_2.11</artifactId>
  <version>0.3</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-cassandra_2.11", version: "0.3"
}

Usage

Sources and sinks provided by this connector need a prepared session to communicate with the Cassandra cluster. First, let’s initialize a Cassandra session.

Scala
implicit val session = Cluster.builder.addContactPoint("127.0.0.1").withPort(9042).build.connect()
Java
final Session session = Cluster.builder()
  .addContactPoint("127.0.0.1").withPort(9042)
  .build().connect();
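
The statements used below refer to a test keyspace and table. If you are following along outside the bundled tests, you can create them through the same session; the schema here is a minimal assumption (a single id column, matching the insert used later):

Scala
// Hypothetical schema for experimenting with the snippets in this guide.
session.execute(
  "CREATE KEYSPACE IF NOT EXISTS akka_stream_scala_test WITH replication = " +
    "{'class': 'SimpleStrategy', 'replication_factor': 1}")
session.execute("CREATE TABLE IF NOT EXISTS akka_stream_scala_test.test (id int PRIMARY KEY)")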

We will also need an ActorSystem and an ActorMaterializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);

This is all the preparation we are going to need.
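
For reference, the Scala snippets on this page assume imports along the following lines; the Java snippets use the corresponding javadsl and driver classes.

Scala
// Imports assumed by the Scala snippets in this guide.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSink, CassandraSource }
import com.datastax.driver.core.{ Cluster, PreparedStatement, SimpleStatement }
// The Scala sink also needs an implicit ExecutionContext in scope (the Java
// variant is given the dispatcher explicitly), e.g. import system.dispatcher.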

Source Usage

Let’s create a Cassandra statement with a query that we want to execute.

Scala
val stmt = new SimpleStatement("SELECT * FROM akka_stream_scala_test.test").setFetchSize(20)
Java
final Statement stmt = new SimpleStatement("SELECT * FROM akka_stream_java_test.test").setFetchSize(20);

Finally, create the source using one of the factory methods on CassandraSource and run it.

Scala
val rows = CassandraSource(stmt).runWith(Sink.seq)
Java
final CompletionStage<List<Row>> rows = CassandraSource.create(stmt, session)
  .runWith(Sink.seq(), materializer);

Here we used a basic sink to complete the stream by collecting all of the stream elements into a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
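
As a small sketch of such a topology, the rows can be transformed on the way; here we read the id column (the column used by the insert in the next section) and keep only even values:

Scala
// A slightly larger pipeline: read rows, extract the id column, filter, collect.
val evenIds = CassandraSource(stmt)
  .map(row => row.getInt("id"))
  .filter(_ % 2 == 0)
  .runWith(Sink.seq)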

Sink Usage

Let’s create a Cassandra prepared statement with the query that we want to execute.

Scala
val preparedStatement = session.prepare("INSERT INTO akka_stream_scala_test.test(id) VALUES (?)")
Java
final PreparedStatement preparedStatement = session.prepare("insert into akka_stream_java_test.test (id) values (?)");

Now we need to create a ‘statement binder’; this is just a function that binds values to the prepared statement. It can take any type or data structure to fit your query values. Here we’re just using one Integer, but it could just as easily be a (case) class, as sketched after the examples below.

Scala
val statementBinder = (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
Java
BiFunction<Integer, PreparedStatement, BoundStatement> statementBinder =
  (myInteger, statement) -> statement.bind(myInteger);
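
For instance, a binder for a hypothetical case class with two fields could look like the following; the Person class and its table are assumptions for illustration, not part of the test schema:

Scala
// Hypothetical: binding a case class with two fields to a two-placeholder statement.
case class Person(id: Integer, name: String)

val personStatement = session.prepare(
  "INSERT INTO akka_stream_scala_test.person(id, name) VALUES (?, ?)")

val personBinder = (person: Person, statement: PreparedStatement) =>
  statement.bind(person.id, person.name)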

Finally, we create the sink and run it with any source.

Scala
val sink = CassandraSink[Integer](parallelism = 2, preparedStatement, statementBinder)

val result = source.runWith(sink)
Java
final Sink<Integer, CompletionStage<Done>> sink =
  CassandraSink.create(2, preparedStatement, statementBinder, session, system.dispatcher());

CompletionStage<Done> result = source.runWith(sink, materializer);
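
The source is not defined in the snippets above; it can be anything that emits the bound type, for example a simple in-memory range of integers (a sketch):

Scala
// One possible source for the sink above: a range of ids boxed to java.lang.Integer.
val source = Source(0 to 10).map(i => Int.box(i))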

Running the example code

The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.

The test code requires a Cassandra server running in the background. You can start one quickly using Docker:

docker run --rm -p 9042:9042 cassandra:3

Scala
sbt
> cassandra/testOnly *.CassandraSourceSpec
Java
sbt
> cassandra/testOnly *.CassandraSourceTest