Elasticsearch

The Elasticsearch connector provides Akka Stream sources and sinks for Elasticsearch.

For more information about Elasticsearch, please visit the Elasticsearch documentation.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "0.20"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-elasticsearch_2.12</artifactId>
  <version>0.20</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-elasticsearch_2.12', version: '0.20'
}

Set up REST client

Sources, Flows and Sinks provided by this connector need a prepared org.elasticsearch.client.RestClient to access Elasticsearch.

Scala
import org.elasticsearch.client.RestClient
import org.apache.http.HttpHost

implicit val client: RestClient = RestClient.builder(new HttpHost("localhost", 9201)).build()
Full source at GitHub
Java
import org.elasticsearch.client.RestClient;
import org.apache.http.HttpHost;

RestClient client = RestClient.builder(new HttpHost("localhost", 9201)).build();
Full source at GitHub
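
Note that the connector does not manage the client's lifecycle, so close it once your streams are done. A minimal sketch in Scala, assuming done is the materialized Future[Done] of your stream:

import scala.concurrent.ExecutionContext.Implicits.global

// Close the RestClient once the stream has completed
done.onComplete(_ => client.close())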

Elasticsearch as Source and Sink

Now we can stream messages from or to Elasticsearch by providing the RestClient to the ElasticsearchSource or the ElasticsearchSink.

Scala
import spray.json._
import DefaultJsonProtocol._

case class Book(title: String)

implicit val format: JsonFormat[Book] = jsonFormat1(Book)
Full source at GitHub
Java
public static class Book {
  public String title;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }
}
Full source at GitHub

With typed source

Use ElasticsearchSource.typed and ElasticsearchSink.create to create a typed source and sink. In Scala, the data is converted to and from JSON by Spray JSON; in Java, by Jackson’s ObjectMapper.

Scala
val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "book",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[Book] =>
    IncomingMessage(Some(message.id), message.source)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "sink2",
      typeName = "book"
    )
  )
Full source at GitHub
Java
ElasticsearchSourceSettings sourceSettings = ElasticsearchSourceSettings.Default();
ElasticsearchSinkSettings sinkSettings = ElasticsearchSinkSettings.Default();

Source<OutgoingMessage<Book>, NotUsed> source =
    ElasticsearchSource.typed(
        "source", "book", "{\"match_all\": {}}", sourceSettings, client, Book.class);
CompletionStage<Done> f1 =
    source
        .map(m -> IncomingMessage.create(m.id(), m.source()))
        .runWith(
            ElasticsearchSink.create("sink2", "book", sinkSettings, client, new ObjectMapper()),
            materializer);
Full source at GitHub

With JSON source

Use ElasticsearchSource.create and ElasticsearchSink.create to create the source and sink.

Scala
val f1 = ElasticsearchSource
  .create(
    indexName = "source",
    typeName = "book",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[spray.json.JsObject] =>
    val book: Book = jsonReader[Book].read(message.source)
    IncomingMessage(Some(message.id), book)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "sink2",
      typeName = "book"
    )
  )
Full source at GitHub
Java
ElasticsearchSourceSettings sourceSettings = ElasticsearchSourceSettings.Default();
ElasticsearchSinkSettings sinkSettings = ElasticsearchSinkSettings.Default();

Source<OutgoingMessage<Map<String, Object>>, NotUsed> source =
    ElasticsearchSource.create("source", "book", "{\"match_all\": {}}", sourceSettings, client);
CompletionStage<Done> f1 =
    source
        .map(m -> IncomingMessage.create(m.id(), m.source()))
        .runWith(
            ElasticsearchSink.create("sink1", "book", sinkSettings, client, new ObjectMapper()),
            materializer);
Full source at GitHub

Configuration

We can configure the source with ElasticsearchSourceSettings.

Scala
val sourceSettings = ElasticsearchSourceSettings(bufferSize = 10)
Full source at GitHub
Java
ElasticsearchSourceSettings sourceSettings =
    ElasticsearchSourceSettings.Default().withBufferSize(10);
Full source at GitHub
Parameter Default Description
bufferSize 10 ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size.
includeDocumentVersion false Tell Elasticsearch to return the document's _version property with the search results. See Version and Optimistic Concurrency Control to learn about this property.
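
As a minimal sketch, assuming includeDocumentVersion is a parameter of ElasticsearchSourceSettings as listed above (Scala):

// Ask Elasticsearch to return each document's _version with the search results
val versionedSourceSettings =
  ElasticsearchSourceSettings(bufferSize = 10, includeDocumentVersion = true)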

Similarly, we can configure the sink with ElasticsearchSinkSettings.

Scala
val sinkSettings =
  ElasticsearchSinkSettings(bufferSize = 10, retryInterval = 5000, maxRetry = 100, retryPartialFailure = true)
Full source at GitHub
Java
ElasticsearchSinkSettings sinkSettings =
    ElasticsearchSinkSettings.Default()
        .withBufferSize(10)
        .withRetryInterval(5000)
        .withMaxRetry(100)
        .withRetryPartialFailure(true);
Full source at GitHub
Parameter Default Description
bufferSize 10 ElasticsearchSink sends messages to Elasticsearch in bulk requests of this size.
retryInterval 5000 When a request fails, ElasticsearchSink retries it after this interval (in milliseconds).
maxRetry 100 ElasticsearchSink gives up and fails the stage after this number of consecutive failures.
retryPartialFailure true A bulk request might fail only partially. If this parameter is true, ElasticsearchSink retries the failed messages. Otherwise, failed messages are discarded (or pushed downstream if you use ElasticsearchFlow instead of the sink).
docAsUpsert false If this parameter is true, ElasticsearchSink uses the upsert method to index documents. By default, documents are added with the standard index method (which creates or replaces them).
versionType None If set, ElasticsearchSink uses the chosen versionType to index documents. See Version types for accepted settings.
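
As a sketch, the docAsUpsert and versionType parameters can be combined with the others; this assumes versionType is an Option[String], as its None default in the table suggests (Scala):

// Upsert documents and index them with Elasticsearch's "external" version type
val upsertSinkSettings = ElasticsearchSinkSettings(
  bufferSize = 10,
  docAsUpsert = true,
  versionType = Some("external")
)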

Elasticsearch as Flow

You can also build flow stages with ElasticsearchFlow. The API is similar to creating Sinks.

Scala
val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "book",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[Book] =>
    IncomingMessage(Some(message.id), message.source)
  }
  .via(
    ElasticsearchFlow.create[Book](
      indexName = "sink3",
      typeName = "book"
    )
  )
  .runWith(Sink.seq)

// Insert or update documents by id using docAsUpsert
// (books: a collection of (id, Book) tuples, defined in the full source)
val f2 = Source(books)
  .map { book: (String, Book) =>
    IncomingMessage(Some(book._1), book._2)
  }
  .via(
    ElasticsearchFlow.create[Book](
      "sink7",
      "book",
      ElasticsearchSinkSettings(bufferSize = 5, docAsUpsert = true)
    )
  )
  .runWith(Sink.seq)

// Update the same documents with new JSON content
// (updatedBooks: a collection of (id, JsObject) tuples, defined in the full source)
val f3 = Source(updatedBooks)
  .map { book: (String, JsObject) =>
    IncomingMessage(Some(book._1), book._2)
  }
  .via(
    ElasticsearchFlow.create[JsObject](
      "sink7",
      "book",
      ElasticsearchSinkSettings(bufferSize = 5, docAsUpsert = true)
    )
  )
  .runWith(Sink.seq)
Full source at GitHub
Java
CompletionStage<List<List<IncomingMessageResult<Book, NotUsed>>>> f1 =
    ElasticsearchSource.typed(
            "source",
            "book",
            "{\"match_all\": {}}",
            ElasticsearchSourceSettings.Default().withBufferSize(5),
            client,
            Book.class)
        .map(m -> IncomingMessage.create(m.id(), m.source()))
        .via(
            ElasticsearchFlow.create(
                "sink3",
                "book",
                ElasticsearchSinkSettings.Default().withBufferSize(5),
                client,
                new ObjectMapper()))
        .runWith(Sink.seq(), materializer);
Full source at GitHub

Passing data through ElasticsearchFlow

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Elastic.

Scala
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val f1 = Source(messagesFromKafka) // Assume we get this from Kafka
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title
    println("title: " + book.title)

    // Transform message so that we can write to elastic
    IncomingMessage(Some(id), book, kafkaMessage.offset)
  }
  .via( // write to elastic
    ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
      indexName = "sink6",
      typeName = "book"
    )
  )
  .map { messageResults =>
    messageResults.foreach { result =>
      if (!result.success) throw new Exception("Failed to write message to elastic")
      // Commit to kafka
      commitToKafka(result.passThrough)
    }
  }
  .runWith(Sink.seq)

Await.ready(f1, Duration.Inf)
Full source at GitHub
Java
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Book 3"), new KafkaOffset(2)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

Source.from(messagesFromKafka) // Assume we get this from Kafka
    .map(
        kafkaMessage -> {
          Book book = kafkaMessage.book;
          String id = book.title;

          // Transform message so that we can write to elastic
          return IncomingMessage.create(id, book, kafkaMessage.offset);
        })
    .via( // write to elastic
        ElasticsearchFlow.createWithPassThrough(
            "sink6",
            "book",
            ElasticsearchSinkSettings.Default().withBufferSize(5),
            client,
            new ObjectMapper()))
    .map(
        messageResults -> {
          messageResults
              .stream()
              .forEach(
                  result -> {
                    if (!result.success())
                      throw new RuntimeException("Failed to write message to elastic");
                    // Commit to kafka
                    kafkaCommitter.commit(result.passThrough());
                  });
          return NotUsed.getInstance();
        })
    .runWith(Sink.seq(), materializer) // Run it
    .toCompletableFuture()
    .get(); // Wait for it to complete
Full source at GitHub

Specifying custom index-name for every document

When working with index-patterns using wildcards, you might need to specify a custom index-name for each document:

Scala
val customIndexName = "custom-index"

val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "book",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[Book] =>
    IncomingMessage(Some(message.id), message.source)
      .withIndexName(customIndexName) // Setting the index-name to use for this document
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "this-is-not-the-index-we-are-using",
      typeName = "book"
    )
  )
Full source at GitHub

More custom searching

The easiest way to use the Elasticsearch source is to specify only the query parameter. Sometimes you need more control, for example to specify which fields to return. In such cases you can use searchParams instead:

Scala
case class TestDoc(id: String, a: String, b: Option[String], c: String)
// Search for docs and ask elastic to only return some fields

val f3 = ElasticsearchSource
  .typed[TestDoc](indexName,
                  Some(typeName),
                  searchParams = Map(
                    "query" -> """ {"match_all": {}} """,
                    "_source" -> """ ["id", "a", "c"] """
                  ),
                  ElasticsearchSourceSettings.Default)
  .map { message =>
    message.source
  }
  .runWith(Sink.seq)
Full source at GitHub
Java
public static class TestDoc {
  public String id;
  public String a;
  public String b;
  public String c;
}

// Search for docs and ask elastic to only return some fields

Map<String, String> searchParams = new HashMap<>();
searchParams.put("query", "{\"match_all\": {}}");
searchParams.put("_source", "[\"id\", \"a\", \"c\"]");

List<TestDoc> result =
    ElasticsearchSource.<TestDoc>typed(
            indexName,
            typeName,
            searchParams, // <-- Using searchParams
            ElasticsearchSourceSettings.Default(),
            client,
            TestDoc.class,
            new ObjectMapper())
        .map(
            o -> {
              return o.source(); // These documents will only have property id, a and c (not b)
            })
        .runWith(Sink.seq(), materializer)
        .toCompletableFuture()
        .get();
Full source at GitHub

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> elasticsearch/testOnly *.ElasticsearchSpec
Java
sbt
> elasticsearch/testOnly *.ElasticsearchTest
The source code for this page can be found here.