Elasticsearch examples

Example: Index all data from an RDBMS table into Elasticsearch

  • Instantiate a Slick database session using the config parameters defined in key slick-h2-mem and mount closing it on shutdown of the Actor System (1)
  • Scala only: Slick definition of the MOVIE table (2)
  • Class that holds the Movie data (3)
  • Instantiate Elastic REST client (4)
  • Scala: Instantiate the Spray json format that converts the Movie case class to json (5)
  • Java: Instantiate the Jackson Object mapper that converts the Movie class to json (5)
  • Construct the Slick Source for the H2 table and query all data in the table (6)
  • Scala only: Map each tuple into a Movie case class instance (7)
  • The first argument of the IncomingMessage is the id of the document. Replace with None if you would Elastic to generate one (8)
  • Prepare the Elastic Sink that the data needs to be drained to (9)
  • Close the Elastic client upon completion of indexing the data (10)
Scala
import akka.Done

import akka.stream.alpakka.elasticsearch.IncomingMessage
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

import akka.stream.alpakka.slick.javadsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick

import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val session = SlickSession.forConfig("slick-h2-mem")                         // (1)
actorSystem.registerOnTermination(session.close())

import session.profile.api._
class Movies(tag: Tag) extends Table[(Int, String, String, Double)](tag, "MOVIE") {   // (2)
  def id = column[Int]("ID")
  def title = column[String]("TITLE")
  def genre = column[String]("GENRE")
  def gross = column[Double]("GROSS")

  override def * = (id, title, genre, gross)
}

case class Movie(id: Int, title: String, genre: String, gross: Double)                // (3)

implicit val elasticSearchClient: RestClient =
  RestClient.builder(new HttpHost("localhost", 9201)).build()                         // (4)
implicit val format: JsonFormat[Movie] = jsonFormat4(Movie)                           // (5)

val done: Future[Done] =
  Slick
    .source(TableQuery[Movies].result)                                                // (6)
    .map {                                                                            // (7)
      case (id, genre, title, gross) => Movie(id, genre, title, gross)
    }
    .map(movie => IncomingMessage(Option(movie.id).map(_.toString), movie))           // (8)
    .runWith(ElasticsearchSink.create[Movie]("movie", "boxoffice"))                   // (9)

done.onComplete {
  case _ =>
    elasticSearchClient.close()                                                       // (10)
}
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

import akka.stream.alpakka.elasticsearch.IncomingMessage;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSinkSettings;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import akka.stream.alpakka.slick.javadsl.Slick;
import akka.stream.alpakka.slick.javadsl.SlickRow;
import akka.stream.alpakka.slick.javadsl.SlickSession;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.concurrent.CompletionStage;


public class Movie {                                                             // (3)
    public final int id;
    public final String title;
    public final String genre;
    public final double gross;

    @JsonCreator
    public Movie(@JsonProperty("id") int id,
                 @JsonProperty("title") String title,
                 @JsonProperty("genre") String genre,
                 @JsonProperty("gross") double gross) {
        this.id = id;
        this.title = title;
        this.genre = genre;
        this.gross = gross;
    }
}


ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

SlickSession session = SlickSession.forConfig("slick-h2-mem");                   // (1)
system.registerOnTermination(session::close);

RestClient elasticSearchClient =
    RestClient.builder(new HttpHost("localhost", 9201)).build();                 // (4)

final ObjectMapper objectToJsonMapper = new ObjectMapper();                      // (5)

final CompletionStage<Done> done =
    Slick
        .source(                                                                 // (6)
            session,
            "SELECT * FROM MOVIE",
            (SlickRow row) ->
                new Movie(row.nextInt(), row.nextString(), row.nextString(), row.nextDouble())
        )
        .map(movie -> IncomingMessage.create(String.valueOf(movie.id), movie))   // (8)
        .runWith(ElasticsearchSink
            .create(                                                             // (9)
                "movie",
                "boxoffice",
                new ElasticsearchSinkSettings(),
                elasticSearchClient,
                objectToJsonMapper
            ), materializer);

done.thenRunAsync(() -> {
    try {
        elasticSearchClient.close();                                             // (10)
    } catch (IOException ignored) {
        ignored.printStackTrace();
    }
}, system.dispatcher())

Full source Full source

Running the example code

This example is contained in a stand-alone runnable main, it can be run from sbt like this:

Scala
sbt
> doc-examples/run
The source code for this page can be found here.