OrientDB
OrientDB is a multi-model database that supports graph, document, key/value, and object models; relationships are managed as in graph databases, with direct connections between records. It supports schema-less, schema-full and schema-mixed modes. It has a strong security profiling system based on users and roles, and it supports querying with Gremlin as well as with SQL extended for graph traversal.
For more information about OrientDB, see the official documentation; further details are available in the OrientDB manual.
The Alpakka OrientDB connector provides Akka Stream sources and sinks for OrientDB.
Project Info: Alpakka OrientDB | |
---|---|
Artifact | com.lightbend.akka : akka-stream-alpakka-orientdb : 6.0.1 |
JDK versions | Adopt OpenJDK 8, Adopt OpenJDK 11 |
Scala versions | 2.13.10, 2.12.17 |
JPMS module name | akka.stream.alpakka.orientdb |
License | |
Readiness level | Since 0.17, 2018-02-19 |
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
- Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.8.1"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-orientdb" % "6.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
- Maven
<properties>
  <akka.version>2.8.1</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-orientdb_${scala.binary.version}</artifactId>
    <version>6.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  AkkaVersion: "2.8.1",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-orientdb_${versions.ScalaBinary}:6.0.1"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.orientechnologies | orientdb-graphdb | 3.1.9 |
com.orientechnologies | orientdb-object | 3.1.9 |
com.typesafe.akka | akka-stream_2.13 | 2.8.1 |
org.scala-lang | scala-library | 2.13.10 |
- Dependency tree
Organization | Artifact | Version | License |
---|---|---|---|
com.orientechnologies | orientdb-graphdb | 3.1.9 | |
com.fasterxml.jackson.core | jackson-databind | 2.12.1 | The Apache Software License, Version 2.0 |
com.fasterxml.jackson.core | jackson-annotations | 2.12.1 | The Apache Software License, Version 2.0 |
com.fasterxml.jackson.core | jackson-core | 2.12.1 | The Apache Software License, Version 2.0 |
com.orientechnologies | orientdb-core | 3.1.9 | |
com.github.jnr | jnr-posix | 3.0.50 | Eclipse Public License - v 2.0 |
com.github.jnr | jnr-constants | 0.9.12 | The Apache Software License, Version 2.0 |
com.github.jnr | jnr-ffi | 2.1.10 | The Apache Software License, Version 2.0 |
com.github.jnr | jffi | 1.2.19 | The Apache Software License, Version 2.0 |
com.github.jnr | jnr-a64asm | 1.0.0 | The Apache Software License, Version 2.0 |
com.github.jnr | jnr-x86asm | 1.0.2 | MIT License |
org.ow2.asm | asm | 7.1 | BSD |
org.ow2.asm | asm-analysis | 7.1 | BSD |
org.ow2.asm | asm-commons | 7.1 | BSD |
org.ow2.asm | asm-tree | 7.1 | BSD |
org.ow2.asm | asm-util | 7.1 | BSD |
com.googlecode.concurrentlinkedhashmap | concurrentlinkedhashmap-lru | 1.4.2 | The Apache Software License, Version 2.0 |
com.ibm.icu | icu4j | 65.1 | Unicode/ICU License |
commons-lang | commons-lang | 2.6 | |
javax.activation | javax.activation-api | 1.2.0 | |
org.lz4 | lz4-java | 1.4.0 | The Apache Software License, Version 2.0 |
com.orientechnologies | orientdb-server | 3.1.9 | |
com.orientechnologies | orientdb-client | 3.1.9 | |
com.orientechnologies | orientdb-tools | 3.1.9 | |
javax.xml.bind | jaxb-api | 2.3.0 | |
com.sun.xml.bind | jaxb-core | 2.3.0.1 | |
com.sun.xml.bind | jaxb-impl | 2.3.0.1 | |
commons-collections | commons-collections | 3.2.2 | |
org.codehaus.groovy | groovy-jsr223 | 2.5.8 | The Apache Software License, Version 2.0 |
org.codehaus.groovy | groovy | 2.5.8 | The Apache Software License, Version 2.0 |
com.orientechnologies | orientdb-object | 3.1.9 | |
org.hibernate.javax.persistence | hibernate-jpa-2.0-api | 1.0.1.Final | |
org.javassist | javassist | 3.23.1-GA | MPL 1.1 |
com.typesafe.akka | akka-stream_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe.akka | akka-actor_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe.akka | akka-protobuf-v3_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe | config | 1.4.2 | Apache-2.0 |
com.typesafe | ssl-config-core_2.13 | 0.6.1 | Apache-2.0 |
org.scala-lang.modules | scala-java8-compat_2.13 | 1.0.0 | Apache-2.0 |
org.scala-lang | scala-library | 2.13.10 | Apache-2.0 |
org.reactivestreams | reactive-streams | 1.0.4 | MIT-0 |
Database connection
Sources, Flows and Sinks provided by this connector need an OPartitionedDatabasePool to access OrientDB. It is your responsibility to close the database connection, e.g. at actor system termination. This API has been deprecated in OrientDB; pull requests that move the connector to the latest APIs are welcome.
- Scala
import com.orientechnologies.orient.core.db.OPartitionedDatabasePool

val url = "remote:127.0.0.1:2424/"
val dbName = "GratefulDeadConcertsScala"
val dbUrl = s"$url$dbName"
val username = "root"
val password = "root"

val oDatabase: OPartitionedDatabasePool =
  new OPartitionedDatabasePool(dbUrl, username, password, Runtime.getRuntime.availableProcessors(), 10)

// Close the pool when the actor system terminates
system.registerOnTermination(oDatabase.close())
- Java
import com.orientechnologies.orient.core.db.OPartitionedDatabasePool;

private static String url = "remote:127.0.0.1:2424/";
private static String dbName = "GratefulDeadConcertsJava";
private static String dbUrl = url + dbName;
private static String username = "root";
private static String password = "root";

oDatabase =
    new OPartitionedDatabasePool(
        dbUrl, username, password, Runtime.getRuntime().availableProcessors(), 10);

// Close the pool when the actor system terminates
system.registerOnTermination(() -> oDatabase.close());
Reading ODocument from OrientDB
Now we can stream messages which contain OrientDB’s ODocument (in Scala or Java) from or to OrientDB by providing the ODatabaseDocumentTx to the OrientDbSource.
- Scala
val result: Future[immutable.Seq[String]] = OrientDbSource(
  sink4,
  OrientDbSourceSettings(oDatabase)
).map { message: OrientDbReadResult[ODocument] =>
    message.oDocument.field[String]("book_title")
  }
  .runWith(Sink.seq)
- Java
CompletionStage<List<String>> result =
    OrientDbSource.create(sinkClass1, OrientDbSourceSettings.create(oDatabase))
        .map(m -> m.oDocument().<String>field("book_title"))
        .runWith(Sink.seq(), system);
Typed messages
It is also possible to stream messages that contain instances of arbitrary classes.
- Java
public static class source1 {

  private String book_title;

  public void setBook_title(String book_title) {
    this.book_title = book_title;
  }

  public String getBook_title() {
    return book_title;
  }
}

public static class sink2 {

  private String book_title;

  public void setBook_title(String book_title) {
    this.book_title = book_title;
  }

  public String getBook_title() {
    return book_title;
  }
}
Use OrientDbSource.typed and OrientDbSink.typed to create source and sink instead.
- Scala
val streamCompletion: Future[Done] = OrientDbSource
  .typed(sourceClass, OrientDbSourceSettings(oDatabase), classOf[OrientDbTest.source1])
  .map { m: OrientDbReadResult[OrientDbTest.source1] =>
    // Bind an object database to the current thread before creating the typed message
    val db: ODatabaseDocumentTx = oDatabase.acquire
    db.setDatabaseOwner(new OObjectDatabaseTx(db))
    ODatabaseRecordThreadLocal.instance.set(db)
    val sink: OrientDbTest.sink2 = new OrientDbTest.sink2
    sink.setBook_title(m.oDocument.getBook_title)
    OrientDbWriteMessage(sink)
  }
  .groupedWithin(10, 10.millis)
  .runWith(
    OrientDbSink.typed(sinkClass2, OrientDbWriteSettings.create(oDatabase), classOf[OrientDbTest.sink2])
  )
- Java
CompletionStage<Done> f1 =
    OrientDbSource.typed(
            sourceClass, OrientDbSourceSettings.create(oDatabase), source1.class, null)
        .map(
            readResult -> {
              // Bind an object database to the current thread before creating the typed message
              ODatabaseDocumentTx db = oDatabase.acquire();
              db.setDatabaseOwner(new OObjectDatabaseTx(db));
              ODatabaseRecordThreadLocal.instance().set(db);
              sink2 sink = new sink2();
              sink.setBook_title(readResult.oDocument().getBook_title());
              return OrientDbWriteMessage.create(sink);
            })
        .groupedWithin(10, Duration.ofMillis(10))
        .runWith(
            OrientDbSink.typed(
                sinkClass2, OrientDbWriteSettings.create(oDatabase), sink2.class),
            system);
Source configuration
We can configure the source with OrientDbSourceSettings.
- Scala
// re-iterating default values
val sourceSettings = OrientDbSourceSettings(oDatabase)
  .withSkip(0)
  .withLimit(10)
- Java
// re-iterating default values
OrientDbSourceSettings sourceSettings =
    OrientDbSourceSettings.create(oDatabase).withSkip(0).withLimit(10);
Parameter | Default | Description |
---|---|---|
skip | 0 | Number of rows skipped at the beginning of the result. |
limit | 10 | Result items fetched per query. |
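To make the two parameters concrete, the following dependency-free Java sketch mimics offset-based paging over an in-memory list: each "query" skips `skip` rows and returns at most `limit` rows, and the caller advances `skip` by `limit` until a page comes back empty. The class `SkipLimitPaging` and its methods are hypothetical names used only for illustration. This is an assumption about how such settings typically interact, not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of skip/limit paging; not part of Alpakka or OrientDB.
final class SkipLimitPaging {

  // Emulates one query: skip the first `skip` rows, return at most `limit` rows.
  static <T> List<T> queryPage(List<T> rows, int skip, int limit) {
    int from = Math.min(Math.max(skip, 0), rows.size());
    int to = Math.min(from + Math.max(limit, 0), rows.size());
    return new ArrayList<>(rows.subList(from, to));
  }

  // Reads all rows by issuing successive queries, advancing skip by limit each time.
  static <T> List<List<T>> readAllPages(List<T> rows, int initialSkip, int limit) {
    if (limit <= 0) {
      throw new IllegalArgumentException("limit must be positive");
    }
    List<List<T>> pages = new ArrayList<>();
    int skip = initialSkip;
    while (true) {
      List<T> page = queryPage(rows, skip, limit);
      if (page.isEmpty()) {
        break; // no more rows: stop querying
      }
      pages.add(page);
      skip += limit;
    }
    return pages;
  }

  public static void main(String[] args) {
    List<String> titles = new ArrayList<>();
    for (int i = 1; i <= 25; i++) {
      titles.add("Book " + i);
    }
    // skip = 0 and limit = 10 mirror the defaults listed in the table
    List<List<String>> pages = readAllPages(titles, 0, 10);
    System.out.println(
        pages.size() + " pages, last page has " + pages.get(pages.size() - 1).size() + " rows");
  }
}
```

With 25 rows and the default values, the sketch issues queries at skip 0, 10, 20 and 30; the last one returns nothing and ends the loop.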
Writing to OrientDB
You can also build flow stages. The API is similar to creating Sinks.
- Scala
val f1 = OrientDbSource(
  sourceClass,
  OrientDbSourceSettings(oDatabase)
).map { message: OrientDbReadResult[ODocument] =>
    OrientDbWriteMessage(message.oDocument)
  }
  .groupedWithin(10, 50.millis)
  .via(
    OrientDbFlow.create(
      sink5,
      OrientDbWriteSettings(oDatabase)
    )
  )
  .runWith(Sink.seq)
- Java
CompletionStage<List<List<OrientDbWriteMessage<ODocument, NotUsed>>>> f1 =
    OrientDbSource.create(sourceClass, OrientDbSourceSettings.create(oDatabase), null)
        .map(m -> OrientDbWriteMessage.create(m.oDocument()))
        .groupedWithin(10, Duration.ofMillis(10))
        .via(OrientDbFlow.create(sink3, OrientDbWriteSettings.create(oDatabase)))
        .runWith(Sink.seq(), system);
Passing data through OrientDbFlow
When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to OrientDB.
- Scala
// We're going to pretend we got messages from Kafka.
// After we've written them to OrientDB, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val f1 = Source(messagesFromKafka)
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title
    println("title: " + book.title)

    // The Kafka offset travels with the document as the pass-through value
    OrientDbWriteMessage(new ODocument().field("book_title", id), kafkaMessage.offset)
  }
  .groupedWithin(10, 50.millis)
  .via(
    OrientDbFlow.createWithPassThrough(
      sink7,
      OrientDbWriteSettings(oDatabase)
    )
  )
  .map { messages: Seq[OrientDbWriteMessage[ODocument, KafkaOffset]] =>
    messages.foreach { message =>
      commitToKafka(message.passThrough)
    }
  }
  .runWith(Sink.ignore)
- Java
// We're going to pretend we got messages from Kafka.
// After we've written them to OrientDB, we want
// to commit the offset to Kafka

List<Integer> committedOffsets = new ArrayList<>();
List<messagesFromKafka> messagesFromKafkas =
    Arrays.asList(
        new messagesFromKafka("Akka Concurrency", new KafkaOffset(0)),
        new messagesFromKafka("Akka in Action", new KafkaOffset(1)),
        new messagesFromKafka("Effective Akka", new KafkaOffset(2)));

Consumer<KafkaOffset> commitToKafka =
    new Consumer<KafkaOffset>() {
      @Override
      public void accept(KafkaOffset kafkaOffset) {
        committedOffsets.add(kafkaOffset.getOffset());
      }
    };

Source.from(messagesFromKafkas)
    .map(
        kafkaMessage -> {
          String book_title = kafkaMessage.getBook_title();
          // The Kafka offset travels with the document as the pass-through value
          return OrientDbWriteMessage.create(
              new ODocument().field("book_title", book_title), kafkaMessage.kafkaOffset);
        })
    .groupedWithin(10, Duration.ofMillis(10))
    .via(OrientDbFlow.createWithPassThrough(sink6, OrientDbWriteSettings.create(oDatabase)))
    .map(
        messages -> {
          ODatabaseDocumentTx db = oDatabase.acquire();
          db.setDatabaseOwner(new OObjectDatabaseTx(db));
          ODatabaseRecordThreadLocal.instance().set(db);
          messages.stream().forEach(message -> commitToKafka.accept(message.passThrough()));
          return NotUsed.getInstance();
        })
    .runWith(Sink.seq(), system)
    .toCompletableFuture()
    .get(10, TimeUnit.SECONDS);
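The idea behind the pass-through can be shown without Akka or OrientDB: each element pairs the payload to write with a value that is carried along untouched, and the commit step only sees elements that have come out of the write step. The sketch below is a minimal plain-Java illustration under that assumption. `PassThroughSketch`, `WriteMessage`, `writeBatch` and `run` are hypothetical stand-ins, not the connector's types, and the "store" is just a `Consumer`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; not the Alpakka OrientDB API.
final class PassThroughSketch {

  // Pairs a payload to be written with a value that is carried along untouched.
  static final class WriteMessage<T, P> {
    final T payload;
    final P passThrough;

    WriteMessage(T payload, P passThrough) {
      this.payload = payload;
      this.passThrough = passThrough;
    }
  }

  // Writes every payload of the batch to the store, then hands the batch on.
  // If the store throws, the exception propagates and nothing is handed on.
  static <T, P> List<WriteMessage<T, P>> writeBatch(
      List<WriteMessage<T, P>> batch, Consumer<T> store) {
    for (WriteMessage<T, P> message : batch) {
      store.accept(message.payload);
    }
    return batch;
  }

  // Write first, commit afterwards: returns the offsets committed, in order.
  static List<Integer> run(List<WriteMessage<String, Integer>> batch, Consumer<String> store) {
    List<Integer> committed = new ArrayList<>();
    for (WriteMessage<String, Integer> message : writeBatch(batch, store)) {
      committed.add(message.passThrough);
    }
    return committed;
  }

  public static void main(String[] args) {
    List<String> written = new ArrayList<>();
    List<WriteMessage<String, Integer>> batch =
        Arrays.asList(
            new WriteMessage<String, Integer>("Akka Concurrency", 0),
            new WriteMessage<String, Integer>("Akka in Action", 1),
            new WriteMessage<String, Integer>("Effective Akka", 2));
    List<Integer> committed = run(batch, written::add);
    System.out.println("written=" + written + " committed=" + committed);
  }
}
```

Because the offsets are read only from the batch returned by `writeBatch`, a failed write leaves them uncommitted, which is the ordering the section describes.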