FTP

Artifacts

sbt
// Adds the Alpakka File and FTP connector modules, both at version 0.20.
// `%%` makes sbt append the project's Scala binary version to the artifact name.
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-file" % "0.20",
  "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % "0.20"
)
Maven
<!-- Alpakka File connector module, built for Scala 2.12, version 0.20 -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.12</artifactId>
  <version>0.20</version>
</dependency>
<!-- Alpakka FTP connector module, built for Scala 2.12, version 0.20 -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-ftp_2.12</artifactId>
  <version>0.20</version>
</dependency>
Gradle
// Alpakka File and FTP connector modules (Scala 2.12 builds), both at version 0.20.
// Each declaration is its own statement: a trailing comma after a declaration
// would make Groovy treat the next line as a further argument and fail to parse.
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-file_2.12', version: '0.20'
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-ftp_2.12', version: '0.20'
}

Example: Copy all files from an FTP server to local files

  • list FTP server contents (1),
  • keep only the file entries, dropping everything else (2),
  • for each file prepare for awaiting Future (Scala) / CompletionStage (Java) results, ignoring the stream order (3),
  • run a new stream copying the file contents to a local file (4),
  • combine the filename and the copying result (5),
  • collect all filenames with results into a sequence (6)
Scala
import java.net.InetAddress
import java.nio.file.Paths

import akka.stream.IOResult
import akka.stream.alpakka.ftp.FtpSettings
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.{FileIO, Sink}
import org.apache.mina.util.AvailablePortFinder
import playground.filesystem.FileSystemMock
import playground.{ActorSystemAvailable, FtpServerEmbedded}

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Connection settings for the FTP server reachable on localhost at `port`.
val ftpSettings = FtpSettings(InetAddress.getByName("localhost"), port)

// Lists the server contents from "/", copies every file entry to a path below
// `targetDir`, and gathers the (remote path, IOResult) pairs into a sequence.
Ftp
  .ls("/", ftpSettings)                                    //: FtpFile (1)
  .filter(_.isFile)                                        //: FtpFile (2)
  .mapAsyncUnordered(parallelism = 5) { entry =>           // (3)
    val remotePath = entry.path
    // Prefixing "." turns the remote path into one relative to targetDir.
    val destination = targetDir.resolve("." + remotePath)
    Ftp
      .fromPath(remotePath, ftpSettings)
      .runWith(FileIO.toPath(destination))                 // (4): Future[IOResult]
      .map(ioResult => (remotePath, ioResult))             // (5)
  }                                                        //: (String, IOResult)
  .runWith(Sink.seq)                                       // (6)
Full source at GitHub
Java
import akka.japi.Pair;
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Sink;
import org.apache.ftpserver.FtpServer;
import org.apache.mina.util.AvailablePortFinder;
import playground.ActorSystemAvailable;
import playground.FtpServerEmbedded;
import playground.filesystem.FileSystemMock;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

// Connection settings for the FTP server reachable on localhost at `port`.
final FtpSettings ftpSettings =
    FtpSettings.create(InetAddress.getByName("localhost")).withPort(port);
// Passed to mapAsyncUnordered below as its parallelism argument.
final Integer parallelism = 5;

// Lists the server contents from "/", copies every file entry to a path below
// targetDir, and gathers the (remote path, IOResult) pairs into a list.
Ftp.ls("/", ftpSettings) // : FtpFile (1)
    .filter(entry -> entry.isFile()) // : FtpFile (2)
    .mapAsyncUnordered(
        parallelism,
        entry -> { // (3)
          final String remotePath = entry.path();
          // Prefixing "." turns the remote path into one relative to targetDir.
          final Path destination = targetDir.resolve("." + remotePath);
          return Ftp.fromPath(remotePath, ftpSettings)
              .runWith(FileIO.toPath(destination), actorMaterializer()) // (4)
              .thenApply(ioResult -> Pair.create(remotePath, ioResult)); // (5)
        }) // : (String, IOResult)
    .runWith(Sink.seq(), actorMaterializer()); // (6)
sample
Full source at GitHub

Running the example code

This example is contained in a stand-alone runnable main; it can be run from sbt like this:

Scala
sbt
> doc-examples/runMain ftpsamples.FtpToFile
Java
sbt
> doc-examples/runMain ftpsamples.FtpToFileExample
The source code for this page can be found here.