Apache Solr Connector

The Solr connector provides Akka Stream sources, flows and sinks for Solr.

For more information about Solr, please visit the Solr documentation.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-solr" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-solr_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-solr_2.12', version: '0.19'
}

Set up client

Sources, Flows and Sinks provided by this connector need a prepared org.apache.solr.client.solrj.SolrClient to access Solr.

Scala
import org.apache.solr.client.solrj.SolrClient
import org.apache.solr.client.solrj.impl.CloudSolrClient

val zkHost = "127.0.0.1:9984/solr"
implicit val client: SolrClient = new CloudSolrClient.Builder().withZkHost(zkHost).build
Java
zkHost = "127.0.0.1:9984/solr";
client = new CloudSolrClient.Builder().withZkHost(zkHost).build();
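
Any SolrClient implementation can be used; for a single Solr node without ZooKeeper you could build an HttpSolrClient instead. A minimal sketch, assuming Solr listens on the default port:

Scala
import org.apache.solr.client.solrj.SolrClient
import org.apache.solr.client.solrj.impl.HttpSolrClient

// Hypothetical single-node setup; the URL is an assumption.
implicit val client: SolrClient =
  new HttpSolrClient.Builder("http://localhost:8983/solr").build()

// The connector does not manage the client's lifecycle:
// close it yourself once all streams using it have completed.
// client.close()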

Source Usage

Create a tuple stream.

Scala
val factory = new StreamFactory().withCollectionZkHost(collection, zkHost)
val solrClientCache = new SolrClientCache()
val streamContext = new StreamContext()
streamContext.setSolrClientCache(solrClientCache)

val expression = StreamExpressionParser.parse(s"""search($collection, q=*:*, fl="title", sort="title asc")""")
val stream: TupleStream = new CloudSolrStream(expression, factory)
stream.setStreamContext(streamContext)
Java
StreamFactory factory = new StreamFactory()
  .withCollectionZkHost(collection, zkHost);
SolrClientCache solrClientCache = new SolrClientCache();
StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);

String expressionStr = String.format("search(%s, q=*:*, fl=\"title\", sort=\"title asc\")", collection);
StreamExpression expression = StreamExpressionParser.parse(expressionStr);
TupleStream stream = new CloudSolrStream(expression, factory);
stream.setStreamContext(streamContext);

Use SolrSource.fromTupleStream to create a SolrSource.

Scala
val source = SolrSource
  .fromTupleStream(ts = stream)
Java
Source<Tuple, NotUsed> source = SolrSource.fromTupleStream(stream);

Sink Usage

Now we can stream messages to Solr by providing the SolrClient to the SolrSink.

Scala
case class Book(title: String)

val bookToDoc: Book => SolrInputDocument = { b =>
  val doc = new SolrInputDocument
  doc.setField("title", b.title)
  doc
}

val tupleToBook: Tuple => Book = { t =>
  val title = t.getString("title")
  Book(title)
}
Java
public static class Book {
  public String title;

  public Book() {
  }

  public Book(String title) {
    this.title = title;
  }
}

Function<Book, SolrInputDocument> bookToDoc = book -> {
  SolrInputDocument doc = new SolrInputDocument();
  doc.setField("title", book.title);
  return doc;
};

Function<Tuple, Book> tupleToBook = tuple -> {
  String title = tuple.getString("title");
  return new Book(title);
};

With document sink

Use SolrSink.document to stream SolrInputDocument to Solr.

Scala
val f1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    val doc: SolrInputDocument = bookToDoc(book)
    IncomingMessage(doc)
  }
  .runWith(
    SolrSink.document(
      collection = "collection2",
      settings = SolrUpdateSettings(commitWithin = 5)
    )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 = SolrSource.fromTupleStream(stream)
  .map(tuple -> {
    Book book = tupleToBook.apply(tuple);
    SolrInputDocument doc = bookToDoc.apply(book);
    return IncomingMessage.create(doc);
  }).runWith(
    SolrSink.document(
      "collection2",
      settings,
      client
    ),
    materializer
  );

With bean sink

First, create a POJO.

Scala
import org.apache.solr.client.solrj.beans.Field
import scala.annotation.meta.field
case class BookBean(@(Field @field) title: String)
Java
class BookBean {
  @Field("title")
  public String title;

  public BookBean(String title) {
    this.title = title;
  }
}

Use SolrSink.bean to stream POJOs to Solr.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val title = tuple.getString("title")
    IncomingMessage(BookBean(title))
  }
  .runWith(
    SolrSink.bean[BookBean](
      collection = "collection3",
      settings = SolrUpdateSettings(commitWithin = 5)
    )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 = SolrSource.fromTupleStream(stream)
  .map(tuple -> {
    String title = tuple.getString("title");
    return IncomingMessage.create(new BookBean(title));
  }).runWith(
    SolrSink.bean(
      "collection3",
      settings,
      client,
      BookBean.class
    ),
    materializer
  );

With typed sink

Use SolrSink.typed to stream messages with custom binding to Solr.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    IncomingMessage(book)
  }
  .runWith(
    SolrSink
      .typed[Book](
        collection = "collection4",
        settings = SolrUpdateSettings(commitWithin = 5),
        binder = bookToDoc
      )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 = SolrSource.fromTupleStream(stream)
  .map(tuple -> IncomingMessage.create(tupleToBook.apply(tuple)))
  .runWith(
    SolrSink.typed(
      "collection4",
      settings,
      bookToDoc,
      client,
      Book.class
    ),
    materializer
  );

Configuration

We can configure the sink with SolrUpdateSettings.

Scala
import akka.stream.alpakka.solr.SolrUpdateSettings

import scala.concurrent.duration._

val settings =
  SolrUpdateSettings(bufferSize = 10, retryInterval = 5000.millis, maxRetry = 100, commitWithin = -1)
Java
SolrUpdateSettings settings =
 SolrUpdateSettings.create()
  .withBufferSize(10)
  .withRetryInterval(FiniteDuration.create(5000, TimeUnit.MILLISECONDS))
  .withMaxRetry(100)
  .withCommitWithin(-1);
Parameter     Default Description
bufferSize    10      SolrSink sends one bulk request per this many buffered messages.
retryInterval 5000    When a request fails, SolrSink retries it after this interval (milliseconds).
maxRetry      100     SolrSink gives up and fails the stage after this many consecutive failures.
commitWithin  -1      Maximum time (in ms) before a commit happens; -1 for manual committing.
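
With commitWithin = -1 nothing is committed automatically, so commits must be triggered through the SolrJ client. A sketch, assuming the implicit client from the setup above and a hypothetical streamCompletion: Future[Done] materialized by a sink run:

Scala
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical: commit manually once the stream has finished writing.
streamCompletion.foreach(_ => client.commit("collection2"))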

Flow Usage

You can also build flow stages with SolrFlow. The API is similar to creating sinks.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    IncomingMessage(book)
  }
  .via(
    SolrFlow
      .typed[Book](
        collection = "collection5",
        settings = SolrUpdateSettings(commitWithin = 5),
        binder = bookToDoc
      )
  )
  .runWith(Sink.seq)
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 = SolrSource.fromTupleStream(stream)
  .map(tuple -> IncomingMessage.create(tupleToBook.apply(tuple)))
  .via(
       SolrFlow.typed(
         "collection5",
         settings,
         bookToDoc,
         client,
         Book.class
       )
  )
  .runWith(Sink.ignore(), materializer);
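
Each element emitted by the flow is the sequence of results for one bulk request. As a sketch (reusing the res1 value from the Scala example above), you could inspect the write status of every result:

Scala
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch: status 0 means Solr accepted the update.
res1.foreach { batches =>
  batches.flatten.foreach { result =>
    if (result.status != 0)
      println(s"Write failed with status ${result.status}")
  }
}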

Passing data through SolrFlow

Use SolrFlow.documentWithPassThrough, SolrFlow.beanWithPassThrough or SolrFlow.typedWithPassThrough.

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Solr.

Scala
// We're going to pretend we got messages from kafka.
// After we've written them to Solr, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val res1 = Source(messagesFromKafka)
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    println("title: " + book.title)

    // Transform message so that we can write to solr
    IncomingMessage(book, kafkaMessage.offset)
  }
  .via( // write to Solr
    SolrFlow.typedWithPassThrough[Book, KafkaOffset](
      collection = "collection6",
      settings = SolrUpdateSettings(commitWithin = 5),
      binder = bookToDoc
    )
  )
  .map { messageResults =>
    messageResults.foreach { result =>
      if (result.status != 0)
        throw new Exception("Failed to write message to Solr")
      // Commit to kafka
      commitToKafka(result.passThrough)
    }
  }
  .runWith(Sink.ignore)
Java
// We're going to pretend we got messages from kafka.
// After we've written them to Solr, we want
// to commit the offset to Kafka

List<KafkaMessage> messagesFromKafka = Arrays.asList(
  new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
  new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
  new KafkaMessage(new Book("Book 3"), new KafkaOffset(2))
);

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);

Source.from(messagesFromKafka) // Assume we get this from Kafka
  .map(kafkaMessage -> {
      Book book = kafkaMessage.book;
      // Transform message so that we can write to Solr
      return IncomingMessage.create(book, kafkaMessage.offset);
    })
  .via(
       SolrFlow.typedWithPassThrough(
         "collection6",
         settings,
         bookToDoc,
         client,
         Book.class
       )
  ).map(messageResults -> {
      messageResults.stream().forEach(result -> {
        if (result.status() != 0) {
          throw new RuntimeException("Failed to write message to Solr");
        }
        }
        // Commit to kafka
        kafkaCommitter.commit(result.passThrough());
      });
      return NotUsed.getInstance();
  }).runWith(Sink.seq(), materializer) // Run it
  .toCompletableFuture().get(); // Wait for it to complete

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> solr/testOnly *.SolrSpec
Java
sbt
> solr/testOnly *.SolrTest
The source code for this page can be found here.