Elasticsearch Connector

The Elasticsearch connector provides Akka Stream sources and sinks for Elasticsearch.

For more information about Elasticsearch please visit the Elasticsearch documentation.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-elasticsearch_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-elasticsearch_2.12', version: '0.19'
}

Set up REST client

Sources, Flows and Sinks provided by this connector need a prepared org.elasticsearch.client.RestClient to access Elasticsearch.

Scala
import org.elasticsearch.client.RestClient
import org.apache.http.HttpHost

implicit val client: RestClient = RestClient.builder(new HttpHost("localhost", 9201)).build()
Java
import org.elasticsearch.client.RestClient;
import org.apache.http.HttpHost;

RestClient client = RestClient.builder(new HttpHost("localhost", 9201)).build();
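
The connector does not manage the RestClient's lifecycle, so close it yourself once all streams using it have completed. A minimal sketch, where done is a placeholder for the Future[Done] materialized by your last stream:

Scala
import akka.Done
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// `done` stands for the Future[Done] materialized by the last running stream (placeholder)
def shutdown(done: Future[Done]): Unit =
  done.onComplete(_ => client.close()) // release the client's HTTP connections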

Elasticsearch as Source and Sink

Now we can stream messages from or to Elasticsearch by providing the RestClient to the ElasticsearchSource or the ElasticsearchSink.

Scala
import spray.json._
import DefaultJsonProtocol._

case class Book(title: String)

implicit val format: JsonFormat[Book] = jsonFormat1(Book)
Java
public static class Book {
  public String title;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }
}
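
Besides the RestClient, the snippets below assume that an actor system and a materializer are in scope: as implicits in the Scala examples, and as the materializer passed to runWith in the Java examples. A minimal sketch of that setup in Scala:

Scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

// Required to run the streams in the examples below
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()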

With typed source

Use ElasticsearchSource.typed and ElasticsearchSink.create to create source and sink. In Scala the data is converted to and from JSON by Spray JSON; in Java it is converted by Jackson’s ObjectMapper.

Scala
val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "book",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[Book] =>
    IncomingMessage(Some(message.id), message.source)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "sink2",
      typeName = "book"
    )
  )
Java
ElasticsearchSourceSettings sourceSettings = new ElasticsearchSourceSettings();
ElasticsearchSinkSettings sinkSettings = new ElasticsearchSinkSettings();

Source<OutgoingMessage<Book>, NotUsed> source = ElasticsearchSource.typed(
    "source",
    "book",
    "{\"match_all\": {}}",
    sourceSettings,
    client,
    Book.class
);
CompletionStage<Done> f1 = source
    .map(m -> IncomingMessage.create(m.id(), m.source()))
    .runWith(
        ElasticsearchSink.create(
            "sink2",
            "book",
            sinkSettings,
            client,
            new ObjectMapper()
        ),
        materializer);

With JSON source

Use ElasticsearchSource.create and ElasticsearchSink.create to create source and sink.

Scala
val f1 = ElasticsearchSource
  .create(
    indexName = "source",
    typeName = "book",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[spray.json.JsObject] =>
    val book: Book = jsonReader[Book].read(message.source)
    IncomingMessage(Some(message.id), book)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "sink2",
      typeName = "book"
    )
  )
Java
ElasticsearchSourceSettings sourceSettings = new ElasticsearchSourceSettings();
ElasticsearchSinkSettings sinkSettings = new ElasticsearchSinkSettings();

Source<OutgoingMessage<Map<String, Object>>, NotUsed> source = ElasticsearchSource.create(
    "source",
    "book",
    "{\"match_all\": {}}",
    sourceSettings,
    client
);
CompletionStage<Done> f1 = source
    .map(m -> IncomingMessage.create(m.id(), m.source()))
    .runWith(
        ElasticsearchSink.create(
            "sink1",
            "book",
            sinkSettings,
            client,
            new ObjectMapper()
        ),
        materializer);

Configuration

We can configure the source with ElasticsearchSourceSettings.

Scala
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSourceSettings

val sourceSettings = ElasticsearchSourceSettings(bufferSize = 10)
Java
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSourceSettings;

    ElasticsearchSourceSettings sourceSettings = new ElasticsearchSourceSettings()
        .withBufferSize(10);
Parameter | Default | Description
bufferSize | 10 | ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size.
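
For example, to read with a larger scroll size, pass the settings when creating the source. A sketch, following the same parameter order as the Java example above (index name, type name, query, settings):

Scala
// Read with a scroll size of 100 instead of the default 10
val largeScrollSource = ElasticsearchSource.typed[Book](
  "source",
  "book",
  """{"match_all": {}}""",
  ElasticsearchSourceSettings(bufferSize = 100)
)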

Also, we can configure the sink with ElasticsearchSinkSettings.

Scala
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSinkSettings

val sinkSettings =
  ElasticsearchSinkSettings(bufferSize = 10, retryInterval = 5000, maxRetry = 100, retryPartialFailure = true)
Java
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSinkSettings;

    ElasticsearchSinkSettings sinkSettings =
        new ElasticsearchSinkSettings()
            .withBufferSize(10)
            .withRetryInterval(5000)
            .withMaxRetry(100)
            .withRetryPartialFailure(true);
Parameter | Default | Description
bufferSize | 10 | ElasticsearchSink sends messages to Elasticsearch in bulk requests of this size.
retryInterval | 5000 | When a request fails, ElasticsearchSink retries it after this interval (in milliseconds).
maxRetry | 100 | ElasticsearchSink gives up and fails the stage after this number of consecutive failures.
retryPartialFailure | true | A bulk request might fail only partially. If this parameter is true, ElasticsearchSink retries the failed messages; otherwise failed messages are discarded (or pushed downstream if you use ElasticsearchFlow instead of the sink).
docAsUpsert | false | If this parameter is true, ElasticsearchSink uses the upsert method to index documents. By default, documents are added with the standard index method (which creates or replaces them).
versionType | None | If set, ElasticsearchSink uses the chosen versionType to index documents. See Version types for accepted settings.
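
As a sketch of the last two parameters, assuming versionType is exposed as an optional String setting:

Scala
val upsertSettings = ElasticsearchSinkSettings(
  bufferSize = 10,
  docAsUpsert = true, // index documents with doc-as-upsert
  versionType = Some("internal") // assumed Option[String]; see Version types for accepted values
)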

Elasticsearch as Flow

You can also build flow stages with ElasticsearchFlow. The API is similar to creating Sinks.

Scala
val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "book",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[Book] =>
    IncomingMessage(Some(message.id), message.source)
  }
  .via(
    ElasticsearchFlow.create[Book](
      indexName = "sink3",
      typeName = "book"
    )
  )
  .runWith(Sink.seq)

// A second example: upsert (id, Book) tuples with docAsUpsert enabled
val f2 = Source(books)
  .map { book: (String, Book) =>
    IncomingMessage(Some(book._1), book._2)
  }
  .via(
    ElasticsearchFlow.create[Book](
      "sink7",
      "book",
      ElasticsearchSinkSettings(bufferSize = 5, docAsUpsert = true)
    )
  )
  .runWith(Sink.seq)

// Upsert documents given as (id, JsObject) tuples, e.g. for partial updates
val f3 = Source(updatedBooks)
  .map { book: (String, JsObject) =>
    IncomingMessage(Some(book._1), book._2)
  }
  .via(
    ElasticsearchFlow.create[JsObject](
      "sink7",
      "book",
      ElasticsearchSinkSettings(bufferSize = 5, docAsUpsert = true)
    )
  )
  .runWith(Sink.seq)
Java
CompletionStage<List<List<IncomingMessageResult<Book, NotUsed>>>> f1 = ElasticsearchSource.typed(
    "source",
    "book",
    "{\"match_all\": {}}",
    new ElasticsearchSourceSettings().withBufferSize(5),
    client,
    Book.class)
    .map(m -> IncomingMessage.create(m.id(), m.source()))
    .via(ElasticsearchFlow.create(
            "sink3",
            "book",
            new ElasticsearchSinkSettings().withBufferSize(5),
            client,
            new ObjectMapper()))
    .runWith(Sink.seq(), materializer);

Passing data through ElasticsearchFlow

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Elastic.

Scala
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val f1 = Source(messagesFromKafka) // Assume we get this from Kafka
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title
    println("title: " + book.title)

    // Transform message so that we can write to elastic
    IncomingMessage(Some(id), book, kafkaMessage.offset)
  }
  .via( // write to elastic
    ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
      indexName = "sink6",
      typeName = "book"
    )
  )
  .map { messageResults =>
    messageResults.foreach { result =>
      if (!result.success) throw new Exception("Failed to write message to elastic")
      // Commit to kafka
      commitToKafka(result.passThrough)
    }
  }
  .runWith(Sink.seq)

Await.ready(f1, Duration.Inf)
Java
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

List<KafkaMessage> messagesFromKafka = Arrays.asList(
        new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Book 3"), new KafkaOffset(2))
);

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

Source.from(messagesFromKafka) // Assume we get this from Kafka
        .map(kafkaMessage -> {
          Book book = kafkaMessage.book;
          String id = book.title;

          // Transform message so that we can write to elastic
          return IncomingMessage.create(id, book, kafkaMessage.offset);
        })
        .via( // write to elastic
                ElasticsearchFlow.createWithPassThrough(
                        "sink6",
                        "book",
                        new ElasticsearchSinkSettings().withBufferSize(5),
                        client,
                        new ObjectMapper())
        ).map(messageResults -> {
          messageResults.stream()
                  .forEach(result -> {
                    if (!result.success()) throw new RuntimeException("Failed to write message to elastic");
                    // Commit to kafka
                    kafkaCommitter.commit(result.passThrough());
                  });
          return NotUsed.getInstance();

        }).runWith(Sink.seq(), materializer) // Run it
        .toCompletableFuture().get(); // Wait for it to complete

Specifying custom index-name for every document

When working with index-patterns using wildcards, you might need to specify a custom index-name for each document:

Scala
val customIndexName = "custom-index"

val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "book",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[Book] =>
    IncomingMessage(Some(message.id), message.source)
      .withIndexName(customIndexName) // Setting the index-name to use for this document
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "this-is-not-the-index-we-are-using",
      typeName = "book"
    )
  )

More custom searching

The easiest way of using the Elasticsearch source is to just specify the query parameter. Sometimes you need more control, for example to specify which fields to return. In such cases you can use searchParams instead:

Scala
case class TestDoc(id: String, a: String, b: Option[String], c: String)
// Search for docs and ask elastic to only return some fields

val f3 = ElasticsearchSource
  .typed[TestDoc](indexName,
                  Some(typeName),
                  searchParams = Map(
                    "query" -> """ {"match_all": {}} """,
                    "_source" -> """ ["id", "a", "c"] """
                  ),
                  ElasticsearchSourceSettings())
  .map { message =>
    message.source
  }
  .runWith(Sink.seq)
Java
public static class TestDoc {
  public String id;
  public String a;
  public String b;
  public String c;
}
  // Search for docs and ask elastic to only return some fields

  Map<String, String> searchParams = new HashMap<>();
  searchParams.put("query", "{\"match_all\": {}}");
  searchParams.put("_source", "[\"id\", \"a\", \"c\"]");


  List<TestDoc> result = ElasticsearchSource.<TestDoc>typed(
          indexName,
          typeName,
          searchParams, // <-- Using searchParams
          new ElasticsearchSourceSettings(),
          client,
          TestDoc.class,
          new ObjectMapper())
          .map(o -> {
            return o.source(); // These documents will only have property id, a and c (not b)
          })
          .runWith(Sink.seq(), materializer)
          .toCompletableFuture().get();

Running the example code

The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> elasticsearch/testOnly *.ElasticsearchSpec
Java
sbt
> elasticsearch/testOnly *.ElasticsearchTest