OrientDB

OrientDB is a multi-model database that supports graph, document, key/value, and object models. Relationships are managed as in graph databases, with direct connections between records. It supports schema-less, schema-full and schema-mixed modes. It has a strong security profiling system based on users and roles, and supports querying with Gremlin as well as SQL extended for graph traversal.

For more information about OrientDB, see the official documentation; more details are available in the OrientDB manual.

The Alpakka OrientDB connector provides Akka Stream sources and sinks for OrientDB.

Project Info: Alpakka OrientDB
Artifact: com.lightbend.akka, akka-stream-alpakka-orientdb, 7.0.2
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12
JPMS module name: akka.stream.alpakka.orientdb
License
Readiness level
Since 0.17, 2018-02-19
Home page: https://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.9.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-orientdb" % "7.0.2",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.9.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-orientdb_${scala.binary.version}</artifactId>
    <version>7.0.2</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.9.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-orientdb_${versions.ScalaBinary}:7.0.2"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.orientechnologies    orientdb-graphdb    3.1.9
com.orientechnologies    orientdb-object    3.1.9
com.typesafe.akka    akka-stream_2.13    2.9.0
org.scala-lang    scala-library    2.13.12
Dependency tree
com.orientechnologies    orientdb-graphdb    3.1.9
    com.fasterxml.jackson.core    jackson-databind    2.12.1    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-annotations    2.12.1    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-core    2.12.1    The Apache Software License, Version 2.0
    com.orientechnologies    orientdb-core    3.1.9
        com.github.jnr    jnr-posix    3.0.50    Eclipse Public License - v 2.0
            com.github.jnr    jnr-constants    0.9.12    The Apache Software License, Version 2.0
            com.github.jnr    jnr-ffi    2.1.10    The Apache Software License, Version 2.0
                com.github.jnr    jffi    1.2.19    The Apache Software License, Version 2.0
                com.github.jnr    jnr-a64asm    1.0.0    The Apache Software License, Version 2.0
                com.github.jnr    jnr-x86asm    1.0.2    MIT License
                org.ow2.asm    asm-analysis    7.1    BSD
                    org.ow2.asm    asm-tree    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                org.ow2.asm    asm-commons    7.1    BSD
                    org.ow2.asm    asm-analysis    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm-tree    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm    7.1    BSD
                org.ow2.asm    asm-tree    7.1    BSD
                    org.ow2.asm    asm    7.1    BSD
                org.ow2.asm    asm-util    7.1    BSD
                    org.ow2.asm    asm-analysis    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm-tree    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm    7.1    BSD
                org.ow2.asm    asm    7.1    BSD
        com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
        com.ibm.icu    icu4j    65.1    Unicode/ICU License
        commons-lang    commons-lang    2.6
        javax.activation    javax.activation-api    1.2.0
        org.lz4    lz4-java    1.4.0    The Apache Software License, Version 2.0
    com.orientechnologies    orientdb-server    3.1.9
        com.orientechnologies    orientdb-client    3.1.9
            com.orientechnologies    orientdb-core    3.1.9
                com.github.jnr    jnr-posix    3.0.50    Eclipse Public License - v 2.0
                    com.github.jnr    jnr-constants    0.9.12    The Apache Software License, Version 2.0
                    com.github.jnr    jnr-ffi    2.1.10    The Apache Software License, Version 2.0
                        com.github.jnr    jffi    1.2.19    The Apache Software License, Version 2.0
                        com.github.jnr    jnr-a64asm    1.0.0    The Apache Software License, Version 2.0
                        com.github.jnr    jnr-x86asm    1.0.2    MIT License
                        org.ow2.asm    asm-analysis    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-commons    7.1    BSD
                            org.ow2.asm    asm-analysis    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-util    7.1    BSD
                            org.ow2.asm    asm-analysis    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
                com.ibm.icu    icu4j    65.1    Unicode/ICU License
                commons-lang    commons-lang    2.6
                javax.activation    javax.activation-api    1.2.0
                org.lz4    lz4-java    1.4.0    The Apache Software License, Version 2.0
        com.orientechnologies    orientdb-tools    3.1.9
            com.orientechnologies    orientdb-client    3.1.9
                com.orientechnologies    orientdb-core    3.1.9
                    com.github.jnr    jnr-posix    3.0.50    Eclipse Public License - v 2.0
                        com.github.jnr    jnr-constants    0.9.12    The Apache Software License, Version 2.0
                        com.github.jnr    jnr-ffi    2.1.10    The Apache Software License, Version 2.0
                            com.github.jnr    jffi    1.2.19    The Apache Software License, Version 2.0
                            com.github.jnr    jnr-a64asm    1.0.0    The Apache Software License, Version 2.0
                            com.github.jnr    jnr-x86asm    1.0.2    MIT License
                            org.ow2.asm    asm-analysis    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-commons    7.1    BSD
                                org.ow2.asm    asm-analysis    7.1    BSD
                                    org.ow2.asm    asm-tree    7.1    BSD
                                        org.ow2.asm    asm    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-util    7.1    BSD
                                org.ow2.asm    asm-analysis    7.1    BSD
                                    org.ow2.asm    asm-tree    7.1    BSD
                                        org.ow2.asm    asm    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                    com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
                    com.ibm.icu    icu4j    65.1    Unicode/ICU License
                    commons-lang    commons-lang    2.6
                    javax.activation    javax.activation-api    1.2.0
                    org.lz4    lz4-java    1.4.0    The Apache Software License, Version 2.0
            com.orientechnologies    orientdb-core    3.1.9
                com.github.jnr    jnr-posix    3.0.50    Eclipse Public License - v 2.0
                    com.github.jnr    jnr-constants    0.9.12    The Apache Software License, Version 2.0
                    com.github.jnr    jnr-ffi    2.1.10    The Apache Software License, Version 2.0
                        com.github.jnr    jffi    1.2.19    The Apache Software License, Version 2.0
                        com.github.jnr    jnr-a64asm    1.0.0    The Apache Software License, Version 2.0
                        com.github.jnr    jnr-x86asm    1.0.2    MIT License
                        org.ow2.asm    asm-analysis    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-commons    7.1    BSD
                            org.ow2.asm    asm-analysis    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-util    7.1    BSD
                            org.ow2.asm    asm-analysis    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
                com.ibm.icu    icu4j    65.1    Unicode/ICU License
                commons-lang    commons-lang    2.6
                javax.activation    javax.activation-api    1.2.0
                org.lz4    lz4-java    1.4.0    The Apache Software License, Version 2.0
            javax.xml.bind    jaxb-api    2.3.0
        com.sun.xml.bind    jaxb-core    2.3.0.1
        com.sun.xml.bind    jaxb-impl    2.3.0.1
        javax.xml.bind    jaxb-api    2.3.0
    com.orientechnologies    orientdb-tools    3.1.9
        com.orientechnologies    orientdb-client    3.1.9
            com.orientechnologies    orientdb-core    3.1.9
                com.github.jnr    jnr-posix    3.0.50    Eclipse Public License - v 2.0
                    com.github.jnr    jnr-constants    0.9.12    The Apache Software License, Version 2.0
                    com.github.jnr    jnr-ffi    2.1.10    The Apache Software License, Version 2.0
                        com.github.jnr    jffi    1.2.19    The Apache Software License, Version 2.0
                        com.github.jnr    jnr-a64asm    1.0.0    The Apache Software License, Version 2.0
                        com.github.jnr    jnr-x86asm    1.0.2    MIT License
                        org.ow2.asm    asm-analysis    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-commons    7.1    BSD
                            org.ow2.asm    asm-analysis    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-util    7.1    BSD
                            org.ow2.asm    asm-analysis    7.1    BSD
                                org.ow2.asm    asm-tree    7.1    BSD
                                    org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
                com.ibm.icu    icu4j    65.1    Unicode/ICU License
                commons-lang    commons-lang    2.6
                javax.activation    javax.activation-api    1.2.0
                org.lz4    lz4-java    1.4.0    The Apache Software License, Version 2.0
        com.orientechnologies    orientdb-core    3.1.9
            com.github.jnr    jnr-posix    3.0.50    Eclipse Public License - v 2.0
                com.github.jnr    jnr-constants    0.9.12    The Apache Software License, Version 2.0
                com.github.jnr    jnr-ffi    2.1.10    The Apache Software License, Version 2.0
                    com.github.jnr    jffi    1.2.19    The Apache Software License, Version 2.0
                    com.github.jnr    jnr-a64asm    1.0.0    The Apache Software License, Version 2.0
                    com.github.jnr    jnr-x86asm    1.0.2    MIT License
                    org.ow2.asm    asm-analysis    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm-commons    7.1    BSD
                        org.ow2.asm    asm-analysis    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm-tree    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm-util    7.1    BSD
                        org.ow2.asm    asm-analysis    7.1    BSD
                            org.ow2.asm    asm-tree    7.1    BSD
                                org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm    7.1    BSD
            com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
            com.ibm.icu    icu4j    65.1    Unicode/ICU License
            commons-lang    commons-lang    2.6
            javax.activation    javax.activation-api    1.2.0
            org.lz4    lz4-java    1.4.0    The Apache Software License, Version 2.0
        javax.xml.bind    jaxb-api    2.3.0
    commons-collections    commons-collections    3.2.2
    org.codehaus.groovy    groovy-jsr223    2.5.8    The Apache Software License, Version 2.0
        org.codehaus.groovy    groovy    2.5.8    The Apache Software License, Version 2.0
    org.codehaus.groovy    groovy    2.5.8    The Apache Software License, Version 2.0
com.orientechnologies    orientdb-object    3.1.9
    com.orientechnologies    orientdb-core    3.1.9
        com.github.jnr    jnr-posix    3.0.50    Eclipse Public License - v 2.0
            com.github.jnr    jnr-constants    0.9.12    The Apache Software License, Version 2.0
            com.github.jnr    jnr-ffi    2.1.10    The Apache Software License, Version 2.0
                com.github.jnr    jffi    1.2.19    The Apache Software License, Version 2.0
                com.github.jnr    jnr-a64asm    1.0.0    The Apache Software License, Version 2.0
                com.github.jnr    jnr-x86asm    1.0.2    MIT License
                org.ow2.asm    asm-analysis    7.1    BSD
                    org.ow2.asm    asm-tree    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                org.ow2.asm    asm-commons    7.1    BSD
                    org.ow2.asm    asm-analysis    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm-tree    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm    7.1    BSD
                org.ow2.asm    asm-tree    7.1    BSD
                    org.ow2.asm    asm    7.1    BSD
                org.ow2.asm    asm-util    7.1    BSD
                    org.ow2.asm    asm-analysis    7.1    BSD
                        org.ow2.asm    asm-tree    7.1    BSD
                            org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm-tree    7.1    BSD
                        org.ow2.asm    asm    7.1    BSD
                    org.ow2.asm    asm    7.1    BSD
                org.ow2.asm    asm    7.1    BSD
        com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
        com.ibm.icu    icu4j    65.1    Unicode/ICU License
        commons-lang    commons-lang    2.6
        javax.activation    javax.activation-api    1.2.0
        org.lz4    lz4-java    1.4.0    The Apache Software License, Version 2.0
    org.hibernate.javax.persistence    hibernate-jpa-2.0-api    1.0.1.Final    
    org.javassist    javassist    3.23.1-GA    MPL 1.1
com.typesafe.akka    akka-stream_2.13    2.9.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.9.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.2    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.9.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.scala-lang    scala-library    2.13.12    Apache-2.0

Database connection

Sources, Flows and Sinks provided by this connector need an OPartitionedDatabasePool to access OrientDB. It is your responsibility to close the database connection, e.g. at actor system termination. This API has been deprecated in OrientDB; please suggest a pull request to use the latest APIs instead.

Scala
import com.orientechnologies.orient.core.db.OPartitionedDatabasePool

val url = "remote:127.0.0.1:2424/"
val dbName = "GratefulDeadConcertsScala"
val dbUrl = s"$url$dbName"
val username = "root"
val password = "root"

val oDatabase: OPartitionedDatabasePool =
  new OPartitionedDatabasePool(dbUrl, username, password, Runtime.getRuntime.availableProcessors(), 10)

// Close the pool when the actor system terminates
system.registerOnTermination(oDatabase.close())
Java
import com.orientechnologies.orient.core.db.OPartitionedDatabasePool;

private static String url = "remote:127.0.0.1:2424/";
private static String dbName = "GratefulDeadConcertsJava";
private static String dbUrl = url + dbName;
private static String username = "root";
private static String password = "root";

  oDatabase =
      new OPartitionedDatabasePool(
          dbUrl, username, password, Runtime.getRuntime().availableProcessors(), 10);

  system.registerOnTermination(() -> oDatabase.close());

Reading ODocument from OrientDB

Now we can stream messages which contain OrientDB’s ODocument (in Scala or Java) from or to OrientDB by passing the database pool, wrapped in OrientDbSourceSettings, to OrientDbSource.

Scala
val result: Future[immutable.Seq[String]] = OrientDbSource(
  sink4,
  OrientDbSourceSettings(oDatabase)
).map { message: OrientDbReadResult[ODocument] =>
    message.oDocument.field[String]("book_title")
  }
  .runWith(Sink.seq)
Java
CompletionStage<List<String>> result =
    OrientDbSource.create(sinkClass1, OrientDbSourceSettings.create(oDatabase))
        .map(m -> m.oDocument().<String>field("book_title"))
        .runWith(Sink.seq(), system);

Typed messages

It is also possible to stream messages that contain instances of arbitrary classes.

Java
public static class source1 {

  private String book_title;

  public void setBook_title(String book_title) {
    this.book_title = book_title;
  }

  public String getBook_title() {
    return book_title;
  }
}

public static class sink2 {

  private String book_title;

  public void setBook_title(String book_title) {
    this.book_title = book_title;
  }

  public String getBook_title() {
    return book_title;
  }
}

Use OrientDbSource.typed and OrientDbSink.typed to create the source and the sink instead.

Scala
val streamCompletion: Future[Done] = OrientDbSource
  .typed(sourceClass, OrientDbSourceSettings(oDatabase), classOf[OrientDbTest.source1])
  .map { m: OrientDbReadResult[OrientDbTest.source1] =>
    val db: ODatabaseDocumentTx = oDatabase.acquire
    db.setDatabaseOwner(new OObjectDatabaseTx(db))
    ODatabaseRecordThreadLocal.instance.set(db)
    val sink: OrientDbTest.sink2 = new OrientDbTest.sink2
    sink.setBook_title(m.oDocument.getBook_title)
    OrientDbWriteMessage(sink)
  }
  .groupedWithin(10, 10.millis)
  .runWith(OrientDbSink.typed(sinkClass2, OrientDbWriteSettings.create(oDatabase), classOf[OrientDbTest.sink2]))
Java
CompletionStage<Done> f1 =
    OrientDbSource.typed(
            sourceClass, OrientDbSourceSettings.create(oDatabase), source1.class, null)
        .map(
            readResult -> {
              ODatabaseDocumentTx db = oDatabase.acquire();
              db.setDatabaseOwner(new OObjectDatabaseTx(db));
              ODatabaseRecordThreadLocal.instance().set(db);
              sink2 sink = new sink2();
              sink.setBook_title(readResult.oDocument().getBook_title());
              return OrientDbWriteMessage.create(sink);
            })
        .groupedWithin(10, Duration.ofMillis(10))
        .runWith(
            OrientDbSink.typed(
                sinkClass2, OrientDbWriteSettings.create(oDatabase), sink2.class),
            system);

Source configuration

We can configure the source with OrientDbSourceSettings.

Scala
// re-iterating default values
val sourceSettings = OrientDbSourceSettings(oDatabase)
  .withSkip(0)
  .withLimit(10)
Java
// re-iterating default values
OrientDbSourceSettings sourceSettings =
    OrientDbSourceSettings.create(oDatabase).withSkip(0).withLimit(10);
Parameter    Default    Description
skip    0    Rows skipped at the beginning of the result.
limit    10    Result items fetched per query.
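
To make the two parameters concrete, here is a minimal sketch in plain Java, with no OrientDB involved. It assumes the source pages through a class by issuing repeated queries that each fetch at most limit rows and advance the offset by the rows already read. That paging loop is an assumption for illustration, and PagingSketch, queryPage and readAllPages are hypothetical names rather than connector API.

```java
import java.util.ArrayList;
import java.util.List;

public class PagingSketch {

  // Hypothetical stand-in for one query: skips `skip` rows, then returns at most `limit` rows.
  public static <T> List<T> queryPage(List<T> rows, int skip, int limit) {
    int from = Math.min(skip, rows.size());
    int to = Math.min(from + limit, rows.size());
    return new ArrayList<>(rows.subList(from, to));
  }

  // Assumed paging loop: query repeatedly, advancing the offset, until a page comes back empty.
  public static <T> List<List<T>> readAllPages(List<T> rows, int skip, int limit) {
    List<List<T>> pages = new ArrayList<>();
    int offset = skip;
    while (true) {
      List<T> page = queryPage(rows, offset, limit);
      if (page.isEmpty()) {
        break;
      }
      pages.add(page);
      offset += page.size();
    }
    return pages;
  }

  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
      rows.add(i);
    }
    // Default-like settings: skip 0, limit 10
    List<List<Integer>> pages = readAllPages(rows, 0, 10);
    System.out.println(pages.size()); // prints 3
    System.out.println(pages.get(2)); // prints [20, 21, 22, 23, 24]
  }
}
```

Under this reading, a larger limit means fewer round trips with bigger result sets, while skip only moves the starting point of the first query.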

Writing to OrientDB

You can also build flow stages. The API is similar to creating Sinks.

Scala
val f1 = OrientDbSource(
  sourceClass,
  OrientDbSourceSettings(oDatabase)
).map { message: OrientDbReadResult[ODocument] =>
    OrientDbWriteMessage(message.oDocument)
  }
  .groupedWithin(10, 50.millis)
  .via(
    OrientDbFlow.create(
      sink5,
      OrientDbWriteSettings(oDatabase)
    )
  )
  .runWith(Sink.seq)
Java
CompletionStage<List<List<OrientDbWriteMessage<ODocument, NotUsed>>>> f1 =
    OrientDbSource.create(sourceClass, OrientDbSourceSettings.create(oDatabase), null)
        .map(m -> OrientDbWriteMessage.create(m.oDocument()))
        .groupedWithin(10, Duration.ofMillis(10))
        .via(OrientDbFlow.create(sink3, OrientDbWriteSettings.create(oDatabase)))
        .runWith(Sink.seq(), system);
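
The write examples batch messages with groupedWithin(10, ...) before handing them to the flow, so the flow receives lists of messages rather than single ones. The sketch below imitates only the size bound of that batching with plain Java collections. The time window is deliberately left out, and BatchingSketch and grouped are hypothetical names, not part of Alpakka or Akka Streams.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {

  // Splits the input into consecutive batches of at most maxSize elements.
  // This mirrors only the count limit of groupedWithin, not its time limit.
  public static <T> List<List<T>> grouped(List<T> elements, int maxSize) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < elements.size(); start += maxSize) {
      int end = Math.min(start + maxSize, elements.size());
      batches.add(new ArrayList<>(elements.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> messages = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
      messages.add(i);
    }
    for (List<Integer> batch : grouped(messages, 10)) {
      System.out.println(batch.size()); // prints 10, 10, 5 on separate lines
    }
  }
}
```

In the real stream a batch is also emitted when the time window elapses, so batches can be smaller than the size bound even when more elements are still on their way.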

Passing data through OrientDbFlow

When streaming documents from Kafka, you might want to commit the offset to Kafka only after the document has been written to OrientDB.

Scala
// We're going to pretend we got messages from Kafka.
// After we've written them to OrientDB, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val f1 = Source(messagesFromKafka)
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title
    println("title: " + book.title)

    OrientDbWriteMessage(new ODocument().field("book_title", id), kafkaMessage.offset)
  }
  .groupedWithin(10, 50.millis)
  .via(
    OrientDbFlow.createWithPassThrough(
      sink7,
      OrientDbWriteSettings(oDatabase)
    )
  )
  .map { messages: Seq[OrientDbWriteMessage[ODocument, KafkaOffset]] =>
    messages.foreach { message =>
      commitToKafka(message.passThrough)
    }
  }
  .runWith(Sink.ignore)
Java
// We're going to pretend we got messages from Kafka.
// After we've written them to OrientDB, we want
// to commit the offset to Kafka

List<Integer> committedOffsets = new ArrayList<>();
List<messagesFromKafka> messagesFromKafkas =
    Arrays.asList(
        new messagesFromKafka("Akka Concurrency", new KafkaOffset(0)),
        new messagesFromKafka("Akka in Action", new KafkaOffset(1)),
        new messagesFromKafka("Effective Akka", new KafkaOffset(2)));

Consumer<KafkaOffset> commitToKafka =
    new Consumer<KafkaOffset>() {
      @Override
      public void accept(KafkaOffset kafkaOffset) {
        committedOffsets.add(kafkaOffset.getOffset());
      }
    };

Source.from(messagesFromKafkas)
    .map(
        kafkaMessage -> {
          String book_title = kafkaMessage.getBook_title();
          return OrientDbWriteMessage.create(
              new ODocument().field("book_title", book_title), kafkaMessage.kafkaOffset);
        })
    .groupedWithin(10, Duration.ofMillis(10))
    .via(OrientDbFlow.createWithPassThrough(sink6, OrientDbWriteSettings.create(oDatabase)))
    .map(
        messages -> {
          ODatabaseDocumentTx db = oDatabase.acquire();
          db.setDatabaseOwner(new OObjectDatabaseTx(db));
          ODatabaseRecordThreadLocal.instance().set(db);
          messages.stream().forEach(message -> commitToKafka.accept(message.passThrough()));
          return NotUsed.getInstance();
        })
    .runWith(Sink.seq(), system)
    .toCompletableFuture()
    .get(10, TimeUnit.SECONDS);
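
The pass-through idea itself does not depend on OrientDB or Kafka. As a minimal sketch in plain Java, each message carries a payload to store plus a value that the write step leaves untouched and hands back, so downstream code can act on it only once the write has happened. All names here (PassThroughSketch, WriteMessage, writeBatch, run) are hypothetical stand-ins, and the in-memory list takes the place of the database.

```java
import java.util.ArrayList;
import java.util.List;

public class PassThroughSketch {

  // Hypothetical message type: a payload to store and a value carried along unchanged.
  public static final class WriteMessage<T, C> {
    public final T payload;
    public final C passThrough;

    public WriteMessage(T payload, C passThrough) {
      this.payload = payload;
      this.passThrough = passThrough;
    }
  }

  // Stand-in for the write stage: stores every payload, then returns the same
  // messages so that the caller can read their passThrough values.
  public static <T, C> List<WriteMessage<T, C>> writeBatch(
      List<WriteMessage<T, C>> batch, List<T> store) {
    for (WriteMessage<T, C> message : batch) {
      store.add(message.payload);
    }
    return batch;
  }

  // Pairs each title with its position as a pretend offset, writes the batch,
  // and collects the offsets of the messages that came out of the write step.
  public static List<Integer> run(List<String> titles) {
    List<String> store = new ArrayList<>();
    List<WriteMessage<String, Integer>> batch = new ArrayList<>();
    for (int i = 0; i < titles.size(); i++) {
      batch.add(new WriteMessage<>(titles.get(i), i));
    }
    List<Integer> committedOffsets = new ArrayList<>();
    for (WriteMessage<String, Integer> written : writeBatch(batch, store)) {
      committedOffsets.add(written.passThrough);
    }
    return committedOffsets;
  }

  public static void main(String[] args) {
    List<String> titles = List.of("Akka Concurrency", "Akka in Action", "Effective Akka");
    System.out.println(run(titles)); // prints [0, 1, 2]
  }
}
```

Because the offsets only become visible after writeBatch returns, a failed write would leave them uncommitted, which is the ordering guarantee the Kafka example relies on.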