CSV

Example: Fetch CSV from Internet and publish the data as JSON to Kafka

This example uses Akka HTTP to send the HTTP request. The Scala version uses Akka HTTP's primary JSON support via Spray JSON to convert the map into a JSON structure; the Java version uses the Jackson JSON generator to convert the map into a JSON-formatted string.

  • (1) trigger an HTTP request every 30 seconds,
  • (2) send it to web server,
  • (3) continue with the response body as a stream of ByteString,
  • (4) scan the stream for CSV lines,
  • (5) convert the CSV lines into maps with the header line as keys,
  • (6) local logic to clean the data and convert values to Strings,
  • (7) convert the maps to JSON (with Spray JSON from Akka HTTP in Scala, with Jackson in Java),
  • (8) create a Kafka producer record
Scala
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, MediaRanges}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import playground.{ActorSystemAvailable, KafkaEmbedded}
import spray.json.{DefaultJsonProtocol, JsValue, JsonWriter}

import scala.concurrent.duration.DurationInt

// Poll the CSV endpoint every 30 seconds and publish every parsed row as a
// JSON-formatted record to the Kafka topic "topic1". The numbered trailing
// comments map each stage to the steps listed above; the materializer/actor
// system and `httpRequest`, `extractEntityData`, `cleanseCsvData`, `toJson`,
// `kafkaProducerSettings` come from the surrounding (not shown) main class.
Source                                                         // stream element type
  .tick(1.seconds, 30.seconds, httpRequest)                    //: HttpRequest             (1)
  .mapAsync(1)(Http().singleRequest(_))                        //: HttpResponse            (2)
  .flatMapConcat(extractEntityData)                            //: ByteString              (3)
  .via(CsvParsing.lineScanner())                               //: List[ByteString]        (4)
  .via(CsvToMap.toMap())                                       //: Map[String, ByteString] (5)
  .map(cleanseCsvData)                                         //: Map[String, String]     (6)
  .map(toJson)                                                 //: JsValue                 (7)
  .map(_.compactPrint)                                         //: String (JSON formatted)
  .map { elem =>
    new ProducerRecord[String, String]("topic1", elem)         //: Kafka ProducerRecord    (8)
  }
  .toMat(Producer.plainSink(kafkaProducerSettings))(Keep.both)
  .run()
Full source at GitHub
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;

import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.MediaRanges;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.headers.Accept;

import akka.japi.Pair;

import akka.kafka.ConsumerSettings;
import akka.kafka.ProducerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.kafka.javadsl.Producer;

import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.alpakka.csv.javadsl.CsvParsing;
import akka.stream.alpakka.csv.javadsl.CsvToMap;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.io.StringWriter;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Poll the CSV endpoint every 30 seconds and publish every parsed row as a
// JSON-formatted record to the Kafka topic "topic1". The numbered trailing
// comments map each stage to the steps listed above; `httpRequest`, `http`,
// the helper methods, `kafkaProducerSettings` and `materializer` come from
// the surrounding (not shown) main class.
Source.tick(
        Duration.ofSeconds(1),
        Duration.ofSeconds(30),
        httpRequest) // : HttpRequest             (1)
    .mapAsync(1, http::singleRequest) // : HttpResponse            (2)
    .flatMapConcat(this::extractEntityData) // : ByteString              (3)
    .via(CsvParsing.lineScanner()) // : List<ByteString>        (4)
    .via(CsvToMap.toMap()) // : Map<String, ByteString> (5)
    .map(this::cleanseCsvData) // : Map<String, String>     (6)
    .map(this::toJson) // : String                  (7)
    .map(
        elem ->
            new ProducerRecord<String, String>(
                "topic1", elem) // : Kafka ProducerRecord    (8)
        )
    .toMat(Producer.plainSink(kafkaProducerSettings), Keep.both())
    .run(materializer);
sample
Full source at GitHub

Helper code

Scala
// HTTP GET for the NASDAQ company listing; the Accept header restricts the
// response to text media types (the endpoint serves CSV).
val httpRequest = HttpRequest(uri = "https://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download")
  .withHeaders(Accept(MediaRanges.`text/*`))

/**
 * Expose the response body as a stream of ByteString: on 200 OK continue
 * with the entity's data bytes, otherwise fail the stream so the error
 * surfaces downstream instead of being silently dropped.
 */
def extractEntityData(response: HttpResponse): Source[ByteString, _] =
  if (response.status == OK) response.entity.dataBytes
  else Source.failed(new RuntimeException(s"illegal response $response"))

/**
 * Drop columns with an empty header key and decode the remaining raw
 * ByteString values to UTF-8 strings.
 *
 * Uses a single strict `collect` pass: the original `filterNot` +
 * `mapValues` chain relied on `mapValues`, which returns a lazy view
 * (deprecated since Scala 2.13) that re-decodes every value on each
 * access.
 */
def cleanseCsvData(csvData: Map[String, ByteString]): Map[String, String] =
  csvData.collect {
    case (key, value) if key.nonEmpty => key -> value.utf8String
  }

/**
 * Serialise a cleansed CSV row to JSON, delegating to whatever implicit
 * `JsonWriter[Map[String, String]]` the caller has in scope (here supplied
 * by Spray JSON's DefaultJsonProtocol).
 */
def toJson(map: Map[String, String])(
    implicit jsWriter: JsonWriter[Map[String, String]]): JsValue = jsWriter.write(map)
Full source at GitHub
Java
// HTTP GET for the NASDAQ company listing; the Accept header restricts the
// response to text media types (the endpoint serves CSV).
final HttpRequest httpRequest =
    HttpRequest.create(
            "https://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download")
        .withHeaders(Collections.singletonList(Accept.create(MediaRanges.ALL_TEXT)));

/**
 * Expose the response body as a stream of ByteString: on 200 OK continue
 * with the entity's data bytes, otherwise fail the stream so the error
 * surfaces downstream instead of being silently dropped.
 */
private Source<ByteString, ?> extractEntityData(HttpResponse httpResponse) {
  // Guard clause: any status other than 200 OK becomes a failed source.
  if (httpResponse.status() != StatusCodes.OK) {
    return Source.failed(new RuntimeException("illegal response " + httpResponse));
  }
  return httpResponse.entity().getDataBytes();
}

/**
 * Drop columns with an empty header key and decode the remaining raw
 * ByteString values to UTF-8 strings.
 */
private Map<String, String> cleanseCsvData(Map<String, ByteString> map) {
  Map<String, String> cleansed = new HashMap<>(map.size());
  for (Map.Entry<String, ByteString> entry : map.entrySet()) {
    String key = entry.getKey();
    if (!key.isEmpty()) {
      cleansed.put(key, entry.getValue().utf8String());
    }
  }
  return cleansed;
}

// Reused across calls; JsonFactory is safe to share once configured.
private final JsonFactory jsonFactory = new JsonFactory();

/**
 * Serialise a cleansed CSV row into a flat JSON object string with Jackson.
 *
 * The generator is opened in try-with-resources so it is always closed
 * (and its buffered output flushed to the StringWriter) even if a write
 * fails — the original closed it only on the happy path. Iterating the
 * entry set directly (instead of forEach with a lambda) lets IOException
 * propagate under the declared `throws Exception` without the
 * RuntimeException re-wrapping the lambda forced.
 */
private String toJson(Map<String, String> map) throws Exception {
  StringWriter sw = new StringWriter();
  try (JsonGenerator generator = jsonFactory.createGenerator(sw)) {
    generator.writeStartObject();
    for (Map.Entry<String, String> entry : map.entrySet()) {
      generator.writeStringField(entry.getKey(), entry.getValue());
    }
    generator.writeEndObject();
  }
  return sw.toString();
}
Full source at GitHub

Running the example code

This example is contained in a stand-alone runnable main; it can be run from sbt like this:

Scala
sbt
> doc-examples/run
The source code for this page can be found here.