Apache Geode connector

Apache Geode is a distributed data grid (formerly known as GemFire).

This connector provides a flow and a sink to put elements into Geode, and a source to retrieve elements from it.

Geode stores data as key-value pairs. Both keys and values must be serialized, as described in the Serialization section below.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-geode" % "0.14"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-geode_2.12</artifactId>
  <version>0.14</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-geode_2.12', version: '0.14'
}

Usage

Connection

First, connect to the Geode cache. In a client application, the connection is handled by a ClientCache. A single ClientCache per application is enough. The ClientCache also holds a single PdxSerializer.

scala
val reactiveGeode = new ReactiveGeode(geodeSettings)
java
GeodeSettings settings = GeodeSettings.create(geodeDockerHostname, 10334)
        .withConfiguration(func(c->c.setPoolIdleTimeout(10)));
return new ReactiveGeode(settings);

Apache Geode supports continuous queries. A continuous query relies on server events, so ReactiveGeode needs to listen for those events. Because this behaviour consumes more resources, it is isolated in a Scala trait (PoolSubscription) and a specialized Java class (ReactiveGeodeWithPoolSubscription).

scala
val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
java
return new ReactiveGeodeWithPoolSubscription(settings);

Region

Define a RegionSettings to describe how to access a region, together with the function that extracts the key from each element.

scala
val personsRegionSettings = RegionSettings("persons", (p: Person) => p.id)
val animalsRegionSettings = RegionSettings("animals", (a: Animal) => a.id)
val complexesRegionSettings = RegionSettings("complexes", (a: Complex) => a.id)
java
protected RegionSettings<Integer, Person> personRegionSettings = new RegionSettings<>("persons", func(Person::getId));
protected RegionSettings<Integer, Animal> animalRegionSettings = new RegionSettings<>("animals", func(Animal::getId));
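
To make the role of the key extraction function concrete, the following is a minimal sketch that does not use the connector at all. A plain `HashMap` stands in for a Geode region, and `RegionKeySketch`, its nested `Person` class, and `store` are hypothetical names used only for illustration. It assumes each element is stored under the key the function returns, so an element whose key already exists replaces the previous one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only: a HashMap stands in for a Geode region.
public class RegionKeySketch {

    // Minimal value type for the sketch (not the connector's Person class).
    static final class Person {
        final int id;
        final String name;

        Person(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Stores every value under the key returned by keyExtractor.
    static <K, V> Map<K, V> store(Function<V, K> keyExtractor, List<V> values) {
        Map<K, V> region = new HashMap<>();
        for (V value : values) {
            region.put(keyExtractor.apply(value), value);
        }
        return region;
    }

    public static void main(String[] args) {
        List<Person> persons = List.of(
                new Person(1, "Ada"),
                new Person(2, "Alan"),
                new Person(1, "Ada Lovelace")); // same key as the first element

        Map<Integer, Person> region =
                RegionKeySketch.<Integer, Person>store(p -> p.id, persons);

        System.out.println(region.size());      // 2
        System.out.println(region.get(1).name); // Ada Lovelace
    }
}
```

The key type chosen here (`Integer`) mirrors the first type parameter of `RegionSettings<Integer, Person>` in the Java example above.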

Serialization

Objects must be serialized to be stored in a Geode region. Possible formats are:

  • opaque format (e.g. JSON/XML)
  • Java serialization
  • PDX, Geode's own format

This connector supports only the PDX format.

PDXEncoder supports many options; see gemfire_pdx_serialization.html.

A PdxSerializer must be provided to Geode when reading from or writing to a region.

scala
object PersonPdxSerializer extends AkkaPdxSerializer[Person] {
  override def clazz: Class[Person] = classOf[Person]

  override def toData(o: scala.Any, out: PdxWriter): Boolean =
    if (o.isInstanceOf[Person]) {
      val p = o.asInstanceOf[Person]
      out.writeInt("id", p.id)
      out.writeString("name", p.name)
      out.writeDate("birthDate", p.birthDate)
      true
    } else
      false

  override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
    val id: Int = in.readInt("id")
    val name: String = in.readString("name")
    val birthDate: Date = in.readDate("birthDate")
    Person(id, name, birthDate)
  }
}
java
public class PersonPdxSerializer implements AkkaPdxSerializer<Person> {

    @Override
    public Class<Person> clazz() {
        return Person.class;
    }

    @Override
    public boolean toData(Object o, PdxWriter out) {
        if(o instanceof Person){
            Person p = (Person)o;
            out.writeInt("id", p.getId());
            out.writeString("name", p.getName());
            out.writeDate("birthDate", p.getBirthDate());
            return true;
        }
        return false;
    }

    @Override
    public Object fromData(Class<?> clazz, PdxReader in) {
        int id = in.readInt("id");
        String name = in.readString("name");
        Date birthDate = in.readDate("birthDate");
        return new Person(id, name, birthDate);
    }
}

For Scala users, this project provides a generic solution based on shapeless: if no serializer is provided for a case class, one is generated at compile time. Java users need to write their custom serializers by hand.

Runtime reflection is also an option; see auto_serialization.html.

Flow usage

This sample stores instances of (case) classes in Geode.

scala
val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings)

val fut = source.via(flow).runWith(Sink.ignore)
java
Flow<Person, Person, NotUsed> flow = reactiveGeode.flow(personRegionSettings, new PersonPdxSerializer());

CompletionStage<List<Person>> run = source
        .via(flow)
        .toMat(Sink.seq(), Keep.right())
        .run(materializer);

Sink usage

scala
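// Sink for the "animals" region; `source` here emits Animal elements
val sink = reactiveGeode.sink(animalsRegionSettings)

val fut = source.runWith(sink)

// Separate snippet: sink for the "complexes" region; `source` here emits Complex elements
val sink = reactiveGeode.sink(complexesRegionSettings)

val fut = source.runWith(sink)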
java
Sink<Animal, CompletionStage<Done>> sink = reactiveGeode.sink(animalRegionSettings, new AnimalPdxSerializer());

RunnableGraph<CompletionStage<Done>> runnableGraph = source
        .toMat(sink, Keep.right());

Source usage

Simple query

Apache Geode supports simple queries, expressed in OQL (Object Query Language).

scala
val source =
  reactiveGeode
    .query[Person](s"select * from /persons order by id")
    .runWith(Sink.foreach(e => log.debug(s"$e")))
java
CompletionStage<Done> personsDone = reactiveGeode.query("select * from /persons", new PersonPdxSerializer())
        .runForeach(p -> {
            LOGGER.debug(p.toString());
        }, materializer);

Continuous query

scala
val source =
  reactiveGeode
    .continuousQuery[Person]('test, s"select * from /persons")
    .runWith(Sink.fold(0) { (c, p) =>
      log.debug(s"$p $c")
      if (c == 19) {
        reactiveGeode.closeContinuousQuery('test).foreach { _ =>
          log.debug("test cQuery is closed")
        }

      }
      c + 1
    })
java
CompletionStage<Done> fut = reactiveGeode.continuousQuery("test", "select * from /persons", new PersonPdxSerializer())
        .runForeach(p -> {
            LOGGER.debug(p.toString());
            if (p.getId() == 120) {
                reactiveGeode.closeContinuousQuery("test");
            }
        }, materializer);

Geode basic commands

Assuming Apache Geode is installed, start the Geode shell:

gfsh

From the geode shell:

start locator --name=locator
configure pdx --read-serialized=true
start server --name=server

create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2

Run the test

Integration tests run against a Geode instance on localhost by default. Set the IT_GEODE_HOSTNAME environment variable to use a different host:

export IT_GEODE_HOSTNAME=geode-host-locator

sbt

From the sbt shell:

project geode
test
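
The hostname lookup described above can be sketched as follows. This is a minimal illustration, not the project's actual test code: `GeodeHostSketch` and `resolveHost` are hypothetical names, and the fallback to `localhost` is assumed from the description above.

```java
import java.util.Map;

// Hypothetical sketch: resolve the Geode host used by the integration tests.
public class GeodeHostSketch {

    static final String ENV_VAR = "IT_GEODE_HOSTNAME";

    // Returns the value of IT_GEODE_HOSTNAME if it is set, otherwise "localhost".
    static String resolveHost(Map<String, String> env) {
        return env.getOrDefault(ENV_VAR, "localhost");
    }

    public static void main(String[] args) {
        // System.getenv() exposes the process environment as a map.
        String host = resolveHost(System.getenv());
        System.out.println("Geode locator host: " + host);
    }
}
```

The resolved host could then be passed to `GeodeSettings.create(host, 10334)`, as in the connection example.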