Skip to content

Commit

Permalink
420 - improve readFile Stream finalization (#421)
Browse files Browse the repository at this point in the history
  • Loading branch information
regis-leray authored Oct 11, 2024
1 parent b19ce44 commit 4892c58
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,4 @@ jobs:
PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
20 changes: 10 additions & 10 deletions .github/workflows/site.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ name: Website
jobs:
build:
name: Build and Test
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: ${{ github.event_name == 'pull_request' }}
steps:
- name: Git Checkout
Expand All @@ -28,16 +28,16 @@ jobs:
java-version: 17
check-latest: true
- name: Check if the README file is up to date
run: sbt docs/checkReadme
- name: Check if the site workflow is up to date
run: sbt docs/checkGithubWorkflow
run: sbt docs/checkReadme
# - name: Check if the site workflow is up to date
# run: sbt docs/checkGithubWorkflow
- name: Check artifacts build process
run: sbt +publishLocal
run: sbt +publishLocal
- name: Check website build process
run: sbt docs/clean; sbt docs/buildWebsite
publish-docs:
name: Publish Docs
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: ${{ ((github.event_name == 'release') && (github.event.action == 'published')) || (github.event_name == 'workflow_dispatch') }}
steps:
- name: Git Checkout
Expand All @@ -56,13 +56,13 @@ jobs:
node-version: 16.x
registry-url: https://registry.npmjs.org
- name: Publish Docs to NPM Registry
run: sbt docs/publishToNpm
run: sbt docs/publishToNpm
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
generate-readme:
name: Generate README
runs-on: ubuntu-latest
if: ${{ (github.event_name == 'push') || ((github.event_name == 'release') && (github.event_name == 'published')) }}
runs-on: ubuntu-22.04
if: ${{ (github.event_name == 'push') || ((github.event_name == 'release') && (github.event.action == 'published')) }}
steps:
- name: Git Checkout
uses: actions/checkout@v3.3.0
Expand All @@ -76,7 +76,7 @@ jobs:
java-version: 17
check-latest: true
- name: Generate Readme
run: sbt docs/generateReadme
run: sbt docs/generateReadme
- name: Commit Changes
run: |
git config --local user.email "github-actions[bot]@users.noreply.github.com"
Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
addCommandAlias("fix", "; all compile:scalafix test:scalafix; all scalafmtSbt scalafmtAll")

val zioVersion = "2.0.21"
val zioVersion = "2.1.6"

lazy val root =
project.in(file(".")).settings(publish / skip := true).aggregate(`zio-ftp`, docs)
Expand All @@ -41,8 +41,8 @@ lazy val `zio-ftp` = project
"dev.zio" %% "zio-streams" % zioVersion,
("dev.zio" %% "zio-nio" % "2.0.2").exclude("org.scala-lang.modules", "scala-collection-compat_2.13"),
"com.hierynomus" % "sshj" % "0.39.0",
"commons-net" % "commons-net" % "3.10.0",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
"commons-net" % "commons-net" % "3.11.0",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0",
"org.apache.logging.log4j" % "log4j-api" % "2.24.0" % Test,
"org.apache.logging.log4j" % "log4j-core" % "2.24.0" % Test,
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.24.0" % Test,
Expand Down
2 changes: 1 addition & 1 deletion project/BuildHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object BuildHelper {
)

final val Scala212 = "2.12.20"
final val Scala213 = "2.13.13"
final val Scala213 = "2.13.15"
final val Scala3 = "3.3.3" // LTS

final private val stdOptions = Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.8.2
sbt.version = 1.10.2
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.2.1")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.7.0")
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.0")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.29")
addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.5")
addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.10")
7 changes: 5 additions & 2 deletions zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import zio.ZIO.{ acquireRelease, attemptBlockingIO, fromAutoCloseable, scoped }
* All ftp methods exposed are lift into ZIO or ZStream, which required a Blocking Environment
* since the underlying java client only provide blocking methods.
*/
final private class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client] {
sealed abstract class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client] {

def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] =
execute(c => Option(c.statExistence(path)).map(FtpResource(path, _)))
Expand Down Expand Up @@ -116,6 +116,9 @@ final private class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client]
object SecureFtp {
type Client = SFTPClient

def unsafe(c: Client): SecureFtp =
new SecureFtp(c) {}

def connect(settings: SecureFtpSettings): ZIO[Scope, ConnectionError, FtpAccessors[Client]] = {
val ssh = new SSHClient(settings.sshConfig)
import settings._
Expand All @@ -136,7 +139,7 @@ object SecureFtp {
setIdentity(_, credentials.username)(ssh)
)

new SecureFtp(ssh.newSFTPClient())
new SecureFtp(ssh.newSFTPClient()) {}
}.mapError(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}", _))
)(
_.execute(_.close()).ignore
Expand Down
46 changes: 36 additions & 10 deletions zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.io.IOException
import org.apache.commons.net.ftp.{ FTP, FTPClient => JFTPClient, FTPSClient => JFTPSClient }
import zio.ftp.UnsecureFtp.Client
import zio.stream.ZStream
import zio.{ Scope, ZIO }
import zio.{ Ref, Scope, UIO, ZIO }
import zio.ZIO.{ acquireRelease, attemptBlockingIO }

/**
Expand All @@ -28,25 +28,48 @@ import zio.ZIO.{ acquireRelease, attemptBlockingIO }
* All ftp methods exposed are lift into ZIO or ZStream
* The underlying java client only provide blocking methods.
*/
final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Client] {
sealed abstract class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Client] {

def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] =
execute(c => Option(c.mlistFile(path))).map(_.map(FtpResource.fromFtpFile(_)))

def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long): ZStream[Any, IOException, Byte] = {
def error(cause: Option[Exception] = None): Left[FileTransferIncompleteError, Unit] =
Left(
FileTransferIncompleteError(
s"Cannot finalize the file transfer and completely read the entire file $path. ${cause.fold("")(_.getMessage)}"
)
)
val success: Either[IOException, Unit] = Right(())

val initialize = execute(_.setRestartOffset(fileOffset))

val terminate = ZIO
.fail(
FileTransferIncompleteError(s"Cannot finalize the file transfer and completely read the entire file $path.")
)
.unlessZIO(execute(_.completePendingCommand()))
def terminate(state: Ref[Either[IOException, Unit]]): UIO[Unit] =
execute(_.completePendingCommand())
.foldZIO(
err => state.set(error(Some(err))),
resp =>
if (resp) state.set(success)
else state.set(error())
)

def propagate(state: Ref[Either[IOException, Unit]]) =
ZStream.fromZIO(state.get).flatMap {
case Left(ex) => ZStream.fail(ex)
case _ => ZStream.empty
}

val inputStream =
execute(c => Option(c.retrieveFileStream(path))).someOrFail(InvalidPathError(s"File does not exist $path"))

(ZStream.fromZIO(initialize) *> ZStream.empty) ++ ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ (ZStream
.fromZIO(terminate) *> ZStream.empty)
ZStream.unwrap {
for {
state <- Ref.make(success)
is <- initialize *> inputStream
} yield ZStream
.fromInputStream(is, chunkSize)
.ensuring(terminate(state)) ++ propagate(state)
}
}

def rm(path: String): ZIO[Any, IOException, Unit] =
Expand Down Expand Up @@ -105,6 +128,9 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien
object UnsecureFtp {
type Client = JFTPClient

def unsafe(c: Client): UnsecureFtp =
new UnsecureFtp(c) {}

def connect(settings: UnsecureFtpSettings): ZIO[Scope, ConnectionError, FtpAccessors[Client]] =
acquireRelease(
attemptBlockingIO {
Expand Down Expand Up @@ -141,7 +167,7 @@ object UnsecureFtp {

settings.dataTimeout.foreach(ftpClient.setDataTimeout)

new UnsecureFtp(ftpClient) -> success
new UnsecureFtp(ftpClient) {} -> success
}.mapError(e => ConnectionError(e.getMessage, e))
.filterOrFail(_._2)(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}"))
.map(_._1)
Expand Down
49 changes: 0 additions & 49 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala

This file was deleted.

63 changes: 63 additions & 0 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package zio.ftp

import org.apache.commons.net.ftp.FTPClient
import zio.test.Assertion._
import zio.test._

import java.io.{ IOException, InputStream }
import scala.util.Random

object UnsecureFtpReadFileSpec extends ZIOSpecDefault {

private def createFtpclient(errorOrsuccess: Either[Exception, Boolean]) =
UnsecureFtp.unsafe(new FTPClient {

override def retrieveFileStream(remote: String): InputStream = {
val it = Random.alphanumeric.take(5000).map(_.toByte).iterator
() => if (it.hasNext) it.next().toInt else -1
}

override def completePendingCommand(): Boolean =
errorOrsuccess match {
case Left(err) => throw err
case Right(flag) => flag
}
})

private def hasIncompleteMsg(a: Assertion[String]) =
hasField("file transfer incomplete message", (e: Exception) => e.getMessage, a)

override def spec =
suite("UnsecureFtp - ReadFile complete pending")(
test("succeed") {
val ftpClient = createFtpclient(Right(true))
for {
bytes <- ftpClient.readFile("/a/b/c.txt").runCollect
} yield assert(bytes)(hasSize(equalTo(5000)))
},
test("fail to complete") {
val ftpClient = createFtpclient(Right(false))
for {
exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit
} yield assert(exit)(
fails(
isSubtype[FileTransferIncompleteError](
hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt"))
)
)
)
},
test("error occur") {
val ftpClient = createFtpclient(Left(new IOException("Boom")))
for {
exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit
} yield assert(exit)(
fails(
isSubtype[FileTransferIncompleteError](
hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt"))
)
)
)
}
)
}
10 changes: 5 additions & 5 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ import java.net.{ InetSocketAddress, Proxy }
import scala.io.Source

object UnsecureSslFtpSpec extends ZIOSpecDefault {
val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass"))
private val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass"))

override def spec =
override def spec: Spec[TestEnvironment & Scope, Any] =
FtpSuite.spec("UnsecureSslFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential
}

object UnsecureFtpSpec extends ZIOSpecDefault {
val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass"))
private val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass"))

override def spec =
override def spec: Spec[TestEnvironment & Scope, Any] =
FtpSuite.spec("UnsecureFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential
}

object FtpSuite {
val home = ZPath("ftp-home/ftp/home")
private val home = ZPath("ftp-home/ftp/home")

def spec(labelSuite: String, settings: UnsecureFtpSettings) =
suite(labelSuite)(
Expand Down

0 comments on commit 4892c58

Please sign in to comment.