FTP
The FTP connector provides Akka Stream sources to connect to FTP, FTPs and SFTP servers. Currently, two kinds of sources are provided:
- one for browsing or traversing the server recursively, and
- another for retrieving files as a stream of bytes.
Project Info: Alpakka FTP | |
---|---|
Artifact | com.lightbend.akka : akka-stream-alpakka-ftp : 1.1.2 |
JDK versions | OpenJDK 8 |
Scala versions | 2.12.7, 2.11.12 |
JPMS module name | akka.stream.alpakka.ftp |
License | |
Readiness level | Since 0.3, 2016-12-02 |
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
```
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % "1.1.2"
```
- Maven
```
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-ftp_2.12</artifactId>
  <version>1.1.2</version>
</dependency>
```
- Gradle
```
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-ftp_2.12', version: '1.1.2'
}
```
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version | License |
---|---|---|---|
com.hierynomus | sshj | 0.26.0 | Apache License 2.0 |
com.typesafe.akka | akka-stream_2.12 | 2.5.23 | Apache License, Version 2.0 |
commons-net | commons-net | 3.6 | Apache License, Version 2.0 |
org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |
- Dependency tree
```
com.hierynomus sshj 0.26.0 Apache License 2.0
com.jcraft jzlib 1.1.3 BSD
net.i2p.crypto eddsa 0.2.0 CC0 1.0 Universal
org.bouncycastle bcpkix-jdk15on 1.60 Bouncy Castle Licence
org.bouncycastle bcprov-jdk15on 1.60 Bouncy Castle Licence
org.bouncycastle bcprov-jdk15on 1.60 Bouncy Castle Licence
org.slf4j slf4j-api 1.7.7 MIT License
com.typesafe.akka akka-stream_2.12 2.5.23 Apache License, Version 2.0
com.typesafe.akka akka-actor_2.12 2.5.23 Apache License, Version 2.0
com.typesafe config 1.3.3 Apache License, Version 2.0
org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause
org.scala-lang scala-library 2.12.7 BSD 3-Clause
org.scala-lang scala-library 2.12.7 BSD 3-Clause
com.typesafe.akka akka-protobuf_2.12 2.5.23 Apache License, Version 2.0
org.scala-lang scala-library 2.12.7 BSD 3-Clause
com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0
com.typesafe config 1.3.3 Apache License, Version 2.0
org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause
org.scala-lang scala-library 2.12.7 BSD 3-Clause
org.scala-lang scala-library 2.12.7 BSD 3-Clause
org.reactivestreams reactive-streams 1.0.2 CC0
org.scala-lang scala-library 2.12.7 BSD 3-Clause
commons-net commons-net 3.6 Apache License, Version 2.0
org.scala-lang scala-library 2.12.7 BSD 3-Clause
```
Configuring the connection settings
To establish a connection with the remote server, you need to provide a specialized version of a `RemoteFileSettings` instance. It is specialized because it depends on the kind of server you are connecting to: FTP, FTPs or SFTP.
- Scala
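```
import java.io.PrintWriter
import java.net.InetAddress

import akka.stream.alpakka.ftp.FtpSettings
import org.apache.commons.net.PrintCommandListener
import org.apache.commons.net.ftp.FTPClient

// HOSTNAME, PORT and CREDENTIALS are placeholders for your own values
val ftpSettings = FtpSettings
  .create(InetAddress.getByName(HOSTNAME))
  .withPort(PORT)
  .withCredentials(CREDENTIALS)
  .withBinary(true)
  .withPassiveMode(true)
  // only useful for debugging
  .withConfigureConnection((ftpClient: FTPClient) => {
    ftpClient.addProtocolCommandListener(new PrintCommandListener(new PrintWriter(System.out), true))
  })
```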
- Java
```
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.javadsl.Source;
import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTPClient;
import java.io.PrintWriter;
import java.net.InetAddress;

// HOSTNAME, PORT and CREDENTIALS are placeholders for your own values
FtpSettings ftpSettings =
    FtpSettings.create(InetAddress.getByName(HOSTNAME))
        .withPort(PORT)
        .withCredentials(CREDENTIALS)
        .withBinary(true)
        .withPassiveMode(true)
        // only useful for debugging
        .withConfigureConnectionConsumer(
            (FTPClient ftpClient) -> {
              ftpClient.addProtocolCommandListener(
                  new PrintCommandListener(new PrintWriter(System.out), true));
            });
```
The configuration above creates a connection to a remote FTP server in passive mode; the connection is anonymous when `CREDENTIALS` holds anonymous credentials. For FTPs and SFTP servers, provide the specialized versions of these settings: `FtpsSettings` or `SftpSettings` respectively.
The example demonstrates the optional `configureConnection` option available on FTP and FTPs clients. Use it to configure any custom parameters the server may require, such as explicit or implicit data transfer encryption.
For a non-anonymous connection, provide an instance of `NonAnonFtpCredentials` instead.
For a connection using a private key, provide an instance of `SftpIdentity` to `SftpSettings`.
To use a custom SSH client for SFTP, provide an instance of `SSHClient`.
- Scala
```
import akka.stream.alpakka.ftp.scaladsl.{Sftp, SftpApi}
import net.schmizz.sshj.{DefaultConfig, SSHClient}

val sshClient: SSHClient = new SSHClient(new DefaultConfig)
val configuredClient: SftpApi = Sftp(sshClient)
```
- Java
```
import akka.stream.alpakka.ftp.javadsl.Sftp;
import akka.stream.alpakka.ftp.javadsl.SftpApi;
import net.schmizz.sshj.DefaultConfig;
import net.schmizz.sshj.SSHClient;

public class ConfigureCustomSSHClient {

  public ConfigureCustomSSHClient() {
    SSHClient sshClient = new SSHClient(new DefaultConfig());
    SftpApi sftp = Sftp.create(sshClient);
  }
}
```
Traversing a remote FTP folder recursively
To traverse a remote folder recursively, use the `ls` method in the FTP API:
- Scala
```
import akka.NotUsed
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings}
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Source

def listFiles(basePath: String, settings: FtpSettings): Source[FtpFile, NotUsed] =
  Ftp.ls(basePath, settings)
```
- Java
```
import akka.NotUsed;
import akka.stream.Materializer;
import akka.stream.alpakka.ftp.FtpFile;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Source;

public class FtpTraversingExample {

  public void listFiles(String basePath, FtpSettings settings, Materializer materializer)
      throws Exception {
    Ftp.ls(basePath, settings)
        .runForeach(ftpFile -> System.out.println(ftpFile.toString()), materializer);
  }
}
```
This source emits `FtpFile` elements with no significant materialization.
For FTPs and SFTP servers, use the FTPs and SFTP API respectively.
Retrieving files
To retrieve a remote file as a stream of bytes, use the `fromPath` method in the FTP API:
- Scala
```
import akka.stream.IOResult
import akka.stream.alpakka.ftp.FtpSettings
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future

def retrieveFromPath(path: String, settings: FtpSettings): Source[ByteString, Future[IOResult]] =
  Ftp.fromPath(path, settings)
```
- Java
```
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.util.concurrent.CompletionStage;

public class FtpRetrievingExample {

  public Source<ByteString, CompletionStage<IOResult>> retrieveFromPath(
      String path, FtpSettings settings) throws Exception {
    return Ftp.fromPath(path, settings);
  }
}
```
This source emits `ByteString` elements and materializes to a `Future[IOResult]` in the Scala API and a `CompletionStage<IOResult>` in the Java API, which completes when the stream finishes.
For FTPs and SFTP servers, use the FTPs and SFTP API respectively.
Writing files
To write a stream of bytes to a remote file, use the `toPath` method in the FTP API:
- Scala
```
import akka.stream.IOResult
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.{Compression, Source}
import akka.util.ByteString
import scala.concurrent.Future

// An implicit Materializer must be in scope for runWith
val result: Future[IOResult] = Source
  .single(ByteString("this is the file contents"))
  .runWith(Ftp.toPath("file.txt", ftpSettings))

// Create a gzipped target file
val gzippedResult: Future[IOResult] = Source
  .single(ByteString("this is the file contents" * 50))
  .via(Compression.gzip)
  .runWith(Ftp.toPath("file.txt.gz", ftpSettings))
```
- Java
```
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.IOResult;
import akka.stream.javadsl.Compression;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.util.concurrent.CompletionStage;

CompletionStage<IOResult> result =
    Source.single(ByteString.fromString("this is the file contents"))
        .runWith(Ftp.toPath("file.txt", ftpSettings), materializer);

// Create a gzipped target file
CompletionStage<IOResult> gzippedResult =
    Source.single(ByteString.fromString("this is the file contents"))
        .via(Compression.gzip())
        .runWith(Ftp.toPath("file.txt.gz", ftpSettings), materializer);
```
This sink consumes `ByteString` elements and materializes to a `Future[IOResult]` in the Scala API and a `CompletionStage<IOResult>` in the Java API, which completes when the stream finishes.
For FTPs and SFTP servers, use the FTPs and SFTP API respectively.
Removing files
To remove a remote file, use the `remove` method in the FTP API:
- Scala
```
import akka.stream.IOResult
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings}
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

def remove(settings: FtpSettings): Sink[FtpFile, Future[IOResult]] =
  Ftp.remove(settings)
```
- Java
```
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpFile;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Sink;
import java.util.concurrent.CompletionStage;

public class FtpRemovingExample {

  public Sink<FtpFile, CompletionStage<IOResult>> remove(FtpSettings settings) throws Exception {
    return Ftp.remove(settings);
  }
}
```
This sink consumes `FtpFile` elements and materializes to a `Future[IOResult]` in the Scala API and a `CompletionStage<IOResult>` in the Java API, which completes when the stream finishes.
Moving files
To move a remote file, use the `move` method in the FTP API. The `move` method takes a function that calculates the path to which the file should be moved, based on the consumed `FtpFile`.
- Scala
```
import akka.stream.IOResult
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings}
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

def move(destinationPath: FtpFile => String, settings: FtpSettings): Sink[FtpFile, Future[IOResult]] =
  Ftp.move(destinationPath, settings)
```
- Java
```
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpFile;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Sink;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class FtpMovingExample {

  public Sink<FtpFile, CompletionStage<IOResult>> move(
      Function<FtpFile, String> destinationPath, FtpSettings settings) throws Exception {
    return Ftp.move(destinationPath, settings);
  }
}
```
This sink consumes `FtpFile` elements and materializes to a `Future[IOResult]` in the Scala API and a `CompletionStage<IOResult>` in the Java API, which completes when the stream finishes.
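The destination function passed to `move` is ordinary code, so it can be written and tested without an FTP server. The following is a minimal sketch of one possible function that keeps the file name and swaps the directory. The class `ArchivePathExample`, the helper `archivePath` and the `/archive` directory are hypothetical names chosen for illustration; they are not part of Alpakka. The helper works on plain path strings and uses only the JDK.

```java
import java.util.function.Function;

public class ArchivePathExample {

    // Hypothetical helper (not part of Alpakka): keeps the file name of
    // sourcePath and places it under archiveDir.
    static String archivePath(String sourcePath, String archiveDir) {
        int slash = sourcePath.lastIndexOf('/');
        String fileName = slash >= 0 ? sourcePath.substring(slash + 1) : sourcePath;
        String dir = archiveDir.endsWith("/") ? archiveDir : archiveDir + "/";
        return dir + fileName;
    }

    public static void main(String[] args) {
        // "/archive" is an assumed target directory on the remote server
        Function<String, String> toArchive = path -> archivePath(path, "/archive");
        System.out.println(toArchive.apply("/inbox/report.csv")); // prints /archive/report.csv
    }
}
```

With the Java API shown above, such a helper could presumably be plugged in as `Ftp.move(ftpFile -> ArchivePathExample.archivePath(ftpFile.path(), "/archive"), settings)`, assuming the target directory already exists on the server.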
A typical use case is to list files from an FTP location, process them, and move the files when done. An example of this use case can be found below.
Creating directory
To create a directory, the user has to specify a parent directory (also known as base path) and the directory's name.
Alpakka provides a materialized API, `mkdirAsync` (based on `Future` in Scala and `CompletionStage` in Java), and an unmaterialized API, `mkdir` (based on `Source`), so the user can choose when the action is executed.
- Scala
```
import akka.Done
import akka.NotUsed
import akka.stream.alpakka.ftp.FtpSettings
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Source

def mkdir(basePath: String, directoryName: String, settings: FtpSettings): Source[Done, NotUsed] =
  Ftp.mkdir(basePath, directoryName, settings)
```
- Java
```
import akka.Done;
import akka.NotUsed;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Source;

public class FtpMkdirExample {

  public Source<Done, NotUsed> mkdir(
      String parentPath, String directoryName, FtpSettings settings) {
    return Ftp.mkdir(parentPath, directoryName, settings);
  }
}
```
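Because `mkdir` expects the parent directory and the directory name as separate arguments, a full remote path has to be split first. The sketch below shows one way to do that with plain string handling. The class `MkdirPathExample` and the helper `splitForMkdir` are hypothetical names, not part of Alpakka, and the code assumes absolute, slash-separated remote paths.

```java
public class MkdirPathExample {

    // Hypothetical helper (not part of Alpakka): returns {parentPath, directoryName}
    // for an absolute, slash-separated remote path.
    static String[] splitForMkdir(String fullPath) {
        // Drop trailing slashes so "/data/reports/" behaves like "/data/reports"
        String trimmed = fullPath;
        while (trimmed.length() > 1 && trimmed.endsWith("/")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        int slash = trimmed.lastIndexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("expected an absolute path: " + fullPath);
        }
        String parent = slash == 0 ? "/" : trimmed.substring(0, slash);
        String name = trimmed.substring(slash + 1);
        if (name.isEmpty()) {
            throw new IllegalArgumentException("no directory name in: " + fullPath);
        }
        return new String[] {parent, name};
    }

    public static void main(String[] args) {
        String[] parts = splitForMkdir("/data/reports/2019/");
        System.out.println(parts[0] + " + " + parts[1]); // prints /data/reports + 2019
    }
}
```

The two parts would then serve as the `parentPath` and `directoryName` arguments of the `mkdir` call shown above.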
Note that for `ls` to include a subdirectory in its results, `emitTraversedDirectories` has to be set to `true`.
Example: downloading files from an FTP location and moving the original files
- Scala
```
import java.nio.file.Files

import akka.NotUsed
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings}
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.{FileIO, RunnableGraph}

def processAndMove(sourcePath: String,
                   destinationPath: FtpFile => String,
                   settings: FtpSettings): RunnableGraph[NotUsed] =
  Ftp
    .ls(sourcePath, settings)
    .flatMapConcat(ftpFile => Ftp.fromPath(ftpFile.path, settings).map((_, ftpFile)))
    .alsoTo(FileIO.toPath(Files.createTempFile("downloaded", "tmp")).contramap(_._1))
    .to(Ftp.move(destinationPath, settings).contramap(_._2))
```
- Java
```
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.alpakka.ftp.FtpFile;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.RunnableGraph;
import java.nio.file.Files;
import java.util.function.Function;

public class FtpProcessAndMoveExample {

  public RunnableGraph<NotUsed> processAndMove(
      String sourcePath, Function<FtpFile, String> destinationPath, FtpSettings settings)
      throws Exception {
    return Ftp.ls(sourcePath, settings)
        .flatMapConcat(
            ftpFile ->
                Ftp.fromPath(ftpFile.path(), settings).map(data -> new Pair<>(data, ftpFile)))
        .alsoTo(FileIO.toPath(Files.createTempFile("downloaded", "tmp")).contramap(Pair::first))
        .to(Ftp.move(destinationPath, settings).contramap(Pair::second));
  }
}
```
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to browse the code, edit and run it in sbt.
```
docker-compose up -d ftp sftp
sbt
> ftp/test
```
When using the SFTP API, take into account that the JVM relies on `/dev/random` for random number generation by default. This can block the process on some operating systems, as `/dev/random` waits for a certain amount of entropy to be generated on the host machine before returning a result. In that case, consider passing the parameter `-Djava.security.egd=file:/dev/./urandom` to the JVM.
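To confirm that the entropy setting actually reached the JVM, the system property can be read at startup. This is a minimal sketch using only the JDK. The class `EntropySourceCheck` and the helper `usesUrandom` are hypothetical names, and the check only looks at the `java.security.egd` system property, not at the `securerandom.source` entry of the JVM's security configuration file.

```java
public class EntropySourceCheck {

    // Hypothetical helper: true when the given java.security.egd value
    // points at a urandom device, e.g. "file:/dev/./urandom".
    static boolean usesUrandom(String egd) {
        return egd != null && egd.startsWith("file:") && egd.endsWith("urandom");
    }

    public static void main(String[] args) {
        // Only reflects a -Djava.security.egd=... flag passed on the command line
        String egd = System.getProperty("java.security.egd");
        if (usesUrandom(egd)) {
            System.out.println("java.security.egd=" + egd);
        } else {
            System.out.println("java.security.egd is " + (egd == null ? "not set" : egd)
                + "; SFTP connections may stall while waiting for entropy");
        }
    }
}
```

Running it with `java -Djava.security.egd=file:/dev/./urandom EntropySourceCheck` should take the first branch; note that the option must be written without spaces around `=`.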