File Connectors

The File connectors provides additional connectors for filesystems complementing the sources and sinks for files already included in core Akka Streams (which can be found in akka.stream.javadsl.FileIO and akka.stream.scaladsl.FileIO).

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-file" % "0.3"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.11</artifactId>
  <version>0.3</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-file_2.11", version: "0.3"
}

Usage

FileTailSource

The FileTailSource starts at a given offset in a file and emits chunks of bytes until reaching the end of the file, it will then poll the file for changes and emit new changes as they are written to the file (unless there is backpressure).

A very common use case is combining reading bytes with parsing the bytes into lines, therefore FileTailSource contains a few factory methods to create a source that parses the bytes into lines and emits those.

In this sample we simply tail the lines of a file and print them to standard out:

Scala
val fs = FileSystems.getDefault
val lines: Source[String, NotUsed] = scaladsl.FileTailSource.lines(
  path = fs.getPath(path),
  maxLineSize = 8192,
  pollingInterval = 250.millis
)

lines.runForeach(line => System.out.println(line))
Java
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
final int maxLineSize = 8192;

final Source<String, NotUsed> lines =
  akka.stream.alpakka.file.javadsl.FileTailSource.createLines(fs.getPath(path), maxLineSize, pollingInterval);

lines.runForeach((line) -> System.out.println(line), materializer);

DirectoryChangesSource

The DirectoryChangesSource will emit elements every time there is a change to a watched directory in the local filesystem, the emitted change concists of the path that was changed and an enumeration describing what kind of change it was.

In this sample we simply print each change to the directory to standard output:

Scala
val fs = FileSystems.getDefault
val changes = DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)
changes.runForeach {
  case (path, change) => println("Path: " + path + ", Change: " + change)
}
Java
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(1, TimeUnit.SECONDS);
final int maxBufferSize = 1000;
final Source<Pair<Path, DirectoryChange>, NotUsed> changes =
  DirectoryChangesSource.create(fs.getPath(path), pollingInterval, maxBufferSize);


changes.runForeach((Pair<Path, DirectoryChange> pair) -> {
  final Path changedPath = pair.first();
  final DirectoryChange change = pair.second();
  System.out.println("Path: " + changedPath + ", Change: " + change);
}, materializer);

Running the example code

Both the samples are contained in standalone runnable mains, they can be run from sbt like this:

Scala
sbt
// tail source
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.FileTailSourceSpec /some/path/toa/file
// or directory changes
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.DirectoryChangesSourceSpec /some/directory/path
Java
sbt
// tail source
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.FileTailSourceTest /some/path/toa/file
// or directory changes
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.DirectoryChangesSourceTest /some/directory/path