Hadoop Distributed File System - HDFS

The connector offers Flows and Sources that interact with HDFS file systems.

For more information about Hadoop, please visit the Hadoop documentation.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-hdfs" % "0.20"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-hdfs_2.12</artifactId>
  <version>0.20</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-hdfs_2.12', version: '0.20'
}

Specifying a Hadoop Version

By default, the HDFS connector uses Hadoop 3.1.0. If you are using a different version of Hadoop, exclude the Hadoop libraries from the connector dependency and add the dependency for your preferred version.
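
For example, with sbt (a sketch only; the exclusion rule and the Hadoop 2.8.3 version below are assumptions, adjust them to your cluster):

// Exclude the transitive Hadoop dependency and pin your own version (assumed example)
libraryDependencies += ("com.lightbend.akka" %% "akka-stream-alpakka-hdfs" % "0.20")
  .excludeAll(ExclusionRule(organization = "org.apache.hadoop"))
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.8.3"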

Set up client

Flows provided by this connector need a prepared org.apache.hadoop.fs.FileSystem to interact with HDFS.

Scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
conf.set("fs.default.name", "hdfs://localhost:54310")

val fs: FileSystem = FileSystem.get(conf)
Full source at GitHub
Java
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:54310");

fs = FileSystem.get(conf);
Full source at GitHub

Writing

The connector provides three flows: HdfsFlow.data, HdfsFlow.compressed and HdfsFlow.sequence. Each flow requires a RotationStrategy and a SyncStrategy to run.

The flows push OutgoingMessage elements downstream.

Data Writer

Use HdfsFlow.data to stream with FSDataOutputStream without any compression.

Scala
val flow = HdfsFlow.data(
  fs,
  SyncStrategy.count(500),
  RotationStrategy.size(1, FileUnit.GB),
  HdfsWritingSettings()
)
Full source at GitHub
Java
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.data(
        fs, SyncStrategy.count(500), RotationStrategy.size(1, FileUnit.GB()), settings);
Full source at GitHub
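
To actually write anything, materialize the flow with a source of HdfsWriteMessage elements. A minimal Scala sketch (it assumes the single-argument HdfsWriteMessage constructor defaults the pass-through to NotUsed, and that an implicit materializer is in scope):

import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

// Each element emitted by the flow is a RotationMessage describing a completed file.
val rotations = Source(1 to 1000)
  .map(i => HdfsWriteMessage(ByteString(s"record $i\n"))) // single-arg apply assumed
  .via(flow)
  .runWith(Sink.seq)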

Compressed Data Writer

First, create CompressionCodec.

Scala
val codec = new DefaultCodec()
codec.setConf(fs.getConf)
Full source at GitHub
Java
DefaultCodec codec = new DefaultCodec();
codec.setConf(fs.getConf());
Full source at GitHub

Then, use HdfsFlow.compressed to stream with CompressionOutputStream and CompressionCodec.

Scala
val flow = HdfsFlow.compressed(
  fs,
  SyncStrategy.count(1),
  RotationStrategy.size(0.1, FileUnit.MB),
  codec,
  settings
)
Full source at GitHub
Java
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.compressed(
        fs, SyncStrategy.count(50), RotationStrategy.size(0.1, FileUnit.MB()), codec, settings);
Full source at GitHub

Sequence Writer

Use HdfsFlow.sequence to stream a flat file consisting of binary key/value pairs.

Without Compression

Scala
val flow = HdfsFlow.sequence(
  fs,
  SyncStrategy.none,
  RotationStrategy.size(1, FileUnit.MB),
  settings,
  classOf[Text],
  classOf[Text]
)
Full source at GitHub
Java
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.sequence(
        fs,
        SyncStrategy.none(),
        RotationStrategy.size(1, FileUnit.MB()),
        settings,
        Text.class,
        Text.class);
Full source at GitHub

With Compression

First, define a codec.

Scala
val codec = new DefaultCodec()
codec.setConf(fs.getConf)
Full source at GitHub
Java
DefaultCodec codec = new DefaultCodec();
codec.setConf(fs.getConf());
Full source at GitHub

Then, create a flow.

Scala
val flow = HdfsFlow.sequence(
  fs,
  SyncStrategy.none,
  RotationStrategy.size(1, FileUnit.MB),
  CompressionType.BLOCK,
  codec,
  settings,
  classOf[Text],
  classOf[Text]
)
Full source at GitHub
Java
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.sequence(
        fs,
        SyncStrategy.none(),
        RotationStrategy.size(1, FileUnit.MB()),
        SequenceFile.CompressionType.BLOCK,
        codec,
        settings,
        Text.class,
        Text.class);
Full source at GitHub

Passing data through HdfsFlow

Use HdfsFlow.dataWithPassThrough, HdfsFlow.compressedWithPassThrough or HdfsFlow.sequenceWithPassThrough.

When streaming documents from Kafka, you might want to commit the offset to Kafka only after the document has been written to HDFS. The flow emits two kinds of messages: for every input it produces a WrittenMessage, and whenever it rotates a file, a RotationMessage.

Let’s say that we have these classes.

Scala
case class Book(title: String)
case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)
Full source at GitHub
Java
public static class Book {
  final String title;

  Book(String title) {
    this.title = title;
  }
}

static class KafkaCommitter {
  List<Integer> committedOffsets = new ArrayList<>();

  void commit(KafkaOffset offset) {
    committedOffsets.add(offset.offset);
  }
}

static class KafkaOffset {
  final int offset;

  KafkaOffset(int offset) {
    this.offset = offset;
  }
}

static class KafkaMessage {
  final Book book;
  final KafkaOffset offset;

  KafkaMessage(Book book, KafkaOffset offset) {
    this.book = book;
    this.offset = offset;
  }
}
Full source at GitHub

Then, we can stream with passThrough.

Scala
// We're going to pretend we got messages from kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka
val messagesFromKafka = List(
  KafkaMessage(Book("Akka Concurrency"), KafkaOffset(0)),
  KafkaMessage(Book("Akka in Action"), KafkaOffset(1)),
  KafkaMessage(Book("Effective Akka"), KafkaOffset(2)),
  KafkaMessage(Book("Learning Scala"), KafkaOffset(3)),
  KafkaMessage(Book("Scala Puzzlers"), KafkaOffset(4)),
  KafkaMessage(Book("Scala for Spark in Production"), KafkaOffset(5))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val resF = Source(messagesFromKafka)
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    // Transform message so that we can write to hdfs
    HdfsWriteMessage(ByteString(book.title), kafkaMessage.offset)
  }
  .via(
    HdfsFlow.dataWithPassThrough[KafkaOffset](
      fs,
      SyncStrategy.count(50),
      RotationStrategy.count(4),
      HdfsWritingSettings(newLine = true)
    )
  )
  .map { message =>
    message match {
      case WrittenMessage(passThrough, _) =>
        commitToKafka(passThrough)
      case _ => ()
    }
    message
  }
  .collect {
    case rm: RotationMessage => rm
  }
  .runWith(Sink.seq)
Full source at GitHub
Java
// We're going to pretend we got messages from kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka
List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Akka Concurrency"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Akka in Action"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Effective Akka"), new KafkaOffset(2)),
        new KafkaMessage(new Book("Learning Scala"), new KafkaOffset(3)),
        new KafkaMessage(new Book("Scala Puzzlers"), new KafkaOffset(4)),
        new KafkaMessage(new Book("Scala for Spark in Production"), new KafkaOffset(5)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

Flow<HdfsWriteMessage<ByteString, KafkaOffset>, OutgoingMessage<KafkaOffset>, NotUsed> flow =
    HdfsFlow.dataWithPassThrough(
        fs,
        SyncStrategy.count(50),
        RotationStrategy.count(4),
        HdfsWritingSettings.create().withNewLine(true));

CompletionStage<List<RotationMessage>> resF =
    Source.from(messagesFromKafka)
        .map(
            kafkaMessage -> {
              Book book = kafkaMessage.book;
              // Transform message so that we can write to hdfs
              return HdfsWriteMessage.create(
                  ByteString.fromString(book.title), kafkaMessage.offset);
            })
        .via(flow)
        .map(
            message -> {
              if (message instanceof WrittenMessage) {
                kafkaCommitter.commit(((WrittenMessage<KafkaOffset>) message).passThrough());
                return message;
              } else {
                return message;
              }
            })
        .collectType(RotationMessage.class) // Collect only rotation messages
        .runWith(Sink.seq(), materializer);
Full source at GitHub

Configuration

We can configure the writing flows with HdfsWritingSettings.

Scala
val settings =
  HdfsWritingSettings(
    overwrite = true,
    newLine = false,
    lineSeparator = System.getProperty("line.separator"),
    pathGenerator = pathGenerator
  )
Full source at GitHub
Java
HdfsWritingSettings.create()
    .withOverwrite(true)
    .withNewLine(false)
    .withLineSeparator(System.getProperty("line.separator"))
    .withPathGenerator(pathGenerator);
Full source at GitHub

File path generator

FilePathGenerator provides the functionality to generate the path of rotated files in HDFS. It receives the current rotation count and a timestamp.

Scala
val pathGenerator =
  FilePathGenerator(
    (rotationCount: Long, timestamp: Long) => s"/tmp/alpakka/$rotationCount-$timestamp"
  )
Full source at GitHub
Java
BiFunction<Long, Long, String> func =
    (rotationCount, timestamp) -> "/tmp/alpakka/" + rotationCount + "-" + timestamp;
FilePathGenerator pathGenerator = FilePathGenerator.create(func);
Full source at GitHub

Rotation Strategy

RotationStrategy provides the functionality to decide when to rotate files.
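
This guide rotates by file size and by message count; a time-based variant also exists. A short Scala sketch (the time-based constructor is an assumption, check the API of your version):

import scala.concurrent.duration._

val bySize  = RotationStrategy.size(1, FileUnit.GB) // rotate after roughly 1 GB written
val byCount = RotationStrategy.count(1000)          // rotate after 1000 messages
val byTime  = RotationStrategy.time(30.seconds)     // rotate every 30 seconds (assumed available)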

Sync Strategy

SyncStrategy provides the functionality to decide when to synchronize the output.
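
This guide uses the two variants below; a Scala sketch:

val everyN = SyncStrategy.count(500) // synchronize after every 500 messages
val never  = SyncStrategy.none       // never synchronize explicitly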

Reading

Use HdfsSource to read from HDFS.

Data Reader

Scala
val source = HdfsSource.data(fs, path)
Full source at GitHub
Java
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.data(fs, path);
Full source at GitHub
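
As with the writers, the source must be run to read anything. A Scala sketch that folds the streamed chunks back into one ByteString (assuming an implicit materializer in scope):

import akka.stream.scaladsl.Sink
import akka.util.ByteString

val contents = HdfsSource
  .data(fs, path)
  .runWith(Sink.fold(ByteString.empty)(_ ++ _)) // Future[ByteString]; decode with .utf8String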

Compressed Data Reader

Scala
val source = HdfsSource.compressed(fs, path, codec)
Full source at GitHub
Java
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.compressed(fs, path, codec);
Full source at GitHub

Sequence Reader

Scala
val source = HdfsSource.sequence(fs, path, classOf[Text], classOf[Text])
Full source at GitHub
Java
Source<Pair<Text, Text>, NotUsed> source =
    HdfsSource.sequence(fs, path, Text.class, Text.class);
Full source at GitHub

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> hdfs/testOnly *.HdfsWriterSpec
> hdfs/testOnly *.HdfsReaderSpec
Java
sbt
> hdfs/testOnly *.HdfsWriterTest
> hdfs/testOnly *.HdfsReaderTest
The source code for this page can be found here.