Apache Geode

Apache Geode is a distributed data grid (formerly GemFire).

This connector provides a flow and a sink to put elements into Geode, and a source to retrieve elements from it.

Geode stores data as key/value pairs. Both keys and values must be serialized; see the Serialization section below.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-geode" % "0.20"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-geode_2.12</artifactId>
  <version>0.20</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-geode_2.12', version: '0.20'
}

Usage

Connection

First, connect to the Geode cache. In a client application, the connection is handled by a ClientCache. A single ClientCache per application is enough. The ClientCache also holds a single PdxSerializer.

scala
val reactiveGeode = new ReactiveGeode(geodeSettings)
Full source at GitHub
java
GeodeSettings settings =
    GeodeSettings.create(geodeDockerHostname, 10334)
        .withConfiguration(func(c -> c.setPoolIdleTimeout(10)));
return new ReactiveGeode(settings);
Full source at GitHub

Apache Geode supports continuous queries. A continuous query relies on server events, so ReactiveGeode needs to listen for those events. Because this behaviour consumes more resources, it is isolated in a Scala trait and/or a specialized Java class.

scala
val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
Full source at GitHub
java
return new ReactiveGeodeWithPoolSubscription(settings);
Full source at GitHub

Region

Define region settings that describe how to access a region and how to extract the key from each element.

scala
val personsRegionSettings = RegionSettings("persons", (p: Person) => p.id)
val animalsRegionSettings = RegionSettings("animals", (a: Animal) => a.id)
val complexesRegionSettings = RegionSettings("complexes", (a: Complex) => a.id)
Full source at GitHub
java
protected RegionSettings<Integer, Person> personRegionSettings =
    new RegionSettings<>("persons", func(Person::getId));
protected RegionSettings<Integer, Animal> animalRegionSettings =
    new RegionSettings<>("animals", func(Animal::getId));
Full source at GitHub
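
To make the role of the key extraction function concrete, here is a minimal stand-in sketch. It does not use the connector: KeyExtractionSketch, its nested Person class, and the store method are hypothetical names, and a plain Map plays the part of a region. It only shows how a function such as Person::getId decides the key under which each element ends up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Stand-in sketch only: this is NOT the connector's RegionSettings.
// A plain Map plays the role of a region to show what a key extraction
// function contributes when elements are stored as key/value entries.
final class KeyExtractionSketch {

  // Hypothetical element type, loosely mirroring the Person used on this page.
  static final class Person {
    private final int id;
    private final String name;

    Person(int id, String name) {
      this.id = id;
      this.name = name;
    }

    int getId() { return id; }

    String getName() { return name; }
  }

  // Stores every element under the key computed by keyExtractor.
  // In this Map-based stand-in, a later element with the same key
  // replaces the earlier one.
  static <K, V> Map<K, V> store(List<V> elements, Function<V, K> keyExtractor) {
    Map<K, V> region = new LinkedHashMap<>();
    for (V element : elements) {
      region.put(keyExtractor.apply(element), element);
    }
    return region;
  }

  public static void main(String[] args) {
    Map<Integer, Person> persons =
        store(List.of(new Person(1, "Alice"), new Person(2, "Bob")), Person::getId);
    // Each Person is now addressable by its id.
    System.out.println(persons.keySet());
  }
}
```

The element type never has to know about its key; the extraction function is supplied separately, which is why the region settings above take it as an argument.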

Serialization

Objects must be serialized to flow into a Geode region. Possible formats are:

  • opaque format (e.g. JSON/XML)
  • Java serialization
  • Geode PDX format

Of these, only the PDX format is supported by this connector.

PDXEncoder supports many options; see gemfire_pdx_serialization.html.

A PdxSerializer must be provided to Geode when reading from or writing to a region.

scala
object PersonPdxSerializer extends AkkaPdxSerializer[Person] {
  override def clazz: Class[Person] = classOf[Person]

  override def toData(o: scala.Any, out: PdxWriter): Boolean =
    if (o.isInstanceOf[Person]) {
      val p = o.asInstanceOf[Person]
      out.writeInt("id", p.id)
      out.writeString("name", p.name)
      out.writeDate("birthDate", p.birthDate)
      true
    } else
      false

  override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
    val id: Int = in.readInt("id")
    val name: String = in.readString("name")
    val birthDate: Date = in.readDate("birthDate")
    Person(id, name, birthDate)
  }
}
Full source at GitHub
java
public class PersonPdxSerializer implements AkkaPdxSerializer<Person> {

  @Override
  public Class<Person> clazz() {
    return Person.class;
  }

  @Override
  public boolean toData(Object o, PdxWriter out) {
    if (o instanceof Person) {
      Person p = (Person) o;
      out.writeInt("id", p.getId());
      out.writeString("name", p.getName());
      out.writeDate("birthDate", p.getBirthDate());
      return true;
    }
    return false;
  }

  @Override
  public Object fromData(Class<?> clazz, PdxReader in) {
    int id = in.readInt("id");
    String name = in.readString("name");
    Date birthDate = in.readDate("birthDate");
    return new Person(id, name, birthDate);
  }
}
Full source at GitHub

This project provides a generic solution for Scala users based on shapeless: if no serializer is provided for a case class, one is generated at compile time. Java users need to write their custom serializers by hand.

Runtime reflection is also an option; see auto_serialization.html.

Flow usage

This sample stores instances of (case) classes in Geode.

scala
val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings)

val fut = source.via(flow).runWith(Sink.ignore)
Full source at GitHub
java
Flow<Person, Person, NotUsed> flow =
    reactiveGeode.flow(personRegionSettings, new PersonPdxSerializer());

CompletionStage<List<Person>> run =
    source.via(flow).toMat(Sink.seq(), Keep.right()).run(materializer);
Full source at GitHub

Sink usage

scala
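// First snippet: store animals (source emits Animal elements)
val sink = reactiveGeode.sink(animalsRegionSettings)

val fut = source.runWith(sink)

// Second snippet, from a separate scope: store complexes (source emits Complex elements)
val sink = reactiveGeode.sink(complexesRegionSettings)

val fut = source.runWith(sink)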
Full source at GitHub
java
Sink<Animal, CompletionStage<Done>> sink =
    reactiveGeode.sink(animalRegionSettings, new AnimalPdxSerializer());

RunnableGraph<CompletionStage<Done>> runnableGraph = source.toMat(sink, Keep.right());
Full source at GitHub

Source usage

Simple query

Apache Geode supports simple queries.

scala
val source =
  reactiveGeode
    .query[Person](s"select * from /persons order by id")
    .runWith(Sink.foreach(e => log.debug(s"$e")))
Full source at GitHub
java
CompletionStage<Done> personsDone =
    reactiveGeode
        .query("select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
            },
            materializer);
Full source at GitHub

Continuous query

scala
val source =
  reactiveGeode
    .continuousQuery[Person]('test, s"select * from /persons")
    .runWith(Sink.fold(0) { (c, p) =>
      log.debug(s"$p $c")
      if (c == 19) {
        reactiveGeode.closeContinuousQuery('test).foreach { _ =>
          log.debug("test cQuery is closed")
        }

      }
      c + 1
    })
Full source at GitHub
java
CompletionStage<Done> fut =
    reactiveGeode
        .continuousQuery("test", "select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
              if (p.getId() == 120) {
                reactiveGeode.closeContinuousQuery("test");
              }
            },
            materializer);
Full source at GitHub

Basic Geode commands

Assuming Apache Geode is installed, start the Geode shell:

gfsh

From the Geode shell:

start locator --name=locator
configure pdx --read-serialized=true
start server --name=server

create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2

Run the tests

Integration tests run against a Geode instance on localhost by default; set the IT_GEODE_HOSTNAME environment variable to target a different host.

The test code requires Geode running in the background. You can start it quickly using Docker:

docker-compose up geode
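
The host lookup described above could be sketched as follows. This is an illustration under assumptions, not the project's actual test code: the class name GeodeTestHost and its resolve method are hypothetical, and treating a blank value as unset is a choice made here. Only the variable name IT_GEODE_HOSTNAME and the localhost default come from the text.

```java
import java.util.Map;

// Sketch of resolving the Geode host for integration tests.
// GeodeTestHost and resolve are hypothetical names, not part of the project.
final class GeodeTestHost {

  // Variable name and default taken from the text above.
  static final String ENV_VAR = "IT_GEODE_HOSTNAME";
  static final String DEFAULT_HOST = "localhost";

  // Takes the environment as a parameter so the lookup can be exercised
  // without changing the real process environment.
  // Assumption: a missing or blank value falls back to the default.
  static String resolve(Map<String, String> env) {
    String host = env.get(ENV_VAR);
    return (host == null || host.trim().isEmpty()) ? DEFAULT_HOST : host.trim();
  }

  public static void main(String[] args) {
    System.out.println("Geode host: " + resolve(System.getenv()));
  }
}
```

The resolved value could then be passed as the hostname when building the settings, in the same position as geodeDockerHostname in the connection example.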

Scala
sbt
> geode/testOnly *Spec
Java
sbt
> geode/testOnly *Test