Skip to content

Commit

Permalink
Merge pull request #237 from retronym/topic/xasync
Browse files Browse the repository at this point in the history
Use compiler integrated async phase under -Xasync
  • Loading branch information
retronym authored Jul 14, 2020
2 parents 8b3a647 + 4d33d8a commit 5d007a2
Show file tree
Hide file tree
Showing 55 changed files with 818 additions and 7,523 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import: scala/scala-dev:travis/default.yml
language: scala

scala:
- 2.12.11
- 2.13.2
- 2.12.12
- 2.13.3

env:
- ADOPTOPENJDK=8
Expand Down
126 changes: 64 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
# scala-async [![Build Status](https://travis-ci.org/scala/scala-async.svg?branch=master)](https://travis-ci.org/scala/scala-async) [<img src="https://img.shields.io/maven-central/v/org.scala-lang.modules/scala-async_2.12.svg?label=latest%20release%20for%202.12">](http://search.maven.org/#search%7Cga%7C1%7Cg%3Aorg.scala-lang.modules%20a%3Ascala-async_2.12) [<img src="https://img.shields.io/maven-central/v/org.scala-lang.modules/scala-async_2.13.svg?label=latest%20release%20for%202.13">](http://search.maven.org/#search%7Cga%7C1%7Cg%3Aorg.scala-lang.modules%20a%3Ascala-async_2.13)

## Supported Scala versions

This branch (version series 0.10.x) targets Scala 2.12 and 2.13. `scala-async` is no longer maintained for older versions.
A DSL to enable a direct style of programming with when composing values wrapped in Scala `Future`s.

## Quick start

To include scala-async in an existing project use the library published on Maven Central.
For sbt projects add the following to your build definition - build.sbt or project/Build.scala:

### Use a modern Scala compiler

As of scala-async 1.0, Scala 2.12.12+ or 2.13.3+ are required.

### Add dependency

#### SBT Example

```scala
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.10.0"
libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value % Provided
Expand All @@ -17,28 +23,58 @@ libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value %
For Maven projects add the following to your <dependencies> (make sure to use the correct Scala version suffix
to match your project’s Scala binary version):

#### Maven Example

```scala
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-async_2.12</artifactId>
<version>0.10.0</version>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-async_2.13</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.11</version>
<scope>provided</scope>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.13.3</version>
<scope>provided</scope>
</dependency>
```

After adding scala-async to your classpath, write your first `async` block:
### Enable compiler support for `async`

Add the `-Xasync` to the Scala compiler options.

#### SBT Example
```scala
scalaOptions += "-Xasync"
```

#### Maven Example

```xml
<project>
...
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<configuration>
<args>
<arg>-Xasync</arg>
</args>
</configuration>
</plugin>
...
</project>
```

### Start coding

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.async.Async.{async, await}

val future = async {
val f1 = async { ...; true }
val f1: Future[Boolean] = async { ...; true }
val f2 = async { ...; 42 }
if (await(f1)) await(f2) else 0
}
Expand Down Expand Up @@ -93,6 +129,22 @@ def combined: Future[Int] = async {
}
```

## Limitations

### `await` must be directly in the control flow of the async expression

The `await` cannot be nested under a local method, object, class or lambda:

```
async {
List(1).foreach { x => await(f(x) } // invali
}
```

### `await` must be not be nested within `try` / `catch` / `finally`.

This implementation restriction may be lifted in future versions.

## Comparison with direct use of `Future` API

This computation could also be expressed by directly using the
Expand All @@ -119,53 +171,3 @@ The `async` approach has two advantages over the use of
required at each generator (`<-`) in the for-comprehension.
This reduces the size of generated code, and can avoid boxing
of intermediate results.

## Comparison with CPS plugin

The existing continuations (CPS) plugin for Scala can also be used
to provide a syntactic layer like `async`. This approach has been
used in Akka's [Dataflow Concurrency](http://doc.akka.io/docs/akka/2.3-M1/scala/dataflow.html)
(now deprecated in favour of this library).

CPS-based rewriting of asynchronous code also produces a closure
for each suspension. It can also lead to type errors that are
difficult to understand.

## How it works

- The `async` macro analyses the block of code, looking for control
structures and locations of `await` calls. It then breaks the code
into 'chunks'. Each chunk contains a linear sequence of statements
that concludes with a branching decision, or with the registration
of a subsequent state handler as the continuation.
- Before this analysis and transformation, the program is normalized
into a form amenable to this manipulation. This is called the
"A Normal Form" (ANF), and roughly means that:
- `if` and `match` constructs are only used as statements;
they cannot be used as an expression.
- calls to `await` are not allowed in compound expressions.
- Identify vals, vars and defs that are accessed from multiple
states. These will be lifted out to fields in the state machine
object.
- Synthesize a class that holds:
- an integer representing the current state ID.
- the lifted definitions.
- an `apply(value: Try[Any]): Unit` method that will be
called on completion of each future. The behavior of
this method is determined by the current state. It records
the downcast result of the future in a field, and calls the
`resume()` method.
- the `resume(): Unit` method that switches on the current state
and runs the users code for one 'chunk', and either:
a) registers the state machine as the handler for the next future
b) completes the result Promise of the `async` block, if at the terminal state.
- an `apply(): Unit` method that starts the computation.

## Limitations

- See the [neg](https://github.com/scala/async/tree/master/src/test/scala/scala/async/neg) test cases
for constructs that are not allowed in an `async` block.
- See the [issue list](https://github.com/scala/async/issues?state=open) for which of these restrictions are planned
to be dropped in the future.
- See [#32](https://github.com/scala/async/issues/32) for why `await` is not possible in closures, and for suggestions on
ways to structure the code to work around this limitation.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ ScalaModulePlugin.scalaModuleOsgiSettings
name := "scala-async"

libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided"
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test" // for ToolBox
libraryDependencies += "junit" % "junit" % "4.12" % "test"
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"

ScalaModulePlugin.enableOptimizer
testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v", "-s")
scalacOptions in Test ++= Seq("-Yrangepos")
scalacOptions ++= List("-deprecation" , "-Xasync")

parallelExecution in Global := false

Expand Down
Empty file.
35 changes: 32 additions & 3 deletions src/main/scala/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
package scala.async

import scala.language.experimental.macros
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.{ExecutionContext, Future}
import scala.annotation.compileTimeOnly
import scala.reflect.macros.whitebox

/**
* Async blocks provide a direct means to work with [[scala.concurrent.Future]].
Expand Down Expand Up @@ -50,14 +51,42 @@ object Async {
* Run the block of code `body` asynchronously. `body` may contain calls to `await` when the results of
* a `Future` are needed; this is translated into non-blocking code.
*/
def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T] = macro internal.ScalaConcurrentAsync.asyncImpl[T]
def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T] = macro asyncImpl[T]

/**
* Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block.
*
* Internally, this will register the remainder of the code in enclosing `async` block as a callback
* in the `onComplete` handler of `awaitable`, and will *not* block a thread.
*/
@compileTimeOnly("`await` must be enclosed in an `async` block")
@compileTimeOnly("[async] `await` must be enclosed in an `async` block")
def await[T](awaitable: Future[T]): T = ??? // No implementation here, as calls to this are translated to `onComplete` by the macro.

def asyncImpl[T: c.WeakTypeTag](c: whitebox.Context)
(body: c.Tree)
(execContext: c.Tree): c.Tree = {
import c.universe._
if (!c.compilerSettings.contains("-Xasync")) {
c.abort(c.macroApplication.pos, "The async requires the compiler option -Xasync (supported only by Scala 2.12.12+ / 2.13.3+)")
} else try {
val awaitSym = typeOf[Async.type].decl(TermName("await"))
def mark(t: DefDef): Tree = {
import language.reflectiveCalls
c.internal.asInstanceOf[{
def markForAsyncTransform(owner: Symbol, method: DefDef, awaitSymbol: Symbol, config: Map[String, AnyRef]): DefDef
}].markForAsyncTransform(c.internal.enclosingOwner, t, awaitSym, Map.empty)
}
val name = TypeName("stateMachine$async")
q"""
final class $name extends _root_.scala.async.FutureStateMachine(${execContext}) {
// FSM translated method
${mark(q"""override def apply(tr$$async: _root_.scala.util.Try[_root_.scala.AnyRef]) = ${body}""")}
}
new $name().start() : ${c.macroApplication.tpe}
"""
} catch {
case e: ReflectiveOperationException =>
c.abort(c.macroApplication.pos, "-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e.getClass.getName + " " + e.getMessage)
}
}
}
80 changes: 80 additions & 0 deletions src/main/scala/scala/async/FutureStateMachine.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Scala (https://www.scala-lang.org)
*
* Copyright EPFL and Lightbend, Inc.
*
* Licensed under Apache License 2.0
* (http://www.apache.org/licenses/LICENSE-2.0).
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
package scala.async

import java.util.Objects

import scala.util.{Failure, Success, Try}
import scala.concurrent.{ExecutionContext, Future, Promise}

/** The base class for state machines generated by the `scala.async.Async.async` macro.
* Not intended to be directly extended in user-written code.
*/
abstract class FutureStateMachine(execContext: ExecutionContext) extends Function1[Try[AnyRef], Unit] {
Objects.requireNonNull(execContext)

type F = scala.concurrent.Future[AnyRef]
type R = scala.util.Try[AnyRef]

private[this] val result$async: Promise[AnyRef] = Promise[AnyRef]();
private[this] var state$async: Int = 0

/** Retrieve the current value of the state variable */
protected def state: Int = state$async

/** Assign `i` to the state variable */
protected def state_=(s: Int): Unit = state$async = s

/** Complete the state machine with the given failure. */
// scala-async accidentally started catching NonFatal exceptions in:
// https://github.com/scala/scala-async/commit/e3ff0382ae4e015fc69da8335450718951714982#diff-136ab0b6ecaee5d240cd109e2b17ccb2R411
// This follows the new behaviour but should we fix the regression?
protected def completeFailure(t: Throwable): Unit = {
result$async.complete(Failure(t))
}

/** Complete the state machine with the given value. */
protected def completeSuccess(value: AnyRef): Unit = {
result$async.complete(Success(value))
}

/** Register the state machine as a completion callback of the given future. */
protected def onComplete(f: F): Unit = {
f.onComplete(this)(execContext)
}

/** Extract the result of the given future if it is complete, or `null` if it is incomplete. */
protected def getCompleted(f: F): Try[AnyRef] = {
if (f.isCompleted) {
f.value.get
} else null
}

/**
* Extract the success value of the given future. If the state machine detects a failure it may
* complete the async block and return `this` as a sentinel value to indicate that the caller
* (the state machine dispatch loop) should immediately exit.
*/
protected def tryGet(tr: R): AnyRef = tr match {
case Success(value) =>
value.asInstanceOf[AnyRef]
case Failure(throwable) =>
completeFailure(throwable)
this // sentinel value to indicate the dispatch loop should exit.
}

def start[T](): Future[T] = {
// This cast is safe because we know that `def apply` does not consult its argument when `state == 0`.
Future.unit.asInstanceOf[Future[AnyRef]].onComplete(this)(execContext)
result$async.future.asInstanceOf[Future[T]]
}
}
Loading

0 comments on commit 5d007a2

Please sign in to comment.