From 4892c58bcfd59834fcdfb44e6bdeebee6fd317ba Mon Sep 17 00:00:00 2001 From: Regis Leray Date: Thu, 10 Oct 2024 20:33:43 -0400 Subject: [PATCH] 420 - improve readFile Stream finalization (#421) --- .github/workflows/ci.yml | 2 +- .github/workflows/site.yml | 20 +++--- build.sbt | 6 +- project/BuildHelper.scala | 2 +- project/build.properties | 2 +- project/plugins.sbt | 4 +- .../src/main/scala/zio/ftp/SecureFtp.scala | 7 ++- .../src/main/scala/zio/ftp/UnsecureFtp.scala | 46 +++++++++++--- .../ftp/UnsecureDownloadFinalizeSpec.scala | 49 --------------- .../zio/ftp/UnsecureFtpReadFileSpec.scala | 63 +++++++++++++++++++ .../test/scala/zio/ftp/UnsecureFtpSpec.scala | 10 +-- 11 files changed, 127 insertions(+), 84 deletions(-) delete mode 100644 zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala create mode 100644 zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5c31ddf4..5603acac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -121,4 +121,4 @@ jobs: PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }} PGP_SECRET: ${{ secrets.PGP_SECRET }} SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} - SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} \ No newline at end of file diff --git a/.github/workflows/site.yml b/.github/workflows/site.yml index 223121b5..93dca407 100644 --- a/.github/workflows/site.yml +++ b/.github/workflows/site.yml @@ -14,7 +14,7 @@ name: Website jobs: build: name: Build and Test - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 if: ${{ github.event_name == 'pull_request' }} steps: - name: Git Checkout @@ -28,16 +28,16 @@ jobs: java-version: 17 check-latest: true - name: Check if the README file is up to date - run: sbt docs/checkReadme - - name: Check if the site workflow is up to date - run: sbt docs/checkGithubWorkflow + run: sbt docs/checkReadme +# - name: Check if the site workflow is up to date +# run: sbt docs/checkGithubWorkflow - name: Check artifacts build process - run: sbt +publishLocal + run: sbt +publishLocal - name: Check website build process run: sbt docs/clean; sbt docs/buildWebsite publish-docs: name: Publish Docs - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 if: ${{ ((github.event_name == 'release') && (github.event.action == 'published')) || (github.event_name == 'workflow_dispatch') }} steps: - name: Git Checkout @@ -56,13 +56,13 @@ jobs: node-version: 16.x registry-url: https://registry.npmjs.org - name: Publish Docs to NPM Registry - run: sbt docs/publishToNpm + run: sbt docs/publishToNpm env: NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} generate-readme: name: Generate README - runs-on: ubuntu-latest - if: ${{ (github.event_name == 'push') || ((github.event_name == 'release') && (github.event_name == 'published')) }} + runs-on: ubuntu-22.04 + if: ${{ (github.event_name == 'push') || ((github.event_name == 'release') && (github.event.action == 'published')) }} steps: - name: Git Checkout uses: actions/checkout@v3.3.0 @@ -76,7 +76,7 @@ jobs: java-version: 17 check-latest: true - name: Generate Readme - run: sbt docs/generateReadme + run: sbt docs/generateReadme - name: Commit Changes run: | git config --local user.email "github-actions[bot]@users.noreply.github.com" diff --git a/build.sbt b/build.sbt index 18fb6709..9e1b615d 100644 --- a/build.sbt +++ b/build.sbt @@ -27,7 +27,7 @@ addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") addCommandAlias("fix", "; all compile:scalafix test:scalafix; all scalafmtSbt scalafmtAll") -val zioVersion = "2.0.21" +val zioVersion = "2.1.6" lazy val root = project.in(file(".")).settings(publish / skip := true).aggregate(`zio-ftp`, docs) @@ -41,8 +41,8 @@ lazy val `zio-ftp` = project "dev.zio" %% "zio-streams" % zioVersion, ("dev.zio" %% "zio-nio" % "2.0.2").exclude("org.scala-lang.modules", "scala-collection-compat_2.13"), "com.hierynomus" % "sshj" % "0.39.0", - "commons-net" % "commons-net" % "3.10.0", - "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0", + "commons-net" % "commons-net" % "3.11.0", + "org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0", "org.apache.logging.log4j" % "log4j-api" % "2.24.0" % Test, "org.apache.logging.log4j" % "log4j-core" % "2.24.0" % Test, "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.24.0" % Test, diff --git a/project/BuildHelper.scala b/project/BuildHelper.scala index bb754df2..b28307f6 100644 --- a/project/BuildHelper.scala +++ b/project/BuildHelper.scala @@ -13,7 +13,7 @@ object BuildHelper { ) final val Scala212 = "2.12.20" - final val Scala213 = "2.13.13" + final val Scala213 = "2.13.15" final val Scala3 = "3.3.3" // LTS final private val stdOptions = Seq( diff --git a/project/build.properties b/project/build.properties index f344c148..23f7d979 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.8.2 +sbt.version = 1.10.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index 155a6dbc..278da11d 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.2.1") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") -addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10") +addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.7.0") addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.0") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.29") -addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.5") +addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.10") diff --git a/zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala b/zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala index fa2c7681..5b000631 100644 --- a/zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala +++ b/zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala @@ -36,7 +36,7 @@ import zio.ZIO.{ acquireRelease, attemptBlockingIO, fromAutoCloseable, scoped } * All ftp methods exposed are lift into ZIO or ZStream, which required a Blocking Environment * since the underlying java client only provide blocking methods. */ -final private class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client] { +sealed abstract class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client] { def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] = execute(c => Option(c.statExistence(path)).map(FtpResource(path, _))) @@ -116,6 +116,9 @@ final private class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client] object SecureFtp { type Client = SFTPClient + def unsafe(c: Client): SecureFtp = + new SecureFtp(c) {} + def connect(settings: SecureFtpSettings): ZIO[Scope, ConnectionError, FtpAccessors[Client]] = { val ssh = new SSHClient(settings.sshConfig) import settings._ @@ -136,7 +139,7 @@ object SecureFtp { setIdentity(_, credentials.username)(ssh) ) - new SecureFtp(ssh.newSFTPClient()) + new SecureFtp(ssh.newSFTPClient()) {} }.mapError(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}", _)) )( _.execute(_.close()).ignore diff --git a/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala b/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala index f7c045d1..91f6ac36 100644 --- a/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala +++ b/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala @@ -19,7 +19,7 @@ import java.io.IOException import org.apache.commons.net.ftp.{ FTP, FTPClient => JFTPClient, FTPSClient => JFTPSClient } import zio.ftp.UnsecureFtp.Client import zio.stream.ZStream -import zio.{ Scope, ZIO } +import zio.{ Ref, Scope, UIO, ZIO } import zio.ZIO.{ acquireRelease, attemptBlockingIO } /** @@ -28,25 +28,48 @@ import zio.ZIO.{ acquireRelease, attemptBlockingIO } * All ftp methods exposed are lift into ZIO or ZStream * The underlying java client only provide blocking methods. */ -final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Client] { +sealed abstract class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Client] { def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] = execute(c => Option(c.mlistFile(path))).map(_.map(FtpResource.fromFtpFile(_))) def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long): ZStream[Any, IOException, Byte] = { + def error(cause: Option[Exception] = None): Left[FileTransferIncompleteError, Unit] = + Left( + FileTransferIncompleteError( + s"Cannot finalize the file transfer and completely read the entire file $path. ${cause.fold("")(_.getMessage)}" + ) + ) + val success: Either[IOException, Unit] = Right(()) + val initialize = execute(_.setRestartOffset(fileOffset)) - val terminate = ZIO - .fail( - FileTransferIncompleteError(s"Cannot finalize the file transfer and completely read the entire file $path.") - ) - .unlessZIO(execute(_.completePendingCommand())) + def terminate(state: Ref[Either[IOException, Unit]]): UIO[Unit] = + execute(_.completePendingCommand()) + .foldZIO( + err => state.set(error(Some(err))), + resp => + if (resp) state.set(success) + else state.set(error()) + ) + + def propagate(state: Ref[Either[IOException, Unit]]) = + ZStream.fromZIO(state.get).flatMap { + case Left(ex) => ZStream.fail(ex) + case _ => ZStream.empty + } val inputStream = execute(c => Option(c.retrieveFileStream(path))).someOrFail(InvalidPathError(s"File does not exist $path")) - (ZStream.fromZIO(initialize) *> ZStream.empty) ++ ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ (ZStream - .fromZIO(terminate) *> ZStream.empty) + ZStream.unwrap { + for { + state <- Ref.make(success) + is <- initialize *> inputStream + } yield ZStream + .fromInputStream(is, chunkSize) + .ensuring(terminate(state)) ++ propagate(state) + } } def rm(path: String): ZIO[Any, IOException, Unit] = @@ -105,6 +128,9 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien object UnsecureFtp { type Client = JFTPClient + def unsafe(c: Client): UnsecureFtp = + new UnsecureFtp(c) {} + def connect(settings: UnsecureFtpSettings): ZIO[Scope, ConnectionError, FtpAccessors[Client]] = acquireRelease( attemptBlockingIO { @@ -141,7 +167,7 @@ object UnsecureFtp { settings.dataTimeout.foreach(ftpClient.setDataTimeout) - new UnsecureFtp(ftpClient) -> success + new UnsecureFtp(ftpClient) {} -> success }.mapError(e => ConnectionError(e.getMessage, e)) .filterOrFail(_._2)(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}")) .map(_._1) diff --git a/zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala b/zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala deleted file mode 100644 index 689e401f..00000000 --- a/zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala +++ /dev/null @@ -1,49 +0,0 @@ -package zio.ftp - -import org.apache.commons.net.ftp.FTPClient -import zio.test.Assertion._ -import zio.test._ - -import java.io.InputStream -import scala.util.Random - -object UnsecureDownloadFinalizeSpec extends ZIOSpecDefault { - - private def createFtpclient(success: Boolean) = { - val client = new FTPClient { - override def retrieveFileStream(remote: String): InputStream = { - val it = Random.alphanumeric.take(5000).map(_.toByte).iterator - () => if (it.hasNext) it.next().toInt else -1 - } - - override def completePendingCommand(): Boolean = success - } - - new UnsecureFtp(client) - } - - private def hasIncompleteMsg(a: Assertion[String]) = - hasField("file transfer incomplete message", (e: FileTransferIncompleteError) => e.message, a) - - override def spec = - suite("Download finalizer")( - test("complete pending command gets called") { - val ftpClient = createFtpclient(true) - for { - bytes <- ftpClient.readFile("/a/b/c.txt").runCollect - } yield assert(bytes)(hasSize(equalTo(5000))) - }, - test("completion failure is exposed on error channel") { - val ftpClient = createFtpclient(false) - for { - exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit - } yield assert(exit)( - fails( - isSubtype[FileTransferIncompleteError]( - hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt")) - ) - ) - ) - } - ) -} diff --git a/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala new file mode 100644 index 00000000..07e487d8 --- /dev/null +++ b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala @@ -0,0 +1,63 @@ +package zio.ftp + +import org.apache.commons.net.ftp.FTPClient +import zio.test.Assertion._ +import zio.test._ + +import java.io.{ IOException, InputStream } +import scala.util.Random + +object UnsecureFtpReadFileSpec extends ZIOSpecDefault { + + private def createFtpclient(errorOrsuccess: Either[Exception, Boolean]) = + UnsecureFtp.unsafe(new FTPClient { + + override def retrieveFileStream(remote: String): InputStream = { + val it = Random.alphanumeric.take(5000).map(_.toByte).iterator + () => if (it.hasNext) it.next().toInt else -1 + } + + override def completePendingCommand(): Boolean = + errorOrsuccess match { + case Left(err) => throw err + case Right(flag) => flag + } + }) + + private def hasIncompleteMsg(a: Assertion[String]) = + hasField("file transfer incomplete message", (e: Exception) => e.getMessage, a) + + override def spec = + suite("UnsecureFtp - ReadFile complete pending")( + test("succeed") { + val ftpClient = createFtpclient(Right(true)) + for { + bytes <- ftpClient.readFile("/a/b/c.txt").runCollect + } yield assert(bytes)(hasSize(equalTo(5000))) + }, + test("fail to complete") { + val ftpClient = createFtpclient(Right(false)) + for { + exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit + } yield assert(exit)( + fails( + isSubtype[FileTransferIncompleteError]( + hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt")) + ) + ) + ) + }, + test("error occur") { + val ftpClient = createFtpclient(Left(new IOException("Boom"))) + for { + exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit + } yield assert(exit)( + fails( + isSubtype[FileTransferIncompleteError]( + hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt")) + ) + ) + ) + } + ) +} diff --git a/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala index 11c948c7..3110b581 100644 --- a/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala +++ b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala @@ -14,21 +14,21 @@ import java.net.{ InetSocketAddress, Proxy } import scala.io.Source object UnsecureSslFtpSpec extends ZIOSpecDefault { - val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass")) + private val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass")) - override def spec = + override def spec: Spec[TestEnvironment & Scope, Any] = FtpSuite.spec("UnsecureSslFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential } object UnsecureFtpSpec extends ZIOSpecDefault { - val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass")) + private val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass")) - override def spec = + override def spec: Spec[TestEnvironment & Scope, Any] = FtpSuite.spec("UnsecureFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential } object FtpSuite { - val home = ZPath("ftp-home/ftp/home") + private val home = ZPath("ftp-home/ftp/home") def spec(labelSuite: String, settings: UnsecureFtpSettings) = suite(labelSuite)(