HBase

A flow and a composite sink for writing elements to HBase.

HBase is a column-family NoSQL database backed by HDFS.

Reported issues

Tagged issues at GitHub

Usage

Artifacts

sbt
// `%%` appends the Scala binary version (e.g. _2.12) to the artifact name automatically
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-hbase" % "0.20"
Maven
<!-- The _2.12 suffix is the Scala binary version of the artifact; it should match your project's Scala version -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-hbase_2.12</artifactId>
  <version>0.20</version>
</dependency>
Gradle
dependencies {
  // `implementation` replaces the deprecated `compile` configuration (removed in Gradle 7)
  implementation group: 'com.lightbend.akka', name: 'akka-stream-alpakka-hbase_2.12', version: '0.20'
}

Build a converter and the table settings.

The converter maps a domain object to a list of HBase mutations (Append, Delete, Increment, Put).

Here are some examples:

  • A Put mutation:
scala
// Implicit String => Array[Byte] conversion (via HBase's Bytes.toBytes) so that row keys,
// column families, qualifiers and values below can be written as plain strings.
implicit def toBytes(string: String): Array[Byte] = Bytes.toBytes(string)
case class Person(id: Int, name: String)

// Converts a Person into a single Put that writes the person's name
// to the info:name column of row "id_<id>".
val hBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  val put = new Put(s"id_${person.id}")
  put.addColumn("info", "name", person.name)
  List(put)
}
Full source at GitHub
java
// Converts a Person into a single Put that writes the person's name
// to the info:name column of row "id_<id>".
// StandardCharsets.UTF_8 is always available, so no UnsupportedEncodingException
// has to be caught and no error is silently swallowed.
Function<Person, List<Mutation>> hBaseConverter =
  person -> {
    Put put = new Put(String.format("id_%d", person.id).getBytes(StandardCharsets.UTF_8));
    put.addColumn(
      "info".getBytes(StandardCharsets.UTF_8),
      "name".getBytes(StandardCharsets.UTF_8),
      person.name.getBytes(StandardCharsets.UTF_8));

    return Collections.singletonList(put);
  };
Full source at GitHub
  • An Append mutation:
scala
// Converts a Person into a single Append that appends the person's name
// to the info:aliases cell of row "id_<id>".
val appendHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  // Append to a cell
  val append = new Append(s"id_${person.id}")
  append.add("info", "aliases", person.name)
  List(append)
}
Full source at GitHub
java
// Converts a Person into a single Append that appends the person's name
// to the info:aliases cell of row "id_<id>".
// StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException entirely.
Function<Person, List<Mutation>> appendHBaseConverter =
  person -> {
    Append append = new Append(String.format("id_%d", person.id).getBytes(StandardCharsets.UTF_8));
    append.add(
      "info".getBytes(StandardCharsets.UTF_8),
      "aliases".getBytes(StandardCharsets.UTF_8),
      person.name.getBytes(StandardCharsets.UTF_8));

    return Collections.singletonList(append);
  };
Full source at GitHub
  • A Delete mutation:
scala
// Converts a Person into a single Delete of the whole row "id_<id>".
val deleteHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  // Delete the specified row
  val delete = new Delete(s"id_${person.id}")
  List(delete)
}
Full source at GitHub
java
// Converts a Person into a single Delete of the whole row "id_<id>".
// StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException entirely.
Function<Person, List<Mutation>> deleteHBaseConverter =
  person -> {
    Delete delete = new Delete(String.format("id_%d", person.id).getBytes(StandardCharsets.UTF_8));

    return Collections.singletonList(delete);
  };
Full source at GitHub
  • An Increment mutation:
scala
// Converts a Person into a single Increment that adds 1 to the
// info:numberOfChanges cell of row "id_<id>".
val incrementHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  // Increment a cell value
  val increment = new Increment(s"id_${person.id}")
  increment.addColumn("info", "numberOfChanges", 1)
  List(increment)
}
Full source at GitHub
java
// Converts a Person into a single Increment that adds 1 to the
// info:numberOfChanges cell of row "id_<id>".
// StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException entirely.
Function<Person, List<Mutation>> incrementHBaseConverter =
  person -> {
    Increment increment = new Increment(String.format("id_%d", person.id).getBytes(StandardCharsets.UTF_8));
    increment.addColumn(
      "info".getBytes(StandardCharsets.UTF_8), "numberOfChanges".getBytes(StandardCharsets.UTF_8), 1);

    return Collections.singletonList(increment);
  };
Full source at GitHub

To ignore an object, return an empty list; this has no effect on HBase. You can also combine several mutations to implement more complex business logic:

scala
// Combines several mutation types:
//   - id 0: the person is ignored (empty list, nothing is written);
//   - empty name: row "id_<id>" is deleted;
//   - otherwise: the name is written to info:name and info:numberOfChanges is incremented by 1.
val mutationsHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  if (person.id != 0) {
    if (person.name.isEmpty) {
      // Delete the specified row
      val delete = new Delete(s"id_${person.id}")
      List(delete)
    } else {
      // Insert or update a row
      val put = new Put(s"id_${person.id}")
      put.addColumn("info", "name", person.name)

      val increment = new Increment(s"id_${person.id}")
      increment.addColumn("info", "numberOfChanges", 1)

      // Returned in this order: the Put is applied before the Increment
      List(put, increment)
    }
  } else {
    List.empty
  }
}
Full source at GitHub
java
// Combines several mutation types (same rules as the Scala example):
//   - id 0: the person is ignored (empty list, nothing is written);
//   - empty name: row "id_<id>" is deleted;
//   - otherwise: the name is written to info:name and info:numberOfChanges is incremented by 1.
// StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException, so no
// error-swallowing try/catch is needed.
Function<Person, List<Mutation>> complexHBaseConverter =
  person -> {
    if (person.id == 0) {
      return Collections.emptyList();
    }

    byte[] id = String.format("id_%d", person.id).getBytes(StandardCharsets.UTF_8);
    byte[] infoFamily = "info".getBytes(StandardCharsets.UTF_8);

    if (person.name.isEmpty()) {
      Delete delete = new Delete(id);
      return Collections.singletonList(delete);
    }

    Put put = new Put(id);
    put.addColumn(
      infoFamily,
      "name".getBytes(StandardCharsets.UTF_8),
      person.name.getBytes(StandardCharsets.UTF_8));

    Increment increment = new Increment(id);
    increment.addColumn(infoFamily, "numberOfChanges".getBytes(StandardCharsets.UTF_8), 1);

    // Returned in this order: the Put is applied before the Increment
    return Arrays.asList(put, increment);
  };
Full source at GitHub

If you return a list of mutations, they are applied in list order. The mutations are not applied in a transaction: each mutation is independent, so earlier mutations are not rolled back if a later one fails.

The table is created on demand if it does not exist yet.

scala
// HBase client configuration, target table "person", the column family names
// (here only "info") and the converter that maps each Person to mutations.
val tableSettings =
  HTableSettings(HBaseConfiguration.create(), TableName.valueOf("person"), immutable.Seq("info"), hBaseConverter)
Full source at GitHub
java
// HBase client configuration, target table, the column family names (here only "info")
// and the converter that maps each Person to mutations.
// NOTE: this example targets table "person1", whereas the Scala example uses "person".
HTableSettings<Person> tableSettings =
  HTableSettings.create(
    HBaseConfiguration.create(),
    TableName.valueOf("person1"),
    Collections.singletonList("info"),
    hBaseConverter);
Full source at GitHub

Flow usage

scala
// A flow that writes each Person's mutations (from the converter in tableSettings)
// and passes the Person on downstream.
val flow = HTableStage.flow[Person](tableSettings)

// Writes persons 11 to 20 and sums the ids of the elements emitted by the flow.
val f = Source(11 to 20).map(i => Person(i, s"zozo_$i")).via(flow).runWith(Sink.fold(0)((a, d) => a + d.id))
Full source at GitHub
java
// A flow that writes each Person's mutations (from the converter in tableSettings)
// and passes the Person on downstream.
Flow<Person, Person, NotUsed> flow = HTableStage.flow(tableSettings);
// Writes persons 200 to 204; Keep.both() keeps the flow's NotUsed together with the
// CompletionStage of all elements collected by Sink.seq().
Pair<NotUsed, CompletionStage<List<Person>>> run =
  Source.from(Arrays.asList(200, 201, 202, 203, 204))
    .map((i) -> new Person(i, String.format("name_%d", i)))
    .via(flow)
    .toMat(Sink.seq(), Keep.both())
    .run(materializer);
Full source at GitHub

Sink usage

scala
// A sink that writes each incoming Person's mutations (from the converter in tableSettings).
val sink = HTableStage.sink[Person](tableSettings)

// Writes persons 1 to 10; `f` is the sink's materialized value.
val f = Source(1 to 10).map(i => Person(i, s"zozo_$i")).runWith(sink)
Full source at GitHub
java
// A sink that writes each incoming Person's mutations (from the converter in tableSettings).
// Its materialized value is a scala.concurrent.Future<Done>, not a CompletionStage.
final Sink<Person, scala.concurrent.Future<Done>> sink = HTableStage.sink(tableSettings);
// Writes persons 100 to 104.
Future<Done> o =
  Source.from(Arrays.asList(100, 101, 102, 103, 104))
    .map((i) -> new Person(i, String.format("name %d", i)))
    .runWith(sink, materializer);
Full source at GitHub

Basic HBase commands:

# Start HBase
$HBASE_HOME/bin/start-hbase.sh

# Open the interactive HBase shell (the launcher is bin/hbase, with the "shell" subcommand)
$HBASE_HOME/bin/hbase shell

From the HBase shell:

# The HBase shell is JRuby-based, so comments start with '#', not '//'.
list              # list tables
scan "person"     # roughly: SELECT * FROM person
disable "person"  # a table must be disabled before it can be dropped
drop "person"
The source code for this page can be found here.