Apache Kudu

The Kudu connector provides a flow and a composite sink to write elements to Kudu.

Apache Kudu is a free and open-source column-oriented data store in the Apache Hadoop ecosystem.

Build a converter and table settings.

The converter maps a domain object to a Kudu row.

// Scala
import java.util

import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.client.PartialRow

case class Person(id: Int, name: String)

// Two columns: an INT32 primary key and a STRING value.
val cols = new util.ArrayList[ColumnSchema]()
cols.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build)
cols.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build)
val schema = new Schema(cols)

// Column indexes follow the order in which the columns were added to the schema.
val kuduConverter: Person => PartialRow = { person =>
  val partialRow = schema.newPartialRow()
  partialRow.addInt(0, person.id)
  partialRow.addString(1, person.name)
  partialRow
}

// Java
List<ColumnSchema> columns = new ArrayList<>(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
Schema schema = new Schema(columns);
Function<Person, PartialRow> kuduConverter =
    person -> {
      PartialRow partialRow = schema.newPartialRow();
      partialRow.addInt(0, person.id);
      partialRow.addString(1, person.name);
      return partialRow;
    };

The table is created on demand.

// Scala
val rangeKeys: util.List[String] = new util.ArrayList[String]
val createTableOptions = new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys)
// "localhost:7051" is the address of the Kudu master.
val kuduClient = new KuduClient.KuduClientBuilder("localhost:7051").build
val kuduTableSettings = KuduTableSettings(kuduClient, "test", schema, createTableOptions, kuduConverter)

// Java
List<String> rangeKeys = new ArrayList<>();
CreateTableOptions createTableOptions =
    new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys);
// setupKuduClient() is assumed to be a helper that returns a configured KuduClient.
// The factory call below is reconstructed: `create` is assumed to be the Java
// counterpart of the Scala `KuduTableSettings(...)` call above.
KuduTableSettings<Person> tableSettings =
    KuduTableSettings.create(
        setupKuduClient(), "tablenameSink", schema, createTableOptions, kuduConverter);

Flow usage

// Scala
val flow = KuduTableStage.flow[Person](kuduTableSettings)

val f = Source(11 to 20)
  .map(i => Person(i, s"zozo_$i"))
  .via(flow)
  .runWith(Sink.fold(0)((a, d) => a + d.id))

// Java
Flow<Person, Person, NotUsed> flow = KuduTableStage.flow(tableSettings);
Pair<NotUsed, CompletionStage<List<Person>>> run =
    Source.from(Arrays.asList(200, 201, 202, 203, 204))
        .map((i) -> new Person(i, String.format("name_%d", i)))
        .via(flow)
        .toMat(Sink.seq(), Keep.both())
        .run(materializer);
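As a quick sanity check on the Scala flow example: assuming the flow re-emits each `Person` once it has been written (which the `Flow<Person, Person, NotUsed>` type suggests, though this page does not state it), the fold should complete with the sum of the ids 11 to 20. The sketch below only reproduces that arithmetic with the standard library. `FlowFoldSketch` and `expectedFoldResult` are hypothetical names, not part of the connector.

```java
import java.util.stream.IntStream;

class FlowFoldSketch {
    // Hypothetical helper: sums the ids from firstId to lastId inclusive, which is
    // what a fold of `a + d.id` starting at 0 accumulates if every element passes through.
    static int expectedFoldResult(int firstId, int lastId) {
        return IntStream.rangeClosed(firstId, lastId).sum();
    }

    public static void main(String[] args) {
        // Ids used by the Scala flow example: 11 to 20.
        System.out.println(expectedFoldResult(11, 20)); // prints 155
    }
}
```

If the materialized value differs from this number, some elements were presumably dropped or failed before reaching the fold.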

Sink usage

// Scala
val sink = KuduTableStage.sink[Person](kuduTableSettings)

val f = Source(1 to 10).map(i => Person(i, s"zozo_$i")).runWith(sink)

// Java
final Sink<Person, Future<Done>> sink = KuduTableStage.sink(tableSettings);
Future<Done> o =
    Source.from(Arrays.asList(100, 101, 102, 103, 104))
        .map((i) -> new Person(i, String.format("name %d", i)))
        .runWith(sink, materializer);