HBase connector

A flow and a composite sink for writing elements to HBase.

HBase is a column-family NoSQL database backed by HDFS.

Usage

Build a converter and the table settings.

The converter maps a domain object to an HBase Put, which carries the row key and the column values.

scala
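import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Put works on byte arrays, so every String is converted explicitly.
val hBaseConverter: Person => Put = { person =>
  val put = new Put(Bytes.toBytes(s"id_${person.id}"))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(person.name))
  put
}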
java
// Requires: import java.nio.charset.StandardCharsets;
// StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException,
// so the converter can no longer return a null Put.
Function<Person, Put> hBaseConverter = person -> {
    Put put = new Put(String.format("id_%d", person.id).getBytes(StandardCharsets.UTF_8));
    put.addColumn("info".getBytes(StandardCharsets.UTF_8), "name".getBytes(StandardCharsets.UTF_8), person.name.getBytes(StandardCharsets.UTF_8));
    return put;
};
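
To make the mapping concrete without an HBase dependency, the following sketch computes the four byte arrays that a converter like the one above hands to Put: row key, column family, qualifier, and value. The Person and Cell classes and the toCell helper are hypothetical stand-ins for illustration only; they are not part of the connector or the HBase client.

```java
import java.nio.charset.StandardCharsets;

public class ConverterSketch {

    // Hypothetical domain object mirroring the Person used on this page.
    static final class Person {
        final int id;
        final String name;

        Person(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Hypothetical stand-in for the byte arrays passed to Put and Put.addColumn.
    static final class Cell {
        final byte[] row;
        final byte[] family;
        final byte[] qualifier;
        final byte[] value;

        Cell(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }
    }

    // Encodes a String as UTF-8 bytes; no checked exception is involved.
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Same layout as the converter: row "id_<id>", family "info", qualifier "name".
    static Cell toCell(Person person) {
        return new Cell(utf8("id_" + person.id), utf8("info"), utf8("name"), utf8(person.name));
    }

    public static void main(String[] args) {
        Cell cell = toCell(new Person(42, "zozo_42"));
        System.out.println(new String(cell.row, StandardCharsets.UTF_8)); // prints id_42
    }
}
```

Because the row key is the string form of the id, two Person values with the same id map to the same row, so the later write replaces the earlier name.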

The table is created on demand if it does not already exist.

scala
val tableSettings =
  HTableSettings(HBaseConfiguration.create(), TableName.valueOf("person"), immutable.Seq("info"), hBaseConverter)
java
HTableSettings<Person> tableSettings = HTableSettings.create(HBaseConfiguration.create(), TableName.valueOf("person1"), Arrays.asList("info"), hBaseConverter);

Flow usage

scala
val flow = HTableStage.flow[Person](tableSettings)

val f = Source(11 to 20).map(i => Person(i, s"zozo_$i")).via(flow).runWith(Sink.fold(0)((a, d) => a + d.id))
java
Flow<Person, Person, NotUsed> flow = HTableStage.flow(tableSettings);
Pair<NotUsed, CompletionStage<List<Person>>> run = Source.from(Arrays.asList(200, 201, 202, 203, 204)).map((i)
        -> new Person(i, String.format("name_%d", i))).via(flow).toMat(Sink.seq(), Keep.both()).run(materializer);

Sink usage

scala
val sink = HTableStage.sink[Person](tableSettings)

val f = Source(1 to 10).map(i => Person(i, s"zozo_$i")).runWith(sink)
java
final Sink<Person, scala.concurrent.Future<Done>> sink = HTableStage.sink(tableSettings);
Future<Done> o = Source.from(Arrays.asList(100, 101, 102, 103, 104)).map((i) -> new Person(i, String.format("name %d", i))).runWith(sink, materializer);

Basic HBase commands:

$HBASE_HOME/bin/start-hbase.sh

$HBASE_HOME/bin/hbase shell

From the hbase shell:

list              # list all tables
scan "person"     # roughly: select * from person
disable "person"  # a table must be disabled before it can be dropped
drop "person"