Apache Solr

The Solr connector provides Akka Stream sources and sinks for Solr.

For more information about Solr, please visit the Solr documentation.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-solr" % "0.20"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-solr_2.12</artifactId>
  <version>0.20</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-solr_2.12', version: '0.20'
}

Set up client

Sources, Flows and Sinks provided by this connector need a prepared org.apache.solr.client.solrj.SolrClient to access Solr.

Scala
import org.apache.solr.client.solrj.SolrClient
import org.apache.solr.client.solrj.impl.CloudSolrClient

val zkHost = "127.0.0.1:9984/solr"
implicit val client: SolrClient = new CloudSolrClient.Builder().withZkHost(zkHost).build
Full source at GitHub
Java
String zkHost = "127.0.0.1:9984/solr";
SolrClient client = new CloudSolrClient.Builder().withZkHost(zkHost).build();
Full source at GitHub
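
The client holds network resources, so it should be closed when it is no longer needed. A minimal sketch of hooking the cleanup into actor system termination (assuming an ActorSystem named system is in scope; this cleanup is not part of the connector itself):

Scala
// Close the Solr client when the actor system shuts down
// (sketch; assumes an ActorSystem `system` and the `client` from above)
system.registerOnTermination {
  client.close()
}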

Source Usage

Create a tuple stream.

Scala
val factory = new StreamFactory().withCollectionZkHost(collection, zkHost)
val solrClientCache = new SolrClientCache()
val streamContext = new StreamContext()
streamContext.setSolrClientCache(solrClientCache)

val expression = StreamExpressionParser.parse(s"""search($collection, q=*:*, fl="title", sort="title asc")""")
val stream: TupleStream = new CloudSolrStream(expression, factory)
stream.setStreamContext(streamContext)
Full source at GitHub
Java
StreamFactory factory = new StreamFactory().withCollectionZkHost(collection, zkHost);
SolrClientCache solrClientCache = new SolrClientCache();
StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);

String expressionStr =
    String.format("search(%s, q=*:*, fl=\"title\", sort=\"title asc\")", collection);
StreamExpression expression = StreamExpressionParser.parse(expressionStr);
TupleStream stream = new CloudSolrStream(expression, factory);
stream.setStreamContext(streamContext);
Full source at GitHub

Use SolrSource.fromTupleStream to create a SolrSource.

Scala
val source = SolrSource
  .fromTupleStream(ts = stream)
Full source at GitHub
Java
Source<Tuple, NotUsed> source = SolrSource.fromTupleStream(stream);
Full source at GitHub
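
The source emits org.apache.solr.client.solrj.io.Tuple elements, which can be consumed directly. A minimal sketch (assuming the stream set up above, an implicit materializer, and akka.stream.scaladsl.Sink in scope; the value name is illustrative):

Scala
// Collect all titles emitted by the tuple stream into a Future[Seq[String]]
// (sketch; assumes `stream` and an implicit materializer from above)
val titles = SolrSource
  .fromTupleStream(stream)
  .map(_.getString("title"))
  .runWith(Sink.seq)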

Sink Usage

Now we can stream messages to Solr by providing the SolrClient to the SolrSink.

Scala
case class Book(title: String)

val bookToDoc: Book => SolrInputDocument = { b =>
  val doc = new SolrInputDocument
  doc.setField("title", b.title)
  doc
}

val tupleToBook: Tuple => Book = { t =>
  val title = t.getString("title")
  Book(title)
}
Full source at GitHub
Java
public static class Book {
  public String title;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }
}

Function<Book, SolrInputDocument> bookToDoc =
    book -> {
      SolrInputDocument doc = new SolrInputDocument();
      doc.setField("title", book.title);
      return doc;
    };

Function<Tuple, Book> tupleToBook =
    tuple -> {
      String title = tuple.getString("title");
      return new Book(title);
    };
Full source at GitHub

With document sink

Use SolrSink.document to stream SolrInputDocuments to Solr.

Scala
val f1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    val doc: SolrInputDocument = bookToDoc(book)
    IncomingMessage(doc)
  }
  .runWith(
    SolrSink.document(
      collection = "collection2",
      settings = SolrUpdateSettings(commitWithin = 5)
    )
  )
Full source at GitHub
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 =
    SolrSource.fromTupleStream(stream)
        .map(
            tuple -> {
              Book book = tupleToBook.apply(tuple);
              SolrInputDocument doc = bookToDoc.apply(book);
              return IncomingMessage.create(doc);
            })
        .runWith(SolrSink.document("collection2", settings, client), materializer);
Full source at GitHub

With bean sink

First, create a POJO.

Scala
import org.apache.solr.client.solrj.beans.Field
import scala.annotation.meta.field
case class BookBean(@(Field @field) title: String)
Full source at GitHub
Java
class BookBean {
  @Field("title")
  public String title;

  public BookBean(String title) {
    this.title = title;
  }
}
Full source at GitHub

Use SolrSink.bean to stream POJOs to Solr.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val title = tuple.getString("title")
    IncomingMessage(BookBean(title))
  }
  .runWith(
    SolrSink.bean[BookBean](
      collection = "collection3",
      settings = SolrUpdateSettings(commitWithin = 5)
    )
  )
Full source at GitHub
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 =
    SolrSource.fromTupleStream(stream)
        .map(
            tuple -> {
              String title = tuple.getString("title");
              return IncomingMessage.create(new BookBean(title));
            })
        .runWith(SolrSink.bean("collection3", settings, client, BookBean.class), materializer);
Full source at GitHub

With typed sink

Use SolrSink.typed to stream messages with custom binding to Solr.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    IncomingMessage(book)
  }
  .runWith(
    SolrSink
      .typed[Book](
        collection = "collection4",
        settings = SolrUpdateSettings(commitWithin = 5),
        binder = bookToDoc
      )
  )
Full source at GitHub
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 =
    SolrSource.fromTupleStream(stream)
        .map(tuple -> IncomingMessage.create(tupleToBook.apply(tuple)))
        .runWith(
            SolrSink.typed("collection4", settings, bookToDoc, client, Book.class),
            materializer);
Full source at GitHub

Configuration

We can configure the sink by SolrUpdateSettings.

Scala
import akka.stream.alpakka.solr.SolrUpdateSettings
import scala.concurrent.duration._

val settings =
  SolrUpdateSettings(bufferSize = 10, retryInterval = 5000.millis, maxRetry = 100, commitWithin = -1)
Full source at GitHub
Java
SolrUpdateSettings settings =
    SolrUpdateSettings.create()
        .withBufferSize(10)
        .withRetryInterval(FiniteDuration.create(5000, TimeUnit.MILLISECONDS))
        .withMaxRetry(100)
        .withCommitWithin(-1);
Full source at GitHub
Parameter      Default  Description
bufferSize     10       SolrSink sends one bulk request per bufferSize messages.
retryInterval  5000     When a request fails, SolrSink retries it after this interval (in milliseconds).
maxRetry       100      SolrSink gives up and fails the stage after this number of consecutive failures.
commitWithin   -1       Maximum time (in ms) before a commit happens; -1 for manual committing.
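
With commitWithin = -1 nothing is committed automatically, so documents only become visible once you commit through the client yourself. A minimal sketch (assuming the implicit client from the setup above and the Future f1 from the document sink example):

Scala
import scala.concurrent.Await
import scala.concurrent.duration._

// Wait for the stream to finish, then trigger a commit manually
// (sketch; assumes `f1` and the implicit `client` from above)
Await.ready(f1, 10.seconds)
client.commit("collection2")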

Flow Usage

You can also build flow stages with SolrFlow. The API is similar to creating Sinks.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    IncomingMessage(book)
  }
  .via(
    SolrFlow
      .typed[Book](
        collection = "collection5",
        settings = SolrUpdateSettings(commitWithin = 5),
        binder = bookToDoc
      )
  )
  .runWith(Sink.seq)
Full source at GitHub
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 =
    SolrSource.fromTupleStream(stream)
        .map(tuple -> IncomingMessage.create(tupleToBook.apply(tuple)))
        .via(SolrFlow.typed("collection5", settings, bookToDoc, client, Book.class))
        .runWith(Sink.ignore(), materializer);
Full source at GitHub

Passing data through SolrFlow

Use SolrFlow.documentWithPassThrough, SolrFlow.beanWithPassThrough or SolrFlow.typedWithPassThrough.

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Solr.

Scala
// We're going to pretend we got messages from Kafka.
// After we've written them to Solr, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val res1 = Source(messagesFromKafka)
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    println("title: " + book.title)

    // Transform message so that we can write to solr
    IncomingMessage(book, kafkaMessage.offset)
  }
  .via( // write to Solr
    SolrFlow.typedWithPassThrough[Book, KafkaOffset](
      collection = "collection6",
      settings = SolrUpdateSettings(commitWithin = 5),
      binder = bookToDoc
    )
  )
  .map { messageResults =>
    messageResults.foreach { result =>
      if (result.status != 0)
        throw new Exception("Failed to write message to Solr")
      // Commit to Kafka
      commitToKafka(result.passThrough)
    }
  }
  .runWith(Sink.ignore)
Full source at GitHub
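
In a test you could then verify that every offset was committed once the stream completes; a sketch against the in-memory committedOffsets list above:

Scala
import scala.concurrent.Await
import scala.concurrent.duration._

// After the stream is done, all offsets should have been committed in order
// (sketch; uses `res1`, `committedOffsets` and `messagesFromKafka` from above)
Await.ready(res1, 10.seconds)
assert(committedOffsets == messagesFromKafka.map(_.offset))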
Java
// We're going to pretend we got messages from Kafka.
// After we've written them to Solr, we want
// to commit the offset to Kafka

List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Book 3"), new KafkaOffset(2)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);

Source.from(messagesFromKafka) // Assume we get this from Kafka
    .map(
        kafkaMessage -> {
          Book book = kafkaMessage.book;
          // Transform message so that we can write to Solr
          return IncomingMessage.create(book, kafkaMessage.offset);
        })
    .via(SolrFlow.typedWithPassThrough("collection6", settings, bookToDoc, client, Book.class))
    .map(
        messageResults -> {
          messageResults
              .stream()
              .forEach(
                  result -> {
                    if (result.status() != 0) {
                      throw new RuntimeException("Failed to write message to Solr");
                    }
                    // Commit to Kafka
                    kafkaCommitter.commit(result.passThrough());
                  });
          return NotUsed.getInstance();
        })
    .runWith(Sink.seq(), materializer) // Run it
    .toCompletableFuture()
    .get(); // Wait for it to complete
Full source at GitHub

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> solr/testOnly *.SolrSpec
Java
sbt
> solr/testOnly *.SolrTest
The source code for this page can be found here.