Avro Parquet
The Avro Parquet connector provides an Akka Stream Source, Sink and Flow for pushing data to and pulling data from Parquet files.
For more information about Apache Parquet please visit the official documentation.
Project Info: Alpakka Avro Parquet | |
---|---|
Artifact | com.lightbend.akka : akka-stream-alpakka-avroparquet : 2.0.2 |
JDK versions | Adopt OpenJDK 8, Adopt OpenJDK 11 |
Scala versions | 2.12.11, 2.13.3 |
JPMS module name | akka.stream.alpakka.avroparquet |
License | |
Readiness level | Since 1.0-M1, 2018-11-06 |
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
    val AkkaVersion = "2.5.31"
    libraryDependencies ++= Seq(
      "com.lightbend.akka" %% "akka-stream-alpakka-avroparquet" % "2.0.2",
      "com.typesafe.akka" %% "akka-stream" % AkkaVersion
    )
- Maven
    <properties>
      <akka.version>2.5.31</akka.version>
      <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependency>
      <groupId>com.lightbend.akka</groupId>
      <artifactId>akka-stream-alpakka-avroparquet_${scala.binary.version}</artifactId>
      <version>2.0.2</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-stream_${scala.binary.version}</artifactId>
      <version>${akka.version}</version>
    </dependency>
- Gradle
    versions += [
      AkkaVersion: "2.5.31",
      ScalaBinary: "2.12"
    ]
    dependencies {
      compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-avroparquet_${versions.ScalaBinary}", version: '2.0.2'
      compile group: 'com.typesafe.akka', name: "akka-stream_${versions.ScalaBinary}", version: versions.AkkaVersion
    }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.typesafe.akka | akka-stream_2.12 | 2.5.31 |
org.apache.parquet | parquet-avro | 1.10.1 |
org.scala-lang | scala-library | 2.12.11 |
- Dependency tree
    com.typesafe.akka akka-stream_2.12 2.5.31
      com.typesafe.akka akka-actor_2.12 2.5.31
        com.typesafe config 1.3.3
        org.scala-lang.modules scala-java8-compat_2.12 0.8.0
          org.scala-lang scala-library 2.12.11
        org.scala-lang scala-library 2.12.11
      com.typesafe.akka akka-protobuf_2.12 2.5.31
        org.scala-lang scala-library 2.12.11
      com.typesafe ssl-config-core_2.12 0.3.8
        com.typesafe config 1.3.3
        org.scala-lang.modules scala-parser-combinators_2.12 1.1.2
          org.scala-lang scala-library 2.12.11
        org.scala-lang scala-library 2.12.11
      org.reactivestreams reactive-streams 1.0.2
      org.scala-lang scala-library 2.12.11
    org.apache.parquet parquet-avro 1.10.1
      it.unimi.dsi fastutil 7.0.13
      org.apache.avro avro 1.9.2
        com.fasterxml.jackson.core jackson-core 2.10.2
        com.fasterxml.jackson.core jackson-databind 2.10.2
          com.fasterxml.jackson.core jackson-annotations 2.10.2
          com.fasterxml.jackson.core jackson-core 2.10.2
        org.apache.commons commons-compress 1.19
        org.slf4j slf4j-api 1.7.30
      org.apache.parquet parquet-column 1.10.1
        commons-codec commons-codec 1.11
        org.apache.parquet parquet-common 1.10.1
          org.apache.parquet parquet-format 2.4.0
            org.slf4j slf4j-api 1.7.30
          org.slf4j slf4j-api 1.7.30
        org.apache.parquet parquet-encoding 1.10.1
          commons-codec commons-codec 1.11
          org.apache.parquet parquet-common 1.10.1
            org.apache.parquet parquet-format 2.4.0
              org.slf4j slf4j-api 1.7.30
            org.slf4j slf4j-api 1.7.30
      org.apache.parquet parquet-format 2.4.0
        org.slf4j slf4j-api 1.7.30
      org.apache.parquet parquet-hadoop 1.10.1
        commons-pool commons-pool 1.6
        org.apache.parquet parquet-column 1.10.1
          commons-codec commons-codec 1.11
          org.apache.parquet parquet-common 1.10.1
            org.apache.parquet parquet-format 2.4.0
              org.slf4j slf4j-api 1.7.30
            org.slf4j slf4j-api 1.7.30
          org.apache.parquet parquet-encoding 1.10.1
            commons-codec commons-codec 1.11
            org.apache.parquet parquet-common 1.10.1
              org.apache.parquet parquet-format 2.4.0
                org.slf4j slf4j-api 1.7.30
              org.slf4j slf4j-api 1.7.30
        org.apache.parquet parquet-format 2.4.0
          org.slf4j slf4j-api 1.7.30
        org.apache.parquet parquet-jackson 1.10.1
        org.codehaus.jackson jackson-core-asl 1.9.13
        org.codehaus.jackson jackson-mapper-asl 1.9.13
          org.codehaus.jackson jackson-core-asl 1.9.13
        org.xerial.snappy snappy-java 1.1.2.6
    org.scala-lang scala-library 2.12.11
Source Initiation
A Parquet file can be used as a stream Source. To do so, create an AvroParquetReader instance, which produces records as subtypes of GenericRecord, Avro's abstract record representation.
- Scala
-
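    import org.apache.avro.generic.GenericRecord
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
    import org.apache.parquet.hadoop.ParquetReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    // Path of the Parquet file to read
    val file: String = "./sample/path/test.parquet"
    val conf: Configuration = new Configuration()
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
    val reader: ParquetReader[GenericRecord] =
      AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()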
- Java
-
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.avro.Schema;
    import akka.stream.javadsl.Source;
    import org.apache.parquet.avro.AvroParquetReader;

    Configuration conf = new Configuration();
    ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(
                HadoopInputFile.fromPath(new Path("./test.parquet"), conf))
            .disableCompatibility()
            .build();
You can then create the Parquet Source from the initialised AvroParquetReader. AvroParquetSource requires an instance of org.apache.parquet.hadoop.ParquetReader typed by a subtype of GenericRecord.
- Scala
-
    val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
- Java
-
Source<GenericRecord, NotUsed> source = AvroParquetSource.create(reader);
Sink Initiation
Conversely, you can use an AvroParquetWriter to back the Akka Streams Sink implementation for writing to Parquet. Its initialisation requires an instance of org.apache.parquet.hadoop.ParquetWriter, and the Sink accepts any subtype of GenericRecord.
- Scala
-
    import com.sksamuel.avro4s.Record
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.{AvroParquetWriter, AvroReadSupport}
    import org.apache.parquet.hadoop.ParquetWriter

    val file: String = "./sample/path/test.parquet"
    val conf: Configuration = new Configuration()
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
    // `schema` is the Avro Schema of the records to be written
    val writer: ParquetWriter[Record] =
      AvroParquetWriter.builder[Record](new Path(file)).withConf(conf).withSchema(schema).build()
- Java
-
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.avro.AvroReadSupport;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    Configuration conf = new Configuration();
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
    // `file` is the target path and `schema` the Avro Schema of the records
    ParquetWriter<GenericRecord> writer =
        AvroParquetWriter.<GenericRecord>builder(new Path(file))
            .withConf(conf)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .withSchema(schema)
            .build();
The AvroParquet Sink is then ready to use.
The Scala example below demonstrates that any subtype of GenericRecord can be passed to the stream. Here the type used is com.sksamuel.avro4s.Record, which implements Avro's GenericRecord interface. See Avro4s or Avrohugger, among other ways of generating these classes.
- Scala
-
    val records: List[Record] = documents.map(RecordFormat[Document].to(_))
    val source: Source[Record, NotUsed] = Source(records)
    val result: Future[Done] =
      source
        .runWith(AvroParquetSink(writer))
- Java
-
Sink<GenericRecord, CompletionStage<Done>> sink = AvroParquetSink.create(writer);
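The Source and Sink described so far can be combined into a round trip: write a few records to a Parquet file, then stream them back. The following is a minimal sketch rather than code from the project's tests. The `AvroParquetRoundTrip` object, the `roundTrip` and `document` helpers, the two-field `Document` schema and the file path are all hypothetical names chosen for illustration. It also assumes Akka 2.5 (hence the explicit `ActorMaterializer`) and a Hadoop client library on the classpath, which the `org.apache.hadoop` imports used throughout this page require.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.avroparquet.scaladsl.{AvroParquetSink, AvroParquetSource}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter, AvroReadSupport}
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetReader, ParquetWriter}
import org.apache.parquet.hadoop.util.HadoopInputFile

import scala.concurrent.Await
import scala.concurrent.duration._

object AvroParquetRoundTrip {

  // Hypothetical two-field schema, used only for this illustration.
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Document", "fields": [
      |  {"name": "id", "type": "string"},
      |  {"name": "body", "type": "string"}
      |]}""".stripMargin
  )

  // Builds one GenericRecord that conforms to the schema above.
  def document(id: String, body: String): GenericRecord =
    new GenericRecordBuilder(schema).set("id", id).set("body", body).build()

  // Writes two records to `file`, then reads the file back as a stream.
  def roundTrip(file: String)(implicit system: ActorSystem): Seq[GenericRecord] = {
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val conf = new Configuration()
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

    val writer: ParquetWriter[GenericRecord] =
      AvroParquetWriter
        .builder[GenericRecord](new Path(file))
        .withConf(conf)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withSchema(schema)
        .build()

    val records = List(document("1", "first"), document("2", "second"))

    // Block until the sink has finished writing (acceptable in a sketch only).
    Await.result(Source(records).runWith(AvroParquetSink(writer)), 10.seconds)

    val reader: ParquetReader[GenericRecord] =
      AvroParquetReader
        .builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf))
        .withConf(conf)
        .build()

    Await.result(AvroParquetSource(reader).runWith(Sink.seq), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("avroparquet-sketch")
    try roundTrip("./test.parquet").foreach(println)
    finally system.terminate()
  }
}
```

Blocking with `Await` keeps the sketch short. In application code you would normally compose the two `Future`s instead of waiting on them.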
Flow Initiation
A ParquetWriter can also be used as a Flow stage. Like the other representations, it expects subtypes of Avro's GenericRecord type. It writes each record to a Parquet file and emits the same GenericRecord downstream. Create such a Flow stage by passing an AvroParquetWriter instance to AvroParquetFlow.
- Scala
-
    // This is all the preparation we are going to need.
    val records: List[GenericRecord] = ??? // placeholder: supply the records to write
    val source: Source[GenericRecord, NotUsed] = Source(records)
    val avroParquet: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)
    val result =
      source
        .via(avroParquet)
        .runWith(Sink.seq)
- Java
-
    ParquetWriter<GenericRecord> writer =
        AvroParquetWriter.<GenericRecord>builder(new Path("./test.parquet"))
            .withConf(conf)
            .withSchema(schema)
            .build();

    Flow<GenericRecord, GenericRecord, NotUsed> flow = AvroParquetFlow.create(writer);

    source.via(flow).runWith(Sink.ignore(), materializer);
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
    sbt
    > avroparquet/test