CSV examples

Example: Fetch CSV from Internet and publish the data as JSON to Kafka

This example uses Akka HTTP to send the HTTP request, and Akka HTTP's primary JSON support via Spray JSON to convert each map into a JSON structure.

  • (1) trigger an HTTP request every 30 seconds,
  • (2) send it to the web server,
  • (3) continue with the response body as a stream of ByteString,
  • (4) scan the stream for CSV lines,
  • (5) convert the CSV lines into maps with the header line as keys,
  • (6) apply local logic to clean the data and convert the values to Strings,
  • (7) convert the maps to JSON with Spray JSON from Akka HTTP.
Scala
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}
import playground.ActorSystemAvailable
import spray.json.{DefaultJsonProtocol, JsValue, JsonWriter}

import scala.concurrent.duration.DurationInt

    Source                                                         // stream element type
      .tick(1.seconds, 30.seconds, httpRequest)                    //: HttpRequest             (1)
      .mapAsync(1)(Http().singleRequest(_))                        //: HttpResponse            (2)
      .flatMapConcat(extractEntityData)                            //: ByteString              (3)
      .via(CsvParsing.lineScanner())                               //: List[ByteString]        (4)
      .via(CsvToMap.toMap())                                       //: Map[String, ByteString] (5)
      .map(cleanseCsvData)                                         //: Map[String, String]     (6)
      .map(toJson)                                                 //: JsValue                 (7)
      .map(_.compactPrint)                                         //: String (JSON formatted)
      .map { elem =>
        new ProducerRecord[Array[Byte], String]("topic1", elem)    //: Kafka ProducerRecord
      }
      .toMat(Producer.plainSink(kafkaProducerSettings))(Keep.both)
      .run()
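
To make the shape of the data at steps (4) to (7) easier to follow, here is a simplified stand-in that uses plain Scala collections instead of Akka Streams. It is only a sketch: the sample CSV (including its trailing empty column), the object name `CsvToJsonSketch`, the naive comma split, and the hand-rolled JSON rendering are all assumptions for illustration, and they do not replace `CsvParsing.lineScanner()`, `CsvToMap.toMap()`, or Spray JSON.

```scala
// Plain-collections sketch of steps (4) to (7); not the Alpakka implementation.
object CsvToJsonSketch {
  // Hypothetical sample input: a header line, two data lines, and a trailing empty column.
  val csv: String =
    """Symbol,Name,
      |AAPL,Apple Inc.,
      |MSFT,Microsoft Corporation,""".stripMargin

  // (4) split into lines and fields (naive: no support for quoting or escaping)
  val lines: List[List[String]] =
    csv.split("\n").toList.map(_.trim.split(",", -1).toList)

  // (5) use the first line as keys for every following line
  val header: List[String] = lines.head
  val maps: List[Map[String, String]] =
    lines.tail.map(fields => header.zip(fields).toMap)

  // (6) drop the columns whose header is empty
  val cleansed: List[Map[String, String]] =
    maps.map(_.filter { case (key, _) => key.nonEmpty })

  // (7) render each map as a compact JSON object
  // (hand-rolled; escapes only backslashes and double quotes, keys sorted for stable output)
  def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  def toJsonString(map: Map[String, String]): String =
    map.toList
      .sortBy(_._1)
      .map { case (k, v) => quote(k) + ":" + quote(v) }
      .mkString("{", ",", "}")

  val json: List[String] = cleansed.map(toJsonString)
}
```

Each element of `CsvToJsonSketch.json` corresponds to one String that the real stream would wrap in a `ProducerRecord`.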


Helper code

Scala
val httpRequest = HttpRequest(uri = "http://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download")

def extractEntityData(response: HttpResponse): Source[ByteString, _] =
  response match {
    case HttpResponse(OK, _, entity, _) => entity.dataBytes
    case notOkResponse =>
      Source.failed(new RuntimeException(s"illegal response $notOkResponse"))
  }

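// Drops columns with an empty header and decodes the values as UTF-8.
// Uses a strict map rather than mapValues, which yields a lazy view and is deprecated since Scala 2.13.
def cleanseCsvData(csvData: Map[String, ByteString]): Map[String, String] =
  csvData
    .filterNot { case (key, _) => key.isEmpty }
    .map { case (key, value) => key -> value.utf8String }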

def toJson(map: Map[String, String])(
    implicit jsWriter: JsonWriter[Map[String, String]]): JsValue = jsWriter.write(map)
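
The stream and the helpers rely on a few definitions that are not shown on this page: an actor system, `kafkaProducerSettings`, and an implicit `JsonWriter[Map[String, String]]`. A minimal sketch of how they might be supplied follows. The object name, the actor system name, and the bootstrap server address `localhost:9092` are assumptions for illustration, not values taken from the example itself. The sketch needs Akka, Alpakka Kafka, and Spray JSON on the classpath.

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
// Brings Spray JSON's default formats into implicit scope; the format for
// Map[String, String] also serves as the JsonWriter that toJson asks for.
import spray.json.DefaultJsonProtocol._

object SupportingDefinitionsSketch {
  // Hypothetical system name. Http() and the stream's run() need this in implicit scope;
  // depending on the Akka version, an implicit Materializer may be required as well.
  implicit val actorSystem: ActorSystem = ActorSystem("csv-to-kafka")

  // Keys are byte arrays and values are Strings, matching
  // ProducerRecord[Array[Byte], String] in the stream above.
  val kafkaProducerSettings: ProducerSettings[Array[Byte], String] =
    ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092") // assumed broker address
}
```

The actor system keeps the JVM alive until it is terminated, so a stand-alone program would typically call `actorSystem.terminate()` once the stream is no longer needed.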

Running the example code

This example is contained in a stand-alone runnable main class and can be run from sbt like this:

Scala
sbt
> doc-examples/run