Apache Cassandra
The Cassandra connector allows you to read from and write to Cassandra. You can query a stream of rows with CassandraSource, or use prepared statements to insert or update with CassandraFlow or CassandraSink.
Unlogged batches are also supported.
| Project Info: Alpakka Cassandra | |
|---|---|
| Artifact | com.lightbend.akka : akka-stream-alpakka-cassandra : 1.0.2 |
| JDK versions | OpenJDK 8 |
| Scala versions | 2.12.7, 2.11.12, 2.13.0-M5 |
| JPMS module name | akka.stream.alpakka.cassandra |
| Readiness level | Since 0.3, 2016-12-02 |
| Home page | https://doc.akka.io/docs/alpakka/current/ |
| Release notes | In the documentation |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.0.2"
```
- Maven
```xml
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-cassandra_2.12</artifactId>
  <version>1.0.2</version>
</dependency>
```
- Gradle
```groovy
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-cassandra_2.12', version: '1.0.2'
}
```
The first table below lists the direct dependencies of this module; the second lists all libraries it depends on transitively.
- Direct dependencies

| Organization | Artifact | Version | License |
|---|---|---|---|
| com.datastax.cassandra | cassandra-driver-core | 3.5.1 | Apache 2 |
| com.typesafe.akka | akka-stream_2.12 | 2.5.22 | Apache License, Version 2.0 |
| org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |

- Transitive dependencies (duplicates collapsed)

| Organization | Artifact | Version | License |
|---|---|---|---|
| com.datastax.cassandra | cassandra-driver-core | 3.5.1 | Apache 2 |
| com.github.jnr | jnr-ffi | 2.1.7 | The Apache Software License, Version 2.0 |
| com.github.jnr | jffi | 1.2.16 | The Apache Software License, Version 2.0 |
| com.github.jnr | jnr-x86asm | 1.0.2 | MIT License |
| com.github.jnr | jnr-posix | 3.0.44 | Common Public License - v 1.0 |
| com.github.jnr | jnr-constants | 0.9.9 | The Apache Software License, Version 2.0 |
| org.ow2.asm | asm | 5.0.3 | BSD |
| org.ow2.asm | asm-analysis | 5.0.3 | BSD |
| org.ow2.asm | asm-commons | 5.0.3 | BSD |
| org.ow2.asm | asm-tree | 5.0.3 | BSD |
| org.ow2.asm | asm-util | 5.0.3 | BSD |
| com.google.guava | guava | 19.0 | The Apache Software License, Version 2.0 |
| io.dropwizard.metrics | metrics-core | 3.2.2 | Apache License 2.0 |
| io.netty | netty-buffer | 4.0.56.Final | Apache License, Version 2.0 |
| io.netty | netty-codec | 4.0.56.Final | Apache License, Version 2.0 |
| io.netty | netty-common | 4.0.56.Final | Apache License, Version 2.0 |
| io.netty | netty-handler | 4.0.56.Final | Apache License, Version 2.0 |
| io.netty | netty-transport | 4.0.56.Final | Apache License, Version 2.0 |
| org.slf4j | slf4j-api | 1.7.25 | MIT License |
| com.typesafe | config | 1.3.3 | Apache License, Version 2.0 |
| com.typesafe | ssl-config-core_2.12 | 0.3.7 | Apache-2.0 |
| com.typesafe.akka | akka-actor_2.12 | 2.5.22 | Apache License, Version 2.0 |
| com.typesafe.akka | akka-protobuf_2.12 | 2.5.22 | Apache License, Version 2.0 |
| com.typesafe.akka | akka-stream_2.12 | 2.5.22 | Apache License, Version 2.0 |
| org.scala-lang.modules | scala-java8-compat_2.12 | 0.8.0 | BSD 3-clause |
| org.scala-lang.modules | scala-parser-combinators_2.12 | 1.1.1 | BSD 3-clause |
| org.reactivestreams | reactive-streams | 1.0.2 | CC0 |
| org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |
We intend to bring the Cassandra client part of Akka Persistence Cassandra into Alpakka, which will mean changes to this API.
See GitHub issue #1213
Source
Sources provided by this connector need an open Session to communicate with the Cassandra cluster. First, let’s initialize a Cassandra session.
- Scala
```scala
implicit val session = Cluster.builder
  .addContactPoint("127.0.0.1")
  .withPort(9042)
  .build
  .connect()
```
- Java
```java
final Session session =
    Cluster.builder().addContactPoint("127.0.0.1").withPort(9042).build().connect();
```
We will also need an ActorSystem and an ActorMaterializer.
- Scala
```scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
```
- Java
```java
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
```
Let’s create a Cassandra statement with a query that we want to execute.
- Scala
```scala
val stmt = new SimpleStatement(s"SELECT * FROM $keyspaceName.test").setFetchSize(20)
```
- Java
```java
final Statement stmt =
    new SimpleStatement("SELECT * FROM akka_stream_java_test.test").setFetchSize(20);
```
And finally create the source using any method from the CassandraSource factory and run it.
- Scala
```scala
val rows = CassandraSource(stmt).runWith(Sink.seq)
```
- Java
```java
final CompletionStage<List<Row>> rows =
    CassandraSource.create(stmt, session).runWith(Sink.seq(), materializer);
```
Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
Flow with passthrough
Let’s create a Cassandra PreparedStatement with a query that we want to execute.
- Scala
```scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test(id) VALUES (?)")
```
- Java
```java
final PreparedStatement preparedStatement =
    session.prepare("insert into akka_stream_java_test.test (id) values (?)");
```
Now we need to create a ‘statement binder’: a function that binds your data to the prepared statement. It can take any type or data structure that fits your query values. Here we use a single Integer, but it could just as easily be a (case) class.
- Scala
```scala
val statementBinder =
  (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
```
- Java
```java
BiFunction<Integer, PreparedStatement, BoundStatement> statementBinder =
    (myInteger, statement) -> statement.bind(myInteger);
```
We run the stream, persisting the elements to C* and finally collecting them with Sink.seq.
- Scala
```scala
val flow = CassandraFlow.createWithPassThrough[Integer](parallelism = 2, preparedStatement, statementBinder)
val result = source.via(flow).runWith(Sink.seq)
```
- Java
```java
final Flow<Integer, Integer, NotUsed> flow =
    CassandraFlow.createWithPassThrough(2, preparedStatement, statementBinder, session);
CompletionStage<List<Integer>> result = source.via(flow).runWith(Sink.seq(), materializer);
```
Flow with passthrough and unlogged batching
Use this when most of the elements in the stream share the same partition key.
Cassandra unlogged batches that share the same partition key will only resolve to one write internally in Cassandra, boosting write performance.
Be aware that this stage does not preserve the upstream order!
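Unlogged batching pays off when rows land in the same Cassandra partition, because such writes collapse into a single internal mutation. For the `test_batch` table used below, that means batching on `id`. A schema along these lines, assumed here for illustration (the real table is created by the project’s test setup), makes `id` the partition key and `cc` a clustering column:

```sql
-- Hypothetical schema for the batching example: rows sharing the same id
-- belong to one partition, so they can be batched into a single write.
CREATE TABLE IF NOT EXISTS akka_stream_java_test.test_batch (
  id int,                -- partition key
  cc int,                -- clustering column, orders rows within a partition
  PRIMARY KEY (id, cc)
);
```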
For this example we will define a class that models the data to be inserted.
- Scala
```scala
case class ToInsert(id: Integer, cc: Integer)
```
- Java
```java
private class ToInsert {
  Integer id;
  Integer cc;

  public ToInsert(Integer id, Integer cc) {
    this.id = id;
    this.cc = cc;
  }
}
```
Let’s create a Cassandra PreparedStatement with a query that we want to execute.
- Scala
```scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test_batch(id, cc) VALUES (?, ?)")
```
- Java
```java
final PreparedStatement preparedStatement =
    session.prepare("insert into akka_stream_java_test.test_batch(id, cc) values (?, ?)");
```
Now we need to create a ‘statement binder’: a function that binds your data to the prepared statement. In this example we are using a class.
- Scala
```scala
val statementBinder =
  (elemToInsert: ToInsert, statement: PreparedStatement) => statement.bind(elemToInsert.id, elemToInsert.cc)
```
- Java
```java
BiFunction<ToInsert, PreparedStatement, BoundStatement> statementBinder =
    (toInsert, statement) -> statement.bind(toInsert.id, toInsert.cc);
```
You can configure how many elements are grouped into each batch; in this case we use the defaults:
- Scala
```scala
val settings: CassandraBatchSettings = CassandraBatchSettings()
```
- Java
```java
CassandraBatchSettings defaultSettings = CassandraBatchSettings.create();
```
We run the stream, persisting the elements to C* and finally collecting them with Sink.seq. The function T => K must extract the Cassandra partition key from your class.
- Scala
```scala
val flow = CassandraFlow.createUnloggedBatchWithPassThrough[ToInsert, Integer](
  parallelism = 2,
  preparedStatement,
  statementBinder,
  ti => ti.id,
  settings)
val result = source.via(flow).runWith(Sink.seq)
```
- Java
```java
final Flow<ToInsert, ToInsert, NotUsed> flow =
    CassandraFlow.createUnloggedBatchWithPassThrough(
        2, preparedStatement, statementBinder, (ti) -> ti.id, defaultSettings, session);
CompletionStage<List<ToInsert>> result = source.via(flow).runWith(Sink.seq(), materializer);
```
Sink
Let’s create a Cassandra PreparedStatement with a query that we want to execute.
- Scala
```scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test(id) VALUES (?)")
```
- Java
```java
final PreparedStatement preparedStatement =
    session.prepare("insert into akka_stream_java_test.test (id) values (?)");
```
Now we need to create a ‘statement binder’: a function that binds your data to the prepared statement. It can take any type or data structure that fits your query values. Here we use a single Integer, but it could just as easily be a (case) class.
- Scala
```scala
val statementBinder =
  (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
```
- Java
```java
BiFunction<Integer, PreparedStatement, BoundStatement> statementBinder =
    (myInteger, statement) -> statement.bind(myInteger);
```
Finally we run the sink from any source.
- Scala
```scala
val sink = CassandraSink[Integer](parallelism = 2, preparedStatement, statementBinder)
val result = source.runWith(sink)
```
- Java
```java
final Sink<Integer, CompletionStage<Done>> sink =
    CassandraSink.create(2, preparedStatement, statementBinder, session);
CompletionStage<Done> result = source.runWith(sink, materializer);
```
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
Test code requires a Cassandra server running in the background. You can start one quickly using Docker:

```shell
docker-compose up cassandra
```
- Scala
```shell
sbt
> cassandra/testOnly *.CassandraSourceSpec
```
- Java
```shell
sbt
> cassandra/testOnly *.CassandraSourceTest
```