Avro Parquet

The Avro Parquet connector provides an Akka Stream Source, Sink and Flow for reading data from and writing data to Parquet files.

For more information about Apache Parquet please visit the official documentation.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-avroparquet" % "1.0-M1"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-avroparquet_2.12</artifactId>
  <version>1.0-M1</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-avroparquet_2.12', version: '1.0-M1'
}

Usage

We will need an ActorSystem and an ActorMaterializer.

Scala
implicit val system: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
Java
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);

Source Initiation

A Parquet file can serve as a stream Source. To do this, first create an AvroParquetReader instance, which produces Avro GenericRecord instances.

Scala
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.avro.AvroReadSupport
val file = folder + "/test.parquet"

val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val reader: ParquetReader[GenericRecord] =
  AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()
Java
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import akka.stream.javadsl.Source;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.hadoop.conf.Configuration;
Configuration conf = new Configuration();

ParquetReader<GenericRecord> reader =
    AvroParquetReader.<GenericRecord>builder(
            HadoopInputFile.fromPath(new Path("./test.parquet"), conf))
        .disableCompatibility()
        .build();

Then create the Source, which takes the reader built with AvroParquetReader as its parameter.

Scala
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
Java
Source<GenericRecord, NotUsed> source = AvroParquetSource.create(reader);
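
To illustrate how such a Source might be consumed, the following sketch collects every record of a file into memory with Sink.seq. The helper name readAll, the enclosing object, and the import path akka.stream.alpakka.avroparquet.scaladsl are assumptions that do not appear in the snippets above; collecting into a Seq is only reasonable for small files.

```scala
import akka.stream.Materializer
// Assumed package of the connector's Scala API.
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSource
import akka.stream.scaladsl.Sink
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetReader

import scala.collection.immutable
import scala.concurrent.Future

object ReadExample {
  // Hypothetical helper: drains the reader and returns all records
  // once the stream has completed.
  def readAll(reader: ParquetReader[GenericRecord])(
      implicit mat: Materializer
  ): Future[immutable.Seq[GenericRecord]] =
    AvroParquetSource(reader).runWith(Sink.seq)
}
```

For larger files it is probably better to keep processing the records as a stream, for example with map or via, instead of materializing them all at once.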

Sink Initiation

A Parquet file can also serve as an Akka Streams Sink, for instance when you need to store data in Parquet files on HDFS (or any other distributed file system) and run map-reduce jobs on it later. To do this, first create an AvroParquetWriter instance, which accepts GenericRecord instances.

Scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.avro.{AvroParquetWriter, AvroReadSupport}
val file = folder + "/test.parquet"

val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

val writer: ParquetWriter[GenericRecord] =
  AvroParquetWriter.builder[GenericRecord](new Path(file)).withConf(conf).withSchema(schema).build()
Java
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
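import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;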
Configuration conf = new Configuration();
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new Path(file))
        .withConf(conf)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withSchema(schema)
        .build();
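
The writer snippets above refer to a schema value that they do not define. As a minimal sketch, assuming a hypothetical Document type with two string fields, the schema and a docToRecord conversion could look like the following. The names DocumentRecords and Document and the field names id and body are illustrative assumptions, not part of the connector.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

object DocumentRecords {
  // Hypothetical domain type, used only for illustration.
  final case class Document(id: String, body: String)

  // Assumed Avro schema: a record with two string fields matching Document.
  val schema: Schema = new Schema.Parser().parse(
    """{
      |  "type": "record",
      |  "name": "Document",
      |  "fields": [
      |    {"name": "id", "type": "string"},
      |    {"name": "body", "type": "string"}
      |  ]
      |}""".stripMargin
  )

  // Converts a Document into a GenericRecord that conforms to the schema above.
  def docToRecord(document: Document): GenericRecord =
    new GenericRecordBuilder(schema)
      .set("id", document.id)
      .set("body", document.body)
      .build()
}
```

With these definitions in scope, `DocumentRecords.docToRecord(Document("1", "hello"))` yields a record that a writer built with the same schema should accept.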

Then create the Sink, which takes the AvroParquetWriter instance as a parameter.

Scala
val sink: Sink[GenericRecord, Future[Done]] = AvroParquetSink(writer)
Java
Sink<GenericRecord, CompletionStage<Done>> sink = AvroParquetSink.create(writer);
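
As a rough sketch of how the Sink might be used, the helper below streams an in-memory collection of records into a writer and returns the Future[Done] materialized by the Sink. The helper name writeAll, the enclosing object, and the import path akka.stream.alpakka.avroparquet.scaladsl are assumptions made for this example.

```scala
import akka.Done
import akka.stream.Materializer
// Assumed package of the connector's Scala API.
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSink
import akka.stream.scaladsl.Source
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter

import scala.collection.immutable
import scala.concurrent.Future

object WriteExample {
  // Hypothetical helper: streams the given records into the writer.
  // The returned Future completes when the stream has finished.
  def writeAll(records: immutable.Seq[GenericRecord], writer: ParquetWriter[GenericRecord])(
      implicit mat: Materializer
  ): Future[Done] =
    Source(records).runWith(AvroParquetSink(writer))
}
```

Waiting for the returned Future before opening the file for reading avoids racing against a stream that is still writing.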

Flow Initiation

A ParquetWriter can also be used as a Flow stage, which accepts Avro GenericRecord elements, writes each one to the Parquet file, and emits the same GenericRecord downstream. Create such a Flow by passing an AvroParquetWriter instance to AvroParquetFlow.

Scala
val flow: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)

val result = source
  .map(f => docToRecord(f))
  .via(flow)
  .runWith(Sink.ignore)
Java
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new Path("./test.parquet"))
        .withConf(conf)
        .withSchema(schema)
        .build();

Flow<GenericRecord, GenericRecord, NotUsed> flow = AvroParquetFlow.create(writer);

source.via(flow).runWith(Sink.ignore(), materializer);
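
Because the Flow emits each record again after writing it, downstream stages can keep working with the data. The sketch below uses this to count how many records passed through the writer. The helper name writeAndCount, the enclosing object, and the import path akka.stream.alpakka.avroparquet.scaladsl are assumptions made for this example.

```scala
import akka.NotUsed
import akka.stream.Materializer
// Assumed package of the connector's Scala API.
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetFlow
import akka.stream.scaladsl.Source
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter

import scala.concurrent.Future

object FlowExample {
  // Hypothetical helper: writes every record through the flow and
  // counts the records that the flow emits downstream.
  def writeAndCount(records: Source[GenericRecord, NotUsed], writer: ParquetWriter[GenericRecord])(
      implicit mat: Materializer
  ): Future[Int] =
    records
      .via(AvroParquetFlow(writer))
      .runFold(0)((count, _) => count + 1)
}
```

The same pattern applies to any other downstream stage, such as logging each record or forwarding it to a second sink.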

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> avroparquet/test