Files

The File connectors provide additional connectors for filesystems, complementing the file sources and sinks already included in core Akka Streams (see akka.stream.javadsl.FileIO for Java and akka.stream.scaladsl.FileIO for Scala).

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-file" % "0.20"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.12</artifactId>
  <version>0.20</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-file_2.12', version: '0.20'
}

Writing to and reading from files

Use the FileIO class to create streams reading from or writing to files. It is part of Akka Streams.

Akka Streaming File IO documentation

Tailing a file into a stream

The FileTailSource starts at a given offset in a file and emits chunks of bytes until it reaches the end of the file. It then polls the file for changes and emits new bytes as they are written to the file (unless there is backpressure).

A very common use case is combining reading bytes with parsing them into lines, so FileTailSource provides a few factory methods that create a source which parses the bytes into lines and emits those.

In this sample we simply tail the lines of a file and print them to standard out:

Scala
val fs = FileSystems.getDefault
val lines: Source[String, NotUsed] = scaladsl.FileTailSource.lines(
  path = fs.getPath(path),
  maxLineSize = 8192,
  pollingInterval = 250.millis
)

lines.runForeach(line => System.out.println(line))
Full source at GitHub
Java
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
final int maxLineSize = 8192;

final Source<String, NotUsed> lines =
    akka.stream.alpakka.file.javadsl.FileTailSource.createLines(
        fs.getPath(path), maxLineSize, pollingInterval);

lines.runForeach((line) -> System.out.println(line), materializer);
Full source at GitHub
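
To make the "read to the end, then poll for more" behaviour concrete, here is a minimal sketch of a single poll written against plain java.nio. It is not Alpakka's implementation: the names TailSketch, Chunk and readFrom are hypothetical, and the sketch assumes the unread part of the file fits in an int-sized buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not part of Alpakka: one "poll" of a tailed file.
class TailSketch {

  // Bytes read by one poll plus the offset to resume from on the next poll.
  static final class Chunk {
    final byte[] bytes;
    final long nextOffset;

    Chunk(byte[] bytes, long nextOffset) {
      this.bytes = bytes;
      this.nextOffset = nextOffset;
    }
  }

  // Reads everything between `offset` and the current end of the file.
  static Chunk readFrom(Path file, long offset) {
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      long available = Math.max(0, channel.size() - offset);
      ByteBuffer buffer = ByteBuffer.allocate((int) available); // assumes a small unread tail
      long position = offset;
      while (buffer.hasRemaining()) {
        int read = channel.read(buffer, position);
        if (read < 0) break; // the file shrank while we were reading
        position += read;
      }
      byte[] bytes = new byte[buffer.position()];
      buffer.flip();
      buffer.get(bytes);
      return new Chunk(bytes, offset + bytes.length);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("tail-", ".log");
    Files.write(file, "first\n".getBytes(StandardCharsets.UTF_8));
    Chunk a = readFrom(file, 0);
    Files.write(file, "second\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    Chunk b = readFrom(file, a.nextOffset); // yields only the appended bytes
    System.out.print(new String(a.bytes, StandardCharsets.UTF_8));
    System.out.print(new String(b.bytes, StandardCharsets.UTF_8));
    Files.delete(file);
  }
}
```

Calling readFrom repeatedly with the previously returned nextOffset, separated by a sleep of the polling interval, approximates the tailing loop. A poll that finds no new data returns an empty chunk and an unchanged offset.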

Listing directory contents

Directory.ls(path) lists all files and directories directly in a given directory:

Scala
val source: Source[Path, NotUsed] = Directory.ls(dir)
Full source at GitHub
Java
final Source<Path, NotUsed> source = Directory.ls(dir);
Full source at GitHub

Directory.walk(path) traverses all subdirectories and lists files and directories depth first:

Scala
val files: Source[Path, NotUsed] = Directory.walk(root)
Full source at GitHub
Java
final Source<Path, NotUsed> source = Directory.walk(root);
// Alternatively, limit the traversal depth to 1 and follow symbolic links:
final Source<Path, NotUsed> shallowSource = Directory.walk(root, 1, FileVisitOption.FOLLOW_LINKS);
Full source at GitHub
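
The difference between listing direct children and walking the whole tree can be seen with the JDK's own Files.list and Files.walk, which offer the same two traversal styles. The sketch below uses those JDK calls directly rather than Alpakka, and the class and method names are hypothetical. Note that Files.walk also reports the starting directory itself, which shows up here as an empty relative path.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper built on the JDK only, for comparing the two traversal styles.
class DirectoryListingSketch {

  // Direct children of `dir` only, as sorted paths relative to `dir`.
  static List<String> ls(Path dir) {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries.map(p -> dir.relativize(p).toString()).sorted().collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Everything under `root` down to `maxDepth` levels, as sorted paths relative to `root`.
  static List<String> walk(Path root, int maxDepth, FileVisitOption... options) {
    try (Stream<Path> entries = Files.walk(root, maxDepth, options)) {
      return entries.map(p -> root.relativize(p).toString()).sorted().collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("listing-");
    Files.createFile(root.resolve("a.txt"));
    Path sub = Files.createDirectory(root.resolve("sub"));
    Files.createFile(sub.resolve("b.txt"));

    System.out.println("ls:           " + ls(root));
    System.out.println("walk:         " + walk(root, Integer.MAX_VALUE));
    System.out.println("walk depth 1: " + walk(root, 1));
  }
}
```

With a maximum depth of 1 the walk reaches the direct children but does not descend into sub, so the nested b.txt is not reported.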

Listening to changes in a directory

The DirectoryChangesSource emits an element every time there is a change to a watched directory in the local filesystem. Each emitted change consists of the path that was changed and an enumeration describing what kind of change it was.

In this sample we simply print each change to the directory to standard output:

Scala
val fs = FileSystems.getDefault
val changes = DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)
changes.runForeach {
  case (path, change) => println("Path: " + path + ", Change: " + change)
}
Full source at GitHub
Java
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(1, TimeUnit.SECONDS);
final int maxBufferSize = 1000;
final Source<Pair<Path, DirectoryChange>, NotUsed> changes =
    DirectoryChangesSource.create(fs.getPath(path), pollingInterval, maxBufferSize);

changes.runForeach(
    (Pair<Path, DirectoryChange> pair) -> {
      final Path changedPath = pair.first();
      final DirectoryChange change = pair.second();
      System.out.println("Path: " + changedPath + ", Change: " + change);
    },
    materializer);
Full source at GitHub
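
Each emitted element pairs a path with a kind of change. One way to picture how such pairs can be derived is to compare two snapshots of a directory taken one polling interval apart. The sketch below does exactly that and is a swapped-in illustration, not a description of how DirectoryChangesSource works internally. The Change enum, its constant names, and the snapshot shape (file name mapped to last-modified time) are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical snapshot diff, for illustration only.
class DirectoryDiffSketch {

  // Stand-in for the kind-of-change enumeration; names are assumptions.
  enum Change { CREATION, MODIFICATION, DELETION }

  // Compares two snapshots (file name -> last-modified millis) and reports what changed.
  static Map<String, Change> diff(Map<String, Long> before, Map<String, Long> after) {
    Map<String, Change> changes = new TreeMap<>(); // sorted by name for stable output
    for (Map.Entry<String, Long> entry : after.entrySet()) {
      Long previous = before.get(entry.getKey());
      if (previous == null) {
        changes.put(entry.getKey(), Change.CREATION);
      } else if (!previous.equals(entry.getValue())) {
        changes.put(entry.getKey(), Change.MODIFICATION);
      }
    }
    for (String name : before.keySet()) {
      if (!after.containsKey(name)) {
        changes.put(name, Change.DELETION);
      }
    }
    return changes;
  }

  public static void main(String[] args) {
    Map<String, Long> before = new HashMap<>();
    before.put("a.log", 1L);
    before.put("b.log", 1L);

    Map<String, Long> after = new HashMap<>();
    after.put("a.log", 2L); // touched since the first snapshot
    after.put("c.log", 5L); // new file; b.log is gone

    // prints {a.log=MODIFICATION, b.log=DELETION, c.log=CREATION}
    System.out.println(diff(before, after));
  }
}
```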

Rotating the file to stream into

The LogRotatorSink creates and writes to multiple files.
This sink takes a function as a parameter, which in turn returns a ByteString => Option[Path] function (Scala) or a Function<ByteString, Optional<Path>> (Java). If the generated function returns a path, the sink rotates the file output to this new path and the current ByteString is written to the new file too. With this approach the user can define a custom stateful file generation implementation.

A short usage snippet:

Scala
val pathGeneratorFunction: () => ByteString => Option[Path] = ???

val completion = Source(Seq("test1", "test2", "test3", "test4", "test5", "test6").toList)
  .map(ByteString(_))
  .runWith(LogRotatorSink(pathGeneratorFunction))
Full source at GitHub
Java
import akka.stream.alpakka.file.javadsl.LogRotatorSink;

Creator<Function<ByteString, Optional<Path>>> pathGeneratorCreator = ...;

CompletionStage<Done> completion =
    Source.from(Arrays.asList("test1", "test2", "test3", "test4", "test5", "test6"))
        .map(ByteString::fromString)
        .runWith(LogRotatorSink.createFromFunction(pathGeneratorCreator), materializer);
Full source at GitHub
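
The rotation contract described above (ask the function for a path per element, switch files when one is returned, then write the element) can be sketched as a plain loop without any streaming machinery. This is an illustration under stated assumptions, not the sink's implementation: elements are Strings instead of ByteString, java.util.function.Function stands in for the Akka function type, the names are hypothetical, and failing when the first element yields no path is a choice made for this sketch.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, non-streaming illustration of the rotation contract.
class RotationLoopSketch {

  // Writes each element to the most recently returned path.
  static void writeAll(List<String> elements, Function<String, Optional<Path>> pathFor) {
    OutputStream current = null;
    try {
      for (String element : elements) {
        Optional<Path> next = pathFor.apply(element);
        if (next.isPresent()) {
          // Rotate: close the old file and open the new one before writing.
          if (current != null) current.close();
          current = Files.newOutputStream(next.get());
        }
        if (current == null) {
          // Sketch-only decision: without a path there is nowhere to write.
          throw new IllegalStateException("no path returned for the first element");
        }
        current.write(element.getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      if (current != null) {
        try {
          current.close();
        } catch (IOException ignored) {
          // nothing sensible to do while cleaning up
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("rotation-");
    int[] seen = {0};
    // Stateful function: return a new path on every second element (part-0, part-1, ...).
    Function<String, Optional<Path>> everyTwo = element -> {
      int index = seen[0]++;
      return index % 2 == 0
          ? Optional.of(dir.resolve("part-" + index / 2))
          : Optional.<Path>empty();
    };
    writeAll(Arrays.asList("test1", "test2", "test3"), everyTwo);
    System.out.println("wrote files under " + dir);
  }
}
```

The element that triggers a rotation is written to the new file, not the old one, which matches the behaviour described in the text above.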

Example: size-based rotation

Scala
val fileSizeRotationFunction = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  (element: ByteString) =>
    {
      if (size + element.size > max) {
        val path = Files.createTempFile("out-", ".log")
        size = element.size
        Some(path)
      } else {
        size += element.size
        None
      }
    }
}

val sizeRotatorSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(fileSizeRotationFunction)
Full source at GitHub
Java
Creator<Function<ByteString, Optional<Path>>> sizeBasedPathGenerator =
    () -> {
      long max = 10 * 1024 * 1024;
      final long[] size = new long[] {max};
      return (element) -> {
        if (size[0] + element.size() > max) {
          Path path = Files.createTempFile("out-", ".log");
          size[0] = element.size();
          return Optional.of(path);
        } else {
          size[0] += element.size();
          return Optional.empty();
        }
      };
    };

Sink<ByteString, CompletionStage<Done>> sizeRotatorSink =
    LogRotatorSink.createFromFunction(sizeBasedPathGenerator);
Full source at GitHub
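
The size-based rule is easiest to follow with small numbers. The trace below replays the same comparison (size + element size > max) with the counter initialised to max, which is what makes the first non-empty element open a file. It is a hypothetical helper that only records the rotation decisions and writes nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the size-based rotation decision; no files are touched.
class SizeRotationTrace {

  // Returns, per element, whether the rule would rotate to a new file.
  static List<Boolean> trace(long max, int... elementSizes) {
    long size = max; // start "full" so the first non-empty element rotates
    List<Boolean> rotations = new ArrayList<>();
    for (int elementSize : elementSizes) {
      if (size + elementSize > max) {
        size = elementSize; // the element becomes the first content of the new file
        rotations.add(true);
      } else {
        size += elementSize;
        rotations.add(false);
      }
    }
    return rotations;
  }

  public static void main(String[] args) {
    // With max = 10: 4 opens a file, 4 fits (8), 4 overflows (12), 10 overflows, 1 overflows.
    System.out.println(trace(10, 4, 4, 4, 10, 1)); // prints [true, false, true, true, true]
  }
}
```

Two consequences of the rule as written are visible here: an element as large as max always gets a file of its own, and an empty first element would not trigger a rotation because max + 0 is not greater than max.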

Example: time-based rotation

Scala
val destinationDir = FileSystems.getDefault.getPath("/tmp")
val formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'")

val timeBasedRotationFunction = () => {
  var currentFilename: Option[String] = None
  (_: ByteString) =>
    {
      val newName = LocalDateTime.now().format(formatter)
      if (currentFilename.contains(newName)) {
        None
      } else {
        currentFilename = Some(newName)
        Some(destinationDir.resolve(newName))
      }
    }
}

val timeBasedSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(timeBasedRotationFunction)
Full source at GitHub
Java
final Path destinationDir = FileSystems.getDefault().getPath("/tmp");
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'");

Creator<Function<ByteString, Optional<Path>>> timeBasedPathCreator =
    () -> {
      final String[] currentFileName = new String[] {null};
      return (element) -> {
        String newName = LocalDateTime.now().format(formatter);
        if (newName.equals(currentFileName[0])) {
          return Optional.empty();
        } else {
          currentFileName[0] = newName;
          return Optional.of(destinationDir.resolve(newName));
        }
      };
    };

Sink<ByteString, CompletionStage<Done>> timeBasedSink =
    LogRotatorSink.createFromFunction(timeBasedPathCreator);
Full source at GitHub
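
Because the time-based function calls LocalDateTime.now() directly, its behaviour is hard to check without waiting for the hour to change. A possible variation, sketched here with hypothetical names, passes the time source in as a Supplier so the rotation points can be exercised with fixed timestamps. It uses java.util.function.Function rather than the Akka function type, so it would need adapting before being handed to the sink.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical variation of the time-based generator with an injectable time source.
class TimeRotationSketch {

  static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'");

  // Returns a path whenever the formatted hour differs from the previous element's hour.
  static <T> Function<T, Optional<Path>> hourly(Path dir, Supplier<LocalDateTime> now) {
    final String[] currentName = {null};
    return element -> {
      String newName = now.get().format(FORMATTER);
      if (newName.equals(currentName[0])) {
        return Optional.empty(); // same hour: keep writing to the current file
      }
      currentName[0] = newName;
      return Optional.of(dir.resolve(newName));
    };
  }

  public static void main(String[] args) {
    LocalDateTime[] now = {LocalDateTime.of(2018, 6, 1, 10, 15)};
    Function<String, Optional<Path>> generator = hourly(Paths.get("logs"), () -> now[0]);

    System.out.println(generator.apply("first"));  // a path for the 10 o'clock file
    now[0] = LocalDateTime.of(2018, 6, 1, 10, 59);
    System.out.println(generator.apply("second")); // prints Optional.empty
    now[0] = LocalDateTime.of(2018, 6, 1, 11, 0);
    System.out.println(generator.apply("third"));  // a path for the 11 o'clock file
  }
}
```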

Running the example code

The samples are contained in standalone runnable mains and can be run from sbt like this:

Scala
sbt
// tail source
> file/Test/runMain akka.stream.alpakka.file.scaladsl.FileTailSourceSpec /some/path/to/a/file
// or directory changes
> file/Test/runMain akka.stream.alpakka.file.scaladsl.DirectoryChangesSourceSpec /some/directory/path
// File rotator
> file/Test/runMain akka.stream.alpakka.file.scaladsl.LogRotatorSinkTest
Java
sbt
// tail source
> file/Test/runMain akka.stream.alpakka.file.javadsl.FileTailSourceTest /some/path/to/a/file
// or directory changes
> file/Test/runMain akka.stream.alpakka.file.javadsl.DirectoryChangesSourceTest /some/directory/path
// File rotator
> file/Test/runMain akka.stream.alpakka.file.javadsl.LogRotatorSinkTest