Elasticsearch

Example: Index all data from an RDBMS table into Elasticsearch

  • Instantiate a Slick database session using the config parameters defined under the key slick-h2-mem, and make sure it is closed when the Actor System shuts down (1)
  • Scala only: Slick definition of the MOVIE table (2)
  • Class that holds the Movie data (3)
  • Instantiate Elastic REST client (4)
  • Scala: Instantiate the Spray json format that converts the Movie case class to json (5)
  • Java: Instantiate the Jackson Object mapper that converts the Movie class to json (5)
  • Construct the Slick Source for the H2 table and query all data in the table (6)
  • Scala only: Map each tuple into a Movie case class instance (7)
  • The first argument of the IncomingMessage is the id of the document. Replace it with None if you would like Elasticsearch to generate one (8)
  • Prepare the Elastic Sink that the data needs to be drained to (9)
  • Close the Elastic client upon completion of indexing the data (10)
Scala
  import akka.Done

  import akka.stream.alpakka.elasticsearch.IncomingMessage
  import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
  import org.apache.http.HttpHost
  import org.elasticsearch.client.RestClient

  import akka.stream.alpakka.slick.javadsl.SlickSession
  import akka.stream.alpakka.slick.scaladsl.Slick

  import spray.json.DefaultJsonProtocol._
  import spray.json.JsonFormat

  import scala.concurrent.Future
  import scala.concurrent.duration._

// Slick session built from the `slick-h2-mem` config key; it is closed when the actor system terminates.
// The explicit type keeps the implicit's type stable and visible to readers.
implicit val session: SlickSession = SlickSession.forConfig("slick-h2-mem")           // (1)
actorSystem.registerOnTermination(session.close())

import session.profile.api._
class Movies(tag: Tag) extends Table[(Int, String, String, Double)](tag, "MOVIE") {   // (2)
  // Default projection: its element order must line up with the tuple type above,
  // i.e. (ID, TITLE, GENRE, GROSS).
  override def * = (id, title, genre, gross)

  def id    = column[Int]("ID")
  def title = column[String]("TITLE")
  def genre = column[String]("GENRE")
  def gross = column[Double]("GROSS")
}

final case class Movie(id: Int, title: String, genre: String, gross: Double)          // (3)

implicit val elasticSearchClient: RestClient =
  RestClient.builder(new HttpHost("localhost", 9201)).build()                         // (4)
implicit val format: JsonFormat[Movie] = jsonFormat4(Movie)                           // (5)

val done: Future[Done] =
  Slick
    .source(TableQuery[Movies].result)                                                // (6)
    .map {                                                                            // (7)
      // Rows arrive in the table's projection order: (ID, TITLE, GENRE, GROSS).
      case (id, title, genre, gross) => Movie(id, title, genre, gross)
    }
    // Use the movie id as the document id; pass None instead to let Elasticsearch assign one.
    .map(movie => IncomingMessage(Some(movie.id.toString), movie))                    // (8)
    .runWith(ElasticsearchSink.create[Movie]("movie", "boxoffice"))                   // (9)

// Release the client whether indexing succeeded or failed.
done.onComplete { _ =>
  elasticSearchClient.close()                                                         // (10)
}
Full source at GitHub
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

import akka.stream.alpakka.elasticsearch.ElasticsearchSinkSettings;
import akka.stream.alpakka.elasticsearch.IncomingMessage;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import akka.stream.alpakka.slick.javadsl.Slick;
import akka.stream.alpakka.slick.javadsl.SlickRow;
import akka.stream.alpakka.slick.javadsl.SlickSession;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.concurrent.CompletionStage;


/**
 * Immutable movie record used as the Elasticsearch document payload.
 *
 * <p>The {@code @JsonProperty} names on the creator fix the JSON keys, so the
 * constructor's parameter names are free to differ from the field names.
 */
static class Movie { // (3)
  public final int id;
  public final String title;
  public final String genre;
  public final double gross;

  @JsonCreator
  public Movie(
      @JsonProperty("id") int movieId,
      @JsonProperty("title") String movieTitle,
      @JsonProperty("genre") String movieGenre,
      @JsonProperty("gross") double movieGross) {
    id = movieId;
    title = movieTitle;
    genre = movieGenre;
    gross = movieGross;
  }
}

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

SlickSession session = SlickSession.forConfig("slick-h2-mem"); // (1)
system.registerOnTermination(session::close);

RestClient elasticSearchClient =
    RestClient.builder(new HttpHost("localhost", 9201)).build(); // (4)

final ObjectMapper objectToJsonMapper = new ObjectMapper(); // (5)

final CompletionStage<Done> done =
    Slick.source( // (6)
            session,
            // Columns are named explicitly because the row mapper below reads them
            // positionally; SELECT * would silently misassign fields if the column order changed.
            "SELECT ID, TITLE, GENRE, GROSS FROM MOVIE",
            (SlickRow row) ->
                new Movie(row.nextInt(), row.nextString(), row.nextString(), row.nextDouble()))
        .map(movie -> IncomingMessage.create(String.valueOf(movie.id), movie)) // (8)
        .runWith(
            ElasticsearchSink.create( // (9)
                "movie",
                "boxoffice",
                ElasticsearchSinkSettings.Default(),
                elasticSearchClient,
                objectToJsonMapper),
            materializer);

// Close the client on ANY completion. thenRunAsync only fires on success,
// which would leak the client's connections when indexing fails.
done.whenCompleteAsync(
    (result, failure) -> {
      try {
        elasticSearchClient.close(); // (10)
      } catch (IOException e) {
        e.printStackTrace();
      }
    },
    system.dispatcher());
Full source at GitHub

Running the example code

This example is contained in a stand-alone runnable main; it can be run from sbt like this:

Scala
sbt
> doc-examples/run
The source code for this page can be found here.