From 98cc204e57da7bb579e6bb50f1e05738c54fd8ad Mon Sep 17 00:00:00 2001 From: Yichen Xu Date: Tue, 21 Nov 2023 17:00:05 +0100 Subject: [PATCH] Capture check the `concurrent` package --- .../stdlib/concurrent/Awaitable.scala | 3 + .../stdlib/concurrent/BatchingExecutor.scala | 37 +++--- .../stdlib/concurrent/BlockContext.scala | 13 +- .../stdlib/concurrent/ExecutionContext.scala | 28 +++-- .../stdlib/concurrent/Future.scala | 111 +++++++++--------- .../stdlib/concurrent/Promise.scala | 12 +- .../stdlib/concurrent/duration/Deadline.scala | 2 + .../stdlib/concurrent/duration/Duration.scala | 2 + .../duration/DurationConversions.scala | 2 + .../stdlib/concurrent/duration/package.scala | 2 + .../impl/ExecutionContextImpl.scala | 16 +-- .../impl/FutureConvertersImpl.scala | 35 +++--- .../stdlib/concurrent/impl/Promise.scala | 38 +++--- .../stdlib/concurrent/package.scala | 6 +- 14 files changed, 172 insertions(+), 135 deletions(-) diff --git a/tests/pos-special/stdlib/concurrent/Awaitable.scala b/tests/pos-special/stdlib/concurrent/Awaitable.scala index d201a14570f2..b9a068417ff7 100644 --- a/tests/pos-special/stdlib/concurrent/Awaitable.scala +++ b/tests/pos-special/stdlib/concurrent/Awaitable.scala @@ -12,6 +12,8 @@ package scala.concurrent +import language.experimental.captureChecking + import scala.concurrent.duration.Duration @@ -27,6 +29,7 @@ import scala.concurrent.duration.Duration * occurred. */ trait Awaitable[+T] { + this: Awaitable[T]^ => /** * Await the "completed" state of this `Awaitable`. diff --git a/tests/pos-special/stdlib/concurrent/BatchingExecutor.scala b/tests/pos-special/stdlib/concurrent/BatchingExecutor.scala index d9ae17cc7064..0590a6533d95 100644 --- a/tests/pos-special/stdlib/concurrent/BatchingExecutor.scala +++ b/tests/pos-special/stdlib/concurrent/BatchingExecutor.scala @@ -16,16 +16,21 @@ import java.util.concurrent.Executor import java.util.Objects import scala.util.control.NonFatal import scala.annotation.{switch, tailrec} +import scala.annotation.unchecked.uncheckedCaptures + +import language.experimental.captureChecking /** * Marker trait to indicate that a Runnable is Batchable by BatchingExecutors */ trait Batchable { - self: Runnable => + self: Runnable^ => } private[concurrent] object BatchingExecutorStatics { - final val emptyBatchArray: Array[Runnable] = new Array[Runnable](0) + // !cc! It is okay to elide sealed checking here, + // as `emptyBatchArray` will not be mutated. + final val emptyBatchArray: Array[(Runnable^) @uncheckedCaptures] = new Array[(Runnable^) @uncheckedCaptures](0) // Max number of Runnables executed nested before starting to batch (to prevent stack exhaustion) final val syncPreBatchDepth = 16 @@ -97,24 +102,24 @@ private[concurrent] trait BatchingExecutor extends Executor { * In order to conserve allocations, the first element in the batch is stored "unboxed" in * the `first` field. Subsequent Runnables are stored in the array called `other`. */ - private[this] sealed abstract class AbstractBatch protected (protected final var first: Runnable, protected final var other: Array[Runnable], protected final var size: Int) { + private[this] sealed abstract class AbstractBatch protected (protected final var first: Runnable^, protected final var other: Array[(Runnable^) @uncheckedCaptures], protected final var size: Int) { - private[this] final def ensureCapacity(curSize: Int): Array[Runnable] = { - val curOther = this.other + private[this] final def ensureCapacity(curSize: Int): Array[(Runnable^) @uncheckedCaptures] = { + val curOther: Array[(Runnable^) @uncheckedCaptures] = this.other val curLen = curOther.length if (curSize <= curLen) curOther else { val newLen = if (curLen == 0) 4 else curLen << 1 if (newLen <= curLen) throw new StackOverflowError("Space limit of asynchronous stack reached: " + curLen) - val newOther = new Array[Runnable](newLen) + val newOther: Array[(Runnable^) @uncheckedCaptures] = new Array[(Runnable^) @uncheckedCaptures](newLen) System.arraycopy(curOther, 0, newOther, 0, curLen) this.other = newOther newOther } } - final def push(r: Runnable): Unit = { + final def push(r: Runnable^): Unit = { val sz = this.size if(sz == 0) this.first = r @@ -143,10 +148,10 @@ private[concurrent] trait BatchingExecutor extends Executor { } } - private[this] final class AsyncBatch private(_first: Runnable, _other: Array[Runnable], _size: Int) extends AbstractBatch(_first, _other, _size) with Runnable with BlockContext with (BlockContext => Throwable) { - private[this] final var parentBlockContext: BlockContext = BatchingExecutorStatics.MissingParentBlockContext + private[this] final class AsyncBatch private(_first: Runnable^, _other: Array[(Runnable^) @uncheckedCaptures], _size: Int) extends AbstractBatch(_first, _other, _size) with Runnable with BlockContext with (BlockContext^ -> Throwable) { this: AsyncBatch^ => + private[this] final var parentBlockContext: (BlockContext^) @uncheckedCaptures = BatchingExecutorStatics.MissingParentBlockContext - final def this(runnable: Runnable) = this(runnable, BatchingExecutorStatics.emptyBatchArray, 1) + final def this(runnable: Runnable^) = this(runnable, BatchingExecutorStatics.emptyBatchArray, 1) override final def run(): Unit = { _tasksLocal.set(this) // This is later cleared in `apply` or `runWithoutResubmit` @@ -158,7 +163,7 @@ private[concurrent] trait BatchingExecutor extends Executor { } /* LOGIC FOR ASYNCHRONOUS BATCHES */ - override final def apply(prevBlockContext: BlockContext): Throwable = try { + override final def apply(prevBlockContext: BlockContext^): Throwable = try { parentBlockContext = prevBlockContext runN(BatchingExecutorStatics.runLimit) null @@ -186,7 +191,7 @@ private[concurrent] trait BatchingExecutor extends Executor { } } else cause // TODO: consider if NonFatals should simply be `reportFailure`:ed rather than rethrown - private[this] final def cloneAndClear(): AsyncBatch = { + private[this] final def cloneAndClear(): AsyncBatch^{this} = { val newBatch = new AsyncBatch(this.first, this.other, this.size) this.first = null this.other = BatchingExecutorStatics.emptyBatchArray @@ -203,7 +208,7 @@ private[concurrent] trait BatchingExecutor extends Executor { } } - private[this] final class SyncBatch(runnable: Runnable) extends AbstractBatch(runnable, BatchingExecutorStatics.emptyBatchArray, 1) with Runnable { + private[this] final class SyncBatch(runnable: Runnable^) extends AbstractBatch(runnable, BatchingExecutorStatics.emptyBatchArray, 1) with Runnable { @tailrec override final def run(): Unit = { try runN(BatchingExecutorStatics.runLimit) catch { case ie: InterruptedException => @@ -221,7 +226,7 @@ private[concurrent] trait BatchingExecutor extends Executor { * When implementing a sync BatchingExecutor, it is RECOMMENDED * to implement this method as `runnable.run()` */ - protected def submitForExecution(runnable: Runnable): Unit + protected def submitForExecution(runnable: Runnable^): Unit /** Reports that an asynchronous computation failed. * See `ExecutionContext.reportFailure(throwable: Throwable)` @@ -232,7 +237,7 @@ private[concurrent] trait BatchingExecutor extends Executor { * WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same * implementation of `BatchingExecutor` */ - protected final def submitAsyncBatched(runnable: Runnable): Unit = { + protected final def submitAsyncBatched(runnable: Runnable^): Unit = { val b = _tasksLocal.get if (b.isInstanceOf[AsyncBatch]) b.asInstanceOf[AsyncBatch].push(runnable) else submitForExecution(new AsyncBatch(runnable)) @@ -242,7 +247,7 @@ private[concurrent] trait BatchingExecutor extends Executor { * WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same * implementation of `BatchingExecutor` */ - protected final def submitSyncBatched(runnable: Runnable): Unit = { + protected final def submitSyncBatched(runnable: Runnable^): Unit = { Objects.requireNonNull(runnable, "runnable is null") val tl = _tasksLocal val b = tl.get diff --git a/tests/pos-special/stdlib/concurrent/BlockContext.scala b/tests/pos-special/stdlib/concurrent/BlockContext.scala index e282a4125c33..eb70968072b0 100644 --- a/tests/pos-special/stdlib/concurrent/BlockContext.scala +++ b/tests/pos-special/stdlib/concurrent/BlockContext.scala @@ -12,6 +12,9 @@ package scala.concurrent +import language.experimental.captureChecking +import scala.annotation.unchecked.uncheckedCaptures + /** * A context to be notified by [[scala.concurrent.blocking]] when * a thread is about to block. In effect this trait provides @@ -68,9 +71,9 @@ object BlockContext { **/ final def defaultBlockContext: BlockContext = DefaultBlockContext - private[this] final val contextLocal = new ThreadLocal[BlockContext]() + private[this] final val contextLocal = new ThreadLocal[(BlockContext^) @uncheckedCaptures]() - private[this] final def prefer(candidate: BlockContext): BlockContext = + private[this] final def prefer(candidate: BlockContext^): BlockContext^ = if (candidate ne null) candidate else { val t = Thread.currentThread @@ -81,12 +84,12 @@ object BlockContext { /** * @return the `BlockContext` that would be used for the current `java.lang.Thread` at this point **/ - final def current: BlockContext = prefer(contextLocal.get) + final def current: BlockContext^ = prefer(contextLocal.get) /** * Installs a current `BlockContext` around executing `body`. **/ - final def withBlockContext[T](blockContext: BlockContext)(body: => T): T = { + final def withBlockContext[T](blockContext: BlockContext^)(body: => T): T = { val old = contextLocal.get // can be null if (old eq blockContext) body else { @@ -99,7 +102,7 @@ object BlockContext { * Installs the BlockContext `blockContext` around the invocation to `f` and passes in the previously installed BlockContext to `f`. * @return the value produced by applying `f` **/ - final def usingBlockContext[I, T](blockContext: BlockContext)(f: BlockContext => T): T = { + final def usingBlockContext[I, T](blockContext: BlockContext^)(f: BlockContext^ => T): T = { val old = contextLocal.get // can be null if (old eq blockContext) f(prefer(old)) else { diff --git a/tests/pos-special/stdlib/concurrent/ExecutionContext.scala b/tests/pos-special/stdlib/concurrent/ExecutionContext.scala index 41dfbb609816..b617ff1cc6b8 100644 --- a/tests/pos-special/stdlib/concurrent/ExecutionContext.scala +++ b/tests/pos-special/stdlib/concurrent/ExecutionContext.scala @@ -16,6 +16,8 @@ package scala.concurrent import java.util.concurrent.{ ExecutorService, Executor } import scala.annotation.implicitNotFound +import language.experimental.captureChecking + /** * An `ExecutionContext` can execute program logic asynchronously, * typically but not necessarily on a thread pool. @@ -39,7 +41,7 @@ import scala.annotation.implicitNotFound * `scala.concurrent.ExecutionContext.Implicits.global`. * The recommended approach is to add `(implicit ec: ExecutionContext)` to methods, * or class constructor parameters, which need an `ExecutionContext`. - * + * * Then locally import a specific `ExecutionContext` in one place for the entire * application or module, passing it implicitly to individual methods. * Alternatively define a local implicit val with the required `ExecutionContext`. @@ -69,13 +71,13 @@ consider using Scala's global ExecutionContext by defining the following: implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global""") -trait ExecutionContext { +trait ExecutionContext { this: ExecutionContext^ => /** Runs a block of code on this execution context. * * @param runnable the task to execute */ - def execute(runnable: Runnable): Unit + def execute(runnable: Runnable^): Unit /** Reports that an asynchronous computation failed. * @@ -101,7 +103,7 @@ trait ExecutionContext { */ @deprecated("preparation of ExecutionContexts will be removed", "2.12.0") // This cannot be removed until there is a suitable replacement - def prepare(): ExecutionContext = this + def prepare(): ExecutionContext^{this} = this } /** @@ -212,15 +214,15 @@ object ExecutionContext { * * Do *not* call any blocking code in the `Runnable`s submitted to this `ExecutionContext` * as it will prevent progress by other enqueued `Runnable`s and the calling `Thread`. - * + * * Symptoms of misuse of this `ExecutionContext` include, but are not limited to, deadlocks * and severe performance problems. * * Any `NonFatal` or `InterruptedException`s will be reported to the `defaultReporter`. */ object parasitic extends ExecutionContextExecutor with BatchingExecutor { - override final def submitForExecution(runnable: Runnable): Unit = runnable.run() - override final def execute(runnable: Runnable): Unit = submitSyncBatched(runnable) + override final def submitForExecution(runnable: Runnable^): Unit = runnable.run() + override final def execute(runnable: Runnable^): Unit = submitSyncBatched(runnable) override final def reportFailure(t: Throwable): Unit = defaultReporter(t) } @@ -228,9 +230,9 @@ object ExecutionContext { * See [[ExecutionContext.global]]. */ private[scala] lazy val opportunistic: ExecutionContextExecutor = new ExecutionContextExecutor with BatchingExecutor { - final override def submitForExecution(runnable: Runnable): Unit = global.execute(runnable) + final override def submitForExecution(runnable: Runnable^): Unit = global.execute(runnable) - final override def execute(runnable: Runnable): Unit = + final override def execute(runnable: Runnable^): Unit = if ((!runnable.isInstanceOf[impl.Promise.Transformation[_,_]] || runnable.asInstanceOf[impl.Promise.Transformation[_,_]].benefitsFromBatching) && runnable.isInstanceOf[Batchable]) submitAsyncBatched(runnable) else @@ -253,7 +255,7 @@ object ExecutionContext { * @param reporter a function for error reporting * @return the `ExecutionContext` using the given `ExecutorService` */ - def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService = + def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService^{reporter} = impl.ExecutionContextImpl.fromExecutorService(e, reporter) /** Creates an `ExecutionContext` from the given `ExecutorService` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]]. @@ -269,7 +271,7 @@ object ExecutionContext { * @param e the `ExecutorService` to use. If `null`, a new `ExecutorService` is created with [[scala.concurrent.ExecutionContext$.global default configuration]]. * @return the `ExecutionContext` using the given `ExecutorService` */ - def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter) + def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService^ = fromExecutorService(e, defaultReporter) /** Creates an `ExecutionContext` from the given `Executor`. * @@ -277,7 +279,7 @@ object ExecutionContext { * @param reporter a function for error reporting * @return the `ExecutionContext` using the given `Executor` */ - def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor = + def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor^{reporter} = impl.ExecutionContextImpl.fromExecutor(e, reporter) /** Creates an `ExecutionContext` from the given `Executor` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]]. @@ -285,7 +287,7 @@ object ExecutionContext { * @param e the `Executor` to use. If `null`, a new `Executor` is created with [[scala.concurrent.ExecutionContext$.global default configuration]]. * @return the `ExecutionContext` using the given `Executor` */ - def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter) + def fromExecutor(e: Executor): ExecutionContextExecutor^ = fromExecutor(e, defaultReporter) /** The default reporter simply prints the stack trace of the `Throwable` to [[java.lang.System#err System.err]]. * diff --git a/tests/pos-special/stdlib/concurrent/Future.scala b/tests/pos-special/stdlib/concurrent/Future.scala index 1be6130db645..ca06c8e5fff4 100644 --- a/tests/pos-special/stdlib/concurrent/Future.scala +++ b/tests/pos-special/stdlib/concurrent/Future.scala @@ -24,6 +24,9 @@ import scala.reflect.ClassTag import scala.concurrent.ExecutionContext.parasitic +import language.experimental.captureChecking +import caps.unsafe.unsafeAssumePure + /** A `Future` represents a value which may or may not be currently available, * but will be available at some point, or an exception if that value could not be made available. * @@ -104,7 +107,7 @@ import scala.concurrent.ExecutionContext.parasitic * `execute()` either immediately or asynchronously. * Completion of the Future must *happen-before* the invocation of the callback. */ -trait Future[+T] extends Awaitable[T] { +trait Future[+T] extends Awaitable[T] { this: Future[T]^ => /* Callbacks */ @@ -166,7 +169,7 @@ trait Future[+T] extends Awaitable[T] { * @return a failed projection of this `Future`. * @group Transformations */ - def failed: Future[Throwable] = transform(Future.failedFun)(parasitic) + def failed: Future[Throwable]^{this} = transform(Future.failedFun)(parasitic) /* Monadic operations */ @@ -195,7 +198,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` that will be completed with the transformed value * @group Transformations */ - def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = + def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S]^{this, s, f} = transform { t => if (t.isInstanceOf[Success[T]]) t map s @@ -211,7 +214,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` that will be completed with the transformed value * @group Transformations */ - def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] + def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]^{this, f} /** Creates a new Future by applying the specified function, which produces a Future, to the result * of this Future. If there is any non-fatal exception thrown when 'f' @@ -222,7 +225,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` that will be completed with the transformed value * @group Transformations */ - def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] + def transformWith[S](f: Try[T] => Future[S]^)(implicit executor: ExecutionContext): Future[S]^{this, f} /** Creates a new future by applying a function to the successful result of @@ -246,7 +249,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` which will be completed with the result of the application of the function * @group Transformations */ - def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f) + def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S]^{this, f} = transform(_ map f) /** Creates a new future by applying a function to the successful result of * this future, and returns the result of the function as the new future. @@ -260,10 +263,10 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` which will be completed with the result of the application of the function * @group Transformations */ - def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = transformWith { + def flatMap[S](f: T => Future[S]^)(implicit executor: ExecutionContext): Future[S]^{this, f} = transformWith { t => if(t.isInstanceOf[Success[T]]) f(t.asInstanceOf[Success[T]].value) - else this.asInstanceOf[Future[S]] // Safe cast + else this.asInstanceOf[Future[S]^{this}] // Safe cast } /** Creates a new future with one level of nesting flattened, this method is equivalent @@ -272,7 +275,7 @@ trait Future[+T] extends Awaitable[T] { * @tparam S the type of the returned `Future` * @group Transformations */ - def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(parasitic) + def flatten[S](implicit ev: T <:< Future[S]): Future[S]^{this} = flatMap(ev)(parasitic) /** Creates a new future by filtering the value of the current future with a predicate. * @@ -294,7 +297,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` which will hold the successful result of this `Future` if it matches the predicate or a `NoSuchElementException` * @group Transformations */ - def filter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = + def filter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T]^{this, p} = transform { t => if (t.isInstanceOf[Success[T]]) { @@ -306,7 +309,7 @@ trait Future[+T] extends Awaitable[T] { /** Used by for-comprehensions. * @group Transformations */ - final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor) + final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T]^{this, p} = filter(p)(executor) /** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value. * @@ -333,7 +336,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` holding the result of application of the `PartialFunction` or a `NoSuchElementException` * @group Transformations */ - def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = + def collect[S](pf: PartialFunction[T, S]^)(implicit executor: ExecutionContext): Future[S]^{this, pf} = transform { t => if (t.isInstanceOf[Success[T]]) @@ -358,7 +361,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` with the successful value of this `Future` or the result of the `PartialFunction` * @group Transformations */ - def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = + def recover[U >: T](pf: PartialFunction[Throwable, U]^)(implicit executor: ExecutionContext): Future[U]^{this, pf} = transform { _ recover pf } /** Creates a new future that will handle any matching throwable that this @@ -379,7 +382,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` with the successful value of this `Future` or the outcome of the `Future` returned by the `PartialFunction` * @group Transformations */ - def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = + def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]^]^)(implicit executor: ExecutionContext): Future[U]^{this, pf} = transformWith { t => if (t.isInstanceOf[Failure[T]]) { @@ -403,7 +406,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` with the results of both futures or the failure of the first of them that failed * @group Transformations */ - def zip[U](that: Future[U]): Future[(T, U)] = + def zip[U](that: Future[U]^): Future[(T, U)]^{this, that} = zipWith(that)(Future.zipWithTuple2Fun)(parasitic) /** Zips the values of `this` and `that` future using a function `f`, @@ -422,7 +425,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` with the result of the application of `f` to the results of `this` and `that` * @group Transformations */ - def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { + def zipWith[U, R](that: Future[U]^)(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R]^{this, that, f} = { // This is typically overriden by the implementation in DefaultPromise, which provides // symmetric fail-fast behavior regardless of which future fails first. // @@ -450,7 +453,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` with the successful result of this or that `Future` or the failure of this `Future` if both fail * @group Transformations */ - def fallbackTo[U >: T](that: Future[U]): Future[U] = + def fallbackTo[U >: T](that: Future[U]^): Future[U]^{this, that} = if (this eq that) this else { implicit val ec = parasitic @@ -469,7 +472,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` holding the casted result of this `Future` or a `ClassCastException` otherwise * @group Transformations */ - def mapTo[S](implicit tag: ClassTag[S]): Future[S] = { + def mapTo[S](implicit tag: ClassTag[S]): Future[S]^{this} = { implicit val ec = parasitic val boxedClass = { val c = tag.runtimeClass @@ -509,7 +512,7 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` which will be completed with the exact same outcome as this `Future` but after the `PartialFunction` has been executed. * @group Callbacks */ - def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = + def andThen[U](pf: PartialFunction[Try[T], U]^)(implicit executor: ExecutionContext): Future[T]^{this, pf} = transform { result => try pf.applyOrElse[Try[T], Any](result, Future.id[Try[T]]) @@ -563,15 +566,15 @@ object Future { private[this] final val _failedFun: Try[Any] => Try[Throwable] = v => if (v.isInstanceOf[Failure[Any]]) Success(v.asInstanceOf[Failure[Any]].exception) else failedFailure - private[concurrent] final def failedFun[T]: Try[T] => Try[Throwable] = _failedFun.asInstanceOf[Try[T] => Try[Throwable]] + private[concurrent] final def failedFun[T]: Try[T] -> Try[Throwable] = _failedFun.asInstanceOf[Try[T] -> Try[Throwable]] private[concurrent] final val recoverWithFailedMarker: Future[Nothing] = scala.concurrent.Future.failed(new Throwable with NoStackTrace) private[concurrent] final val recoverWithFailed = (t: Throwable) => recoverWithFailedMarker - private[this] final val _zipWithTuple2: (Any, Any) => (Any, Any) = Tuple2.apply _ - private[concurrent] final def zipWithTuple2Fun[T,U] = _zipWithTuple2.asInstanceOf[(T,U) => (T,U)] + private[this] final val _zipWithTuple2: (Any, Any) -> (Any, Any) = Tuple2.apply _ + private[concurrent] final def zipWithTuple2Fun[T,U] = _zipWithTuple2.asInstanceOf[(T,U) -> (T,U)] private[this] final val _addToBuilderFun: (Builder[Any, Nothing], Any) => Builder[Any, Nothing] = (b: Builder[Any, Nothing], e: Any) => b += e private[concurrent] final def addToBuilderFun[A, M] = _addToBuilderFun.asInstanceOf[Function2[Builder[A, M], A, Builder[A, M]]] @@ -622,19 +625,19 @@ object Future { override final def foreach[U](f: Nothing => U)(implicit executor: ExecutionContext): Unit = () override final def transform[S](s: Nothing => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = this override final def transform[S](f: Try[Nothing] => Try[S])(implicit executor: ExecutionContext): Future[S] = this - override final def transformWith[S](f: Try[Nothing] => Future[S])(implicit executor: ExecutionContext): Future[S] = this - override final def map[S](f: Nothing => S)(implicit executor: ExecutionContext): Future[S] = this - override final def flatMap[S](f: Nothing => Future[S])(implicit executor: ExecutionContext): Future[S] = this + override final def transformWith[S](f: Try[Nothing] => Future[S]^)(implicit executor: ExecutionContext): Future[S]^{f} = this + override final def map[S](f: Nothing => S)(implicit executor: ExecutionContext): Future[S]^{f} = this + override final def flatMap[S](f: Nothing => Future[S]^)(implicit executor: ExecutionContext): Future[S]^{f} = this override final def flatten[S](implicit ev: Nothing <:< Future[S]): Future[S] = this override final def filter(p: Nothing => Boolean)(implicit executor: ExecutionContext): Future[Nothing] = this - override final def collect[S](pf: PartialFunction[Nothing, S])(implicit executor: ExecutionContext): Future[S] = this - override final def recover[U >: Nothing](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = this - override final def recoverWith[U >: Nothing](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = this - override final def zip[U](that: Future[U]): Future[(Nothing, U)] = this - override final def zipWith[U, R](that: Future[U])(f: (Nothing, U) => R)(implicit executor: ExecutionContext): Future[R] = this - override final def fallbackTo[U >: Nothing](that: Future[U]): Future[U] = this + override final def collect[S](pf: PartialFunction[Nothing, S]^)(implicit executor: ExecutionContext): Future[S]^{pf} = this + override final def recover[U >: Nothing](pf: PartialFunction[Throwable, U]^)(implicit executor: ExecutionContext): Future[U]^{pf} = this + override final def recoverWith[U >: Nothing](pf: PartialFunction[Throwable, Future[U]^]^)(implicit executor: ExecutionContext): Future[U]^{pf} = this + override final def zip[U](that: Future[U]^): Future[(Nothing, U)]^{that} = this + override final def zipWith[U, R](that: Future[U]^)(f: (Nothing, U) => R)(implicit executor: ExecutionContext): Future[R]^{that, f} = this + override final def fallbackTo[U >: Nothing](that: Future[U]^): Future[U]^{that} = this override final def mapTo[S](implicit tag: ClassTag[S]): Future[S] = this - override final def andThen[U](pf: PartialFunction[Try[Nothing], U])(implicit executor: ExecutionContext): Future[Nothing] = this + override final def andThen[U](pf: PartialFunction[Try[Nothing], U]^)(implicit executor: ExecutionContext): Future[Nothing]^{pf} = this override final def toString: String = "Future()" } @@ -656,7 +659,7 @@ object Future { * @param result the given successful value * @return the newly created `Future` instance */ - final def successful[T](result: T): Future[T] = Promise.successful(result).future + final def successful[T](result: T): Future[T] = Promise.successful[T](result).future /** Creates an already completed Future with the specified result or exception. * @@ -664,7 +667,7 @@ object Future { * @param result the result of the returned `Future` instance * @return the newly created `Future` instance */ - final def fromTry[T](result: Try[T]): Future[T] = Promise.fromTry(result).future + final def fromTry[T](result: Try[T]): Future[T] = Promise.fromTry[T](result).future /** Starts an asynchronous computation and returns a `Future` instance with the result of that computation. * @@ -683,7 +686,7 @@ object Future { * @param executor the execution context on which the future is run * @return the `Future` holding the result of the computation */ - final def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] = + final def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]^{body} = unit.map(_ => body) /** Starts an asynchronous computation and returns a `Future` instance with the result of that computation once it completes. @@ -703,7 +706,7 @@ object Future { * @param executor the execution context on which the `body` is evaluated in * @return the `Future` holding the result of the computation */ - final def delegate[T](body: => Future[T])(implicit executor: ExecutionContext): Future[T] = + final def delegate[T](body: => Future[T]^)(implicit executor: ExecutionContext): Future[T]^{body} = unit.flatMap(_ => body) /** Simple version of `Future.traverse`. Asynchronously and non-blockingly transforms, in essence, a `IterableOnce[Future[A]]` @@ -715,8 +718,8 @@ object Future { * @param in the `IterableOnce` of Futures which will be sequenced * @return the `Future` of the resulting collection */ - final def sequence[A, CC[X] <: IterableOnce[X], To](in: CC[Future[A]])(implicit bf: BuildFrom[CC[Future[A]], A, To], executor: ExecutionContext): Future[To] = - in.iterator.foldLeft(successful(bf.newBuilder(in))) { + final def sequence[A, CC[+X] <: IterableOnce[X], To](in: CC[Future[A]^]^)(implicit bf: BuildFrom[CC[Future[A]^], A, To], executor: ExecutionContext): Future[To]^{in*} = + in.iterator.foldLeft(successful(bf.newBuilder(in.unsafeAssumePure))) { (fr, fa) => fr.zipWith(fa)(Future.addToBuilderFun) }.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) @@ -727,19 +730,20 @@ object Future { * @param futures the `IterableOnce` of Futures in which to find the first completed * @return the `Future` holding the result of the future that is first to be completed */ - final def firstCompletedOf[T](futures: IterableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { + final def firstCompletedOf[T](futures: IterableOnce[Future[T]^])(implicit executor: ExecutionContext): Future[T]^ = { val i = futures.iterator if (!i.hasNext) Future.never else { - val p = Promise[T]() - val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) { + val p: Promise[T]^ = Promise[T]() + // !cc! Try[T] -> Unit is a workaround for #18857. It should probably be Try[T] => Unit if this is a capability. + val firstCompleteHandler = new AtomicReference[Promise[T]^{p}](p) with (Try[T] -> Unit) { override final def apply(v1: Try[T]): Unit = { val r = getAndSet(null) if (r ne null) r tryComplete v1 // tryComplete is likely to be cheaper than complete } } - while(i.hasNext && firstCompleteHandler.get != null) // exit early if possible + while(i.hasNext && firstCompleteHandler.asInstanceOf[AtomicReference[Promise[T]]].get != null) // exit early if possible i.next().onComplete(firstCompleteHandler) p.future } @@ -753,8 +757,8 @@ object Future { * @param p the predicate which indicates if it's a match * @return the `Future` holding the optional result of the search */ - final def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { - def searchNext(i: Iterator[Future[T]]): Future[Option[T]] = + final def find[T](futures: scala.collection.immutable.Iterable[Future[T]^]^)(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]^ = { + def searchNext(i: Iterator[Future[T]^]^): Future[Option[T]]^ = if (!i.hasNext) successful(None) else i.next().transformWith { case Success(r) if p(r) => successful(Some(r)) @@ -782,10 +786,10 @@ object Future { * @param op the fold operation to be applied to the zero and futures * @return the `Future` holding the result of the fold */ - final def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]])(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = + final def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]^]^)(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R]^ = foldNext(futures.iterator, zero, op) - private[this] final def foldNext[T, R](i: Iterator[Future[T]], prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = + private[this] final def foldNext[T, R](i: Iterator[Future[T]^]^, prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R]^ = if (!i.hasNext) successful(prevValue) else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) } @@ -808,8 +812,9 @@ object Future { */ @deprecated("use Future.foldLeft instead", "2.12.0") // not removed in 2.13, to facilitate 2.11/2.12/2.13 cross-building; remove further down the line (see scala/scala#6319) - def fold[T, R](futures: IterableOnce[Future[T]])(zero: R)(@deprecatedName("foldFun") op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = - if (futures.isEmpty) successful(zero) + def fold[T, R](futures: IterableOnce[Future[T]^]^)(zero: R)(@deprecatedName("foldFun") op: (R, T) => R)(implicit executor: ExecutionContext): Future[R]^ = + // !cc! without `.iterator` it doesn't compile + if (futures.iterator.isEmpty) successful(zero) else sequence(futures)(ArrayBuffer, executor).map(_.foldLeft(zero)(op)) /** Initiates a non-blocking, asynchronous, fold over the supplied futures @@ -827,8 +832,8 @@ object Future { */ @deprecated("use Future.reduceLeft instead", "2.12.0") // not removed in 2.13, to facilitate 2.11/2.12/2.13 cross-building; remove further down the line (see scala/scala#6319) - final def reduce[T, R >: T](futures: IterableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = - if (futures.isEmpty) failed(new NoSuchElementException("reduce attempted on empty collection")) + final def reduce[T, R >: T](futures: IterableOnce[Future[T]^]^)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R]^ = + if (futures.iterator.isEmpty) failed(new NoSuchElementException("reduce attempted on empty collection")) else sequence(futures)(ArrayBuffer, executor).map(_ reduceLeft op) /** Initiates a non-blocking, asynchronous, left reduction over the supplied futures @@ -844,7 +849,7 @@ object Future { * @param op the reduce operation which is applied to the results of the futures * @return the `Future` holding the result of the reduce */ - final def reduceLeft[T, R >: T](futures: scala.collection.immutable.Iterable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + final def reduceLeft[T, R >: T](futures: scala.collection.immutable.Iterable[Future[T]^]^)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R]^ = { val i = futures.iterator if (!i.hasNext) failed(new NoSuchElementException("reduceLeft attempted on empty collection")) else i.next() flatMap { v => foldNext(i, v, op) } @@ -865,9 +870,9 @@ object Future { * @param fn the function to be mapped over the collection to produce a collection of Futures * @return the `Future` of the collection of results */ - final def traverse[A, B, M[X] <: IterableOnce[X]](in: M[A])(fn: A => Future[B])(implicit bf: BuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = + final def traverse[A, B, M[X] <: IterableOnce[X]](in: M[A]^)(fn: A => Future[B]^)(implicit bf: BuildFrom[M[A]^, B, M[B]], executor: ExecutionContext): Future[M[B]]^{fn*} = in.iterator.foldLeft(successful(bf.newBuilder(in))) { - (fr, a) => fr.zipWith(fn(a))(Future.addToBuilderFun) + (fr, a) => fr.zipWith[B, Builder[B, M[B]]](fn(a))(Future.addToBuilderFun) }.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) } diff --git a/tests/pos-special/stdlib/concurrent/Promise.scala b/tests/pos-special/stdlib/concurrent/Promise.scala index e13541b17bc7..5b02544b3e09 100644 --- a/tests/pos-special/stdlib/concurrent/Promise.scala +++ b/tests/pos-special/stdlib/concurrent/Promise.scala @@ -14,11 +14,13 @@ package scala.concurrent import scala.util.{ Try, Success, Failure } +import language.experimental.captureChecking + /** Promise is an object which can be completed with a value or failed * with an exception. * - * A promise should always eventually be completed, whether for success or failure, - * in order to avoid unintended resource retention for any associated Futures' + * A promise should always eventually be completed, whether for success or failure, + * in order to avoid unintended resource retention for any associated Futures' * callbacks or transformations. * * @define promiseCompletion @@ -33,10 +35,10 @@ import scala.util.{ Try, Success, Failure } * @define nonDeterministic * Note: Using this method may result in non-deterministic concurrent programs. */ -trait Promise[T] { +trait Promise[T] { this: Promise[T]^ => /** Future containing the value of this promise. */ - def future: Future[T] + def future: Future[T]^ /** Returns whether the promise has already been completed with * a value or an exception. @@ -68,7 +70,7 @@ trait Promise[T] { * * @return This promise */ - def completeWith(other: Future[T]): this.type = { + def completeWith(other: Future[T]^): this.type = { if (other ne this.future) // this tryCompleteWith this doesn't make much sense other.onComplete(this tryComplete _)(ExecutionContext.parasitic) diff --git a/tests/pos-special/stdlib/concurrent/duration/Deadline.scala b/tests/pos-special/stdlib/concurrent/duration/Deadline.scala index 9a81d1423910..63f1be66764b 100644 --- a/tests/pos-special/stdlib/concurrent/duration/Deadline.scala +++ b/tests/pos-special/stdlib/concurrent/duration/Deadline.scala @@ -12,6 +12,8 @@ package scala.concurrent.duration +import language.experimental.captureChecking + /** * This class stores a deadline, as obtained via `Deadline.now` or the * duration DSL: diff --git a/tests/pos-special/stdlib/concurrent/duration/Duration.scala b/tests/pos-special/stdlib/concurrent/duration/Duration.scala index 2d28021581a7..e086f7e88fc1 100644 --- a/tests/pos-special/stdlib/concurrent/duration/Duration.scala +++ b/tests/pos-special/stdlib/concurrent/duration/Duration.scala @@ -15,6 +15,8 @@ package scala.concurrent.duration import java.lang.{ Double => JDouble } import scala.collection.StringParsers +import language.experimental.captureChecking + object Duration { /** diff --git a/tests/pos-special/stdlib/concurrent/duration/DurationConversions.scala b/tests/pos-special/stdlib/concurrent/duration/DurationConversions.scala index b8f29f2f5d8b..71cc164aadbd 100644 --- a/tests/pos-special/stdlib/concurrent/duration/DurationConversions.scala +++ b/tests/pos-special/stdlib/concurrent/duration/DurationConversions.scala @@ -12,6 +12,8 @@ package scala.concurrent.duration +import language.experimental.captureChecking + import DurationConversions._ // Would be nice to limit the visibility of this trait a little bit, diff --git a/tests/pos-special/stdlib/concurrent/duration/package.scala b/tests/pos-special/stdlib/concurrent/duration/package.scala index 415af1c915a2..7c60ef3953f2 100644 --- a/tests/pos-special/stdlib/concurrent/duration/package.scala +++ b/tests/pos-special/stdlib/concurrent/duration/package.scala @@ -14,6 +14,8 @@ package scala.concurrent import scala.language.implicitConversions +import language.experimental.captureChecking + package object duration { /** * This object can be used as closing token if you prefer dot-less style but do not want diff --git a/tests/pos-special/stdlib/concurrent/impl/ExecutionContextImpl.scala b/tests/pos-special/stdlib/concurrent/impl/ExecutionContextImpl.scala index d662df8927a8..0abfcf2c6701 100644 --- a/tests/pos-special/stdlib/concurrent/impl/ExecutionContextImpl.scala +++ b/tests/pos-special/stdlib/concurrent/impl/ExecutionContextImpl.scala @@ -16,9 +16,11 @@ import java.util.concurrent.{ Semaphore, ForkJoinPool, ForkJoinWorkerThread, Cal import java.util.Collection import scala.concurrent.{ BlockContext, ExecutionContext, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService } +import language.experimental.captureChecking + private[scala] class ExecutionContextImpl private[impl] (final val executor: Executor, final val reporter: Throwable => Unit) extends ExecutionContextExecutor { require(executor ne null, "Executor must not be null") - override final def execute(runnable: Runnable): Unit = executor execute runnable + override final def execute(runnable: Runnable^): Unit = executor execute runnable override final def reportFailure(t: Throwable): Unit = reporter(t) } @@ -28,7 +30,7 @@ private[concurrent] object ExecutionContextImpl { final val daemonic: Boolean, final val maxBlockers: Int, final val prefix: String, - final val uncaught: Thread.UncaughtExceptionHandler) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { + final val uncaught: Thread.UncaughtExceptionHandler^) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { require(prefix ne null, "DefaultThreadFactory.prefix must be non null") require(maxBlockers >= 0, "DefaultThreadFactory.maxBlockers must be greater-or-equal-to 0") @@ -51,8 +53,8 @@ private[concurrent] object ExecutionContextImpl { final override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = if ((Thread.currentThread eq this) && !isBlocked && blockerPermits.tryAcquire()) { try { - val b: ForkJoinPool.ManagedBlocker with (() => T) = - new ForkJoinPool.ManagedBlocker with (() => T) { + val b: (ForkJoinPool.ManagedBlocker with (() => T))^{thunk} = + new ForkJoinPool.ManagedBlocker with (() -> T) { private[this] final var result: T = null.asInstanceOf[T] private[this] final var done: Boolean = false final override def block(): Boolean = { @@ -78,7 +80,7 @@ private[concurrent] object ExecutionContextImpl { }) } - def createDefaultExecutorService(reporter: Throwable => Unit): ExecutionContextExecutorService = { + def createDefaultExecutorService(reporter: Throwable => Unit): ExecutionContextExecutorService^{reporter} = { def getInt(name: String, default: String) = (try System.getProperty(name, default) catch { case e: SecurityException => default }) match { @@ -108,14 +110,14 @@ private[concurrent] object ExecutionContextImpl { } } - def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextExecutor = + def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextExecutor^{reporter} = e match { case null => createDefaultExecutorService(reporter) case some => new ExecutionContextImpl(some, reporter) } def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): - ExecutionContextExecutorService = es match { + ExecutionContextExecutorService^{reporter} = es match { case null => createDefaultExecutorService(reporter) case some => new ExecutionContextImpl(some, reporter) with ExecutionContextExecutorService { diff --git a/tests/pos-special/stdlib/concurrent/impl/FutureConvertersImpl.scala b/tests/pos-special/stdlib/concurrent/impl/FutureConvertersImpl.scala index 0980ade5a5a7..4bcc7a23988f 100644 --- a/tests/pos-special/stdlib/concurrent/impl/FutureConvertersImpl.scala +++ b/tests/pos-special/stdlib/concurrent/impl/FutureConvertersImpl.scala @@ -19,8 +19,11 @@ import scala.concurrent.Future import scala.concurrent.impl.Promise.DefaultPromise import scala.util.{Failure, Success, Try} +import language.experimental.captureChecking +import scala.caps.unsafe.unsafeAssumePure + private[scala] object FutureConvertersImpl { - final class CF[T](val wrapped: Future[T]) extends CompletableFuture[T] with (Try[T] => Unit) { + final class CF[T](val wrapped: Future[T]^) extends CompletableFuture[T] with (Try[T] -> Unit) { this: CF[T]^ => override def apply(t: Try[T]): Unit = t match { case Success(v) => complete(v) case Failure(e) => completeExceptionally(e) @@ -28,31 +31,31 @@ private[scala] object FutureConvertersImpl { // Ensure that completions of this future cannot hold the Scala Future's completer hostage - override def thenApply[U](fn: JFunction[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn) + override def thenApply[U](fn: JFunction[_ >: T, _ <: U]^): CompletableFuture[U]^{this, fn} = thenApplyAsync(fn) - override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn) + override def thenAccept(fn: Consumer[_ >: T]^): CompletableFuture[Void]^{this, fn} = thenAcceptAsync(fn) - override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn) + override def thenRun(fn: Runnable^): CompletableFuture[Void]^{this, fn} = thenRunAsync(fn) - override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn) + override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]^): CompletableFuture[V]^{this, fn} = thenCombineAsync(cs, fn) - override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn) + override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]^): CompletableFuture[Void]^{this, fn} = thenAcceptBothAsync(cs, fn) - override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn) + override def runAfterBoth(cs: CompletionStage[_], fn: Runnable^): CompletableFuture[Void]^{this, fn} = runAfterBothAsync(cs, fn) - override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JFunction[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn) + override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JFunction[_ >: T, U]^): CompletableFuture[U]^{this, fn} = applyToEitherAsync(cs, fn) - override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn) + override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]^): CompletableFuture[Void]^{this, fn} = acceptEitherAsync(cs, fn) - override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn) + override def runAfterEither(cs: CompletionStage[_], fn: Runnable^): CompletableFuture[Void]^{this, fn} = runAfterEitherAsync(cs, fn) - override def thenCompose[U](fn: JFunction[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn) + override def thenCompose[U](fn: JFunction[_ >: T, _ <: CompletionStage[U]]^): CompletableFuture[U]^{this, fn} = thenComposeAsync(fn) - override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn) + override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]^): CompletableFuture[T]^{this, fn} = whenCompleteAsync(fn) - override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn) + override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]^): CompletableFuture[U]^{this, fn} = handleAsync(fn) - override def exceptionally(fn: JFunction[Throwable, _ <: T]): CompletableFuture[T] = { + override def exceptionally(fn: JFunction[Throwable, _ <: T]^): CompletableFuture[T]^{this, fn} = { val cf = new CompletableFuture[T] whenCompleteAsync((t, e) => { if (e == null) cf.complete(t) @@ -63,7 +66,7 @@ private[scala] object FutureConvertersImpl { } catch { case thr: Throwable => cf.completeExceptionally(thr) - this + this.unsafeAssumePure } if (n ne this) cf.complete(n.asInstanceOf[T]) } @@ -78,7 +81,7 @@ private[scala] object FutureConvertersImpl { * WARNING: completing the result of this method will not complete the underlying * Scala Future or Promise (ie, the one that that was passed to `toJava`.) */ - override def toCompletableFuture: CompletableFuture[T] = this + override def toCompletableFuture: CompletableFuture[T]^{this} = this override def obtrudeValue(value: T): Unit = throw new UnsupportedOperationException("obtrudeValue may not be used on the result of toJava(scalaFuture)") diff --git a/tests/pos-special/stdlib/concurrent/impl/Promise.scala b/tests/pos-special/stdlib/concurrent/impl/Promise.scala index 7024344c1184..4fa17b9bace1 100644 --- a/tests/pos-special/stdlib/concurrent/impl/Promise.scala +++ b/tests/pos-special/stdlib/concurrent/impl/Promise.scala @@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicReference import java.util.Objects.requireNonNull import java.io.{IOException, NotSerializableException, ObjectInputStream, ObjectOutputStream} +import language.experimental.captureChecking + /** * Latch used to implement waiting on a DefaultPromise's result. * @@ -30,7 +32,7 @@ import java.io.{IOException, NotSerializableException, ObjectInputStream, Object * Expert Group and released to the public domain, as explained at * https://creativecommons.org/publicdomain/zero/1.0/ */ -private[impl] final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] => Unit) { +private[impl] final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] -> Unit) { //@volatie not needed since we use acquire/release /*@volatile*/ private[this] var _result: Try[T] = null final def result: Try[T] = _result @@ -58,11 +60,11 @@ private[concurrent] object Promise { * If when compressing a chain of Links it is discovered that the root has been completed, * the `owner`'s value is completed with that value, and the Link chain is discarded. **/ - private[concurrent] final class Link[T](to: DefaultPromise[T]) extends AtomicReference[DefaultPromise[T]](to) { + private[concurrent] final class Link[T](to: DefaultPromise[T]^) extends AtomicReference[DefaultPromise[T]^{to}](to) { /** * Compresses this chain and returns the currently known root of this chain of Links. **/ - final def promise(owner: DefaultPromise[T]): DefaultPromise[T] = { + final def promise(owner: DefaultPromise[T]^): DefaultPromise[T]^ = { val c = get() compressed(current = c, target = c, owner = owner) } @@ -70,7 +72,7 @@ private[concurrent] object Promise { /** * The combination of traversing and possibly unlinking of a given `target` DefaultPromise. **/ - @inline @tailrec private[this] final def compressed(current: DefaultPromise[T], target: DefaultPromise[T], owner: DefaultPromise[T]): DefaultPromise[T] = { + @inline @tailrec private[this] final def compressed(current: DefaultPromise[T]^, target: DefaultPromise[T]^, owner: DefaultPromise[T]^): DefaultPromise[T]^ = { val value = target.get() if (value.isInstanceOf[Callbacks[_]]) { if (compareAndSet(current, target)) target // Link @@ -101,7 +103,7 @@ private[concurrent] object Promise { } // Left non-final to enable addition of extra fields by Java/Scala converters in scala-java8-compat. - class DefaultPromise[T] private[this] (initial: AnyRef) extends AtomicReference[AnyRef](initial) with scala.concurrent.Promise[T] with scala.concurrent.Future[T] with (Try[T] => Unit) { + class DefaultPromise[T] private[this] (initial: AnyRef) extends AtomicReference[AnyRef](initial) with scala.concurrent.Promise[T] with scala.concurrent.Future[T] with (Try[T] -> Unit) { this: DefaultPromise[T]^ => /** * Constructs a new, completed, Promise. */ @@ -122,15 +124,15 @@ private[concurrent] object Promise { /** * Returns the associated `Future` with this `Promise` */ - override final def future: Future[T] = this + override final def future: Future[T]^{this} = this - override final def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = + override final def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]^{this, f} = dispatchOrAddCallbacks(get(), new Transformation[T, S](Xform_transform, f, executor)) - override final def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] = + override final def transformWith[S](f: Try[T] => Future[S]^)(implicit executor: ExecutionContext): Future[S]^{this, f} = dispatchOrAddCallbacks(get(), new Transformation[T, S](Xform_transformWith, f, executor)) - override final def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { + override final def zipWith[U, R](that: Future[U]^)(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R]^{this, that, f} = { val state = get() if (state.isInstanceOf[Try[_]]) { if (state.asInstanceOf[Try[T]].isFailure) this.asInstanceOf[Future[R]] @@ -171,43 +173,43 @@ private[concurrent] object Promise { if (!state.isInstanceOf[Failure[_]]) dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_foreach, f, executor)) } - override final def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { + override final def flatMap[S](f: T => Future[S]^)(implicit executor: ExecutionContext): Future[S]^{this, f} = { val state = get() if (!state.isInstanceOf[Failure[_]]) dispatchOrAddCallbacks(state, new Transformation[T, S](Xform_flatMap, f, executor)) else this.asInstanceOf[Future[S]] } - override final def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { + override final def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S]^{this, f} = { val state = get() if (!state.isInstanceOf[Failure[_]]) dispatchOrAddCallbacks(state, new Transformation[T, S](Xform_map, f, executor)) else this.asInstanceOf[Future[S]] } - override final def filter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = { + override final def filter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T]^{this, p} = { val state = get() if (!state.isInstanceOf[Failure[_]]) dispatchOrAddCallbacks(state, new Transformation[T, T](Xform_filter, p, executor)) // Short-circuit if we get a Success else this } - override final def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = { + override final def collect[S](pf: PartialFunction[T, S]^)(implicit executor: ExecutionContext): Future[S]^{this, pf} = { val state = get() if (!state.isInstanceOf[Failure[_]]) dispatchOrAddCallbacks(state, new Transformation[T, S](Xform_collect, pf, executor)) // Short-circuit if we get a Success else this.asInstanceOf[Future[S]] } - override final def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = { + override final def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]^]^)(implicit executor: ExecutionContext): Future[U]^{this, pf} = { val state = get() if (!state.isInstanceOf[Success[_]]) dispatchOrAddCallbacks(state, new Transformation[T, U](Xform_recoverWith, pf, executor)) // Short-circuit if we get a Failure else this.asInstanceOf[Future[U]] } - override final def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = { + override final def recover[U >: T](pf: PartialFunction[Throwable, U]^)(implicit executor: ExecutionContext): Future[U]^{this, pf} = { val state = get() if (!state.isInstanceOf[Success[_]]) dispatchOrAddCallbacks(state, new Transformation[T, U](Xform_recover, pf, executor)) // Short-circuit if we get a Failure else this.asInstanceOf[Future[U]] } - override final def mapTo[S](implicit tag: scala.reflect.ClassTag[S]): Future[S] = + override final def mapTo[S](implicit tag: scala.reflect.ClassTag[S]): Future[S]^{this} = if (!get().isInstanceOf[Failure[_]]) super[Future].mapTo[S](tag) // Short-circuit if we get a Success else this.asInstanceOf[Future[S]] @@ -290,7 +292,7 @@ private[concurrent] object Promise { (p ne this) && p.tryComplete0(p.get(), resolved) // Use this to get tailcall optimization and avoid re-resolution } else /* if(state.isInstanceOf[Try[T]]) */ false - override final def completeWith(other: Future[T]): this.type = { + override final def completeWith(other: Future[T]^): this.type = { if (other ne this) { val state = get() if (!state.isInstanceOf[Try[_]]) { @@ -340,7 +342,7 @@ private[concurrent] object Promise { /** Link this promise to the root of another promise. */ - @tailrec private[concurrent] final def linkRootOf(target: DefaultPromise[T], link: Link[T]): Unit = + @tailrec private[concurrent] final def linkRootOf(target: DefaultPromise[T]^, link: Link[T]^): Unit = if (this ne target) { val state = get() if (state.isInstanceOf[Try[_]]) { diff --git a/tests/pos-special/stdlib/concurrent/package.scala b/tests/pos-special/stdlib/concurrent/package.scala index 652551eb988b..eed6bf36dbf0 100644 --- a/tests/pos-special/stdlib/concurrent/package.scala +++ b/tests/pos-special/stdlib/concurrent/package.scala @@ -15,6 +15,8 @@ package scala import scala.concurrent.duration.Duration import scala.annotation.implicitNotFound +import language.experimental.captureChecking + /** This package object contains primitives for concurrent and parallel programming. * * == Guide == @@ -169,7 +171,7 @@ package concurrent { */ @throws(classOf[TimeoutException]) @throws(classOf[InterruptedException]) - final def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = awaitable match { + final def ready[T](awaitable: Awaitable[T]^, atMost: Duration): awaitable.type = awaitable match { case f: Future[T] if f.isCompleted => awaitable.ready(atMost)(AwaitPermission) case _ => blocking(awaitable.ready(atMost)(AwaitPermission)) } @@ -196,7 +198,7 @@ package concurrent { */ @throws(classOf[TimeoutException]) @throws(classOf[InterruptedException]) - final def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable match { + final def result[T](awaitable: Awaitable[T]^, atMost: Duration): T = awaitable match { case f: Future[T] if f.isCompleted => f.result(atMost)(AwaitPermission) case _ => blocking(awaitable.result(atMost)(AwaitPermission)) }