Skip to content

Commit

Permalink
Capture check the concurrent package
Browse files Browse the repository at this point in the history
  • Loading branch information
Linyxus committed Nov 21, 2023
1 parent 9fdb702 commit 98cc204
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 135 deletions.
3 changes: 3 additions & 0 deletions tests/pos-special/stdlib/concurrent/Awaitable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

package scala.concurrent

import language.experimental.captureChecking



import scala.concurrent.duration.Duration
Expand All @@ -27,6 +29,7 @@ import scala.concurrent.duration.Duration
* occurred.
*/
trait Awaitable[+T] {
this: Awaitable[T]^ =>

/**
* Await the "completed" state of this `Awaitable`.
Expand Down
37 changes: 21 additions & 16 deletions tests/pos-special/stdlib/concurrent/BatchingExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@ import java.util.concurrent.Executor
import java.util.Objects
import scala.util.control.NonFatal
import scala.annotation.{switch, tailrec}
import scala.annotation.unchecked.uncheckedCaptures

import language.experimental.captureChecking

/**
* Marker trait to indicate that a Runnable is Batchable by BatchingExecutors
*/
trait Batchable {
self: Runnable =>
self: Runnable^ =>
}

private[concurrent] object BatchingExecutorStatics {
final val emptyBatchArray: Array[Runnable] = new Array[Runnable](0)
// !cc! It is okay to elide sealed checking here,
// as `emptyBatchArray` will not be mutated.
final val emptyBatchArray: Array[(Runnable^) @uncheckedCaptures] = new Array[(Runnable^) @uncheckedCaptures](0)

// Max number of Runnables executed nested before starting to batch (to prevent stack exhaustion)
final val syncPreBatchDepth = 16
Expand Down Expand Up @@ -97,24 +102,24 @@ private[concurrent] trait BatchingExecutor extends Executor {
* In order to conserve allocations, the first element in the batch is stored "unboxed" in
* the `first` field. Subsequent Runnables are stored in the array called `other`.
*/
private[this] sealed abstract class AbstractBatch protected (protected final var first: Runnable, protected final var other: Array[Runnable], protected final var size: Int) {
private[this] sealed abstract class AbstractBatch protected (protected final var first: Runnable^, protected final var other: Array[(Runnable^) @uncheckedCaptures], protected final var size: Int) {

private[this] final def ensureCapacity(curSize: Int): Array[Runnable] = {
val curOther = this.other
private[this] final def ensureCapacity(curSize: Int): Array[(Runnable^) @uncheckedCaptures] = {
val curOther: Array[(Runnable^) @uncheckedCaptures] = this.other
val curLen = curOther.length
if (curSize <= curLen) curOther
else {
val newLen = if (curLen == 0) 4 else curLen << 1

if (newLen <= curLen) throw new StackOverflowError("Space limit of asynchronous stack reached: " + curLen)
val newOther = new Array[Runnable](newLen)
val newOther: Array[(Runnable^) @uncheckedCaptures] = new Array[(Runnable^) @uncheckedCaptures](newLen)
System.arraycopy(curOther, 0, newOther, 0, curLen)
this.other = newOther
newOther
}
}

final def push(r: Runnable): Unit = {
final def push(r: Runnable^): Unit = {
val sz = this.size
if(sz == 0)
this.first = r
Expand Down Expand Up @@ -143,10 +148,10 @@ private[concurrent] trait BatchingExecutor extends Executor {
}
}

private[this] final class AsyncBatch private(_first: Runnable, _other: Array[Runnable], _size: Int) extends AbstractBatch(_first, _other, _size) with Runnable with BlockContext with (BlockContext => Throwable) {
private[this] final var parentBlockContext: BlockContext = BatchingExecutorStatics.MissingParentBlockContext
private[this] final class AsyncBatch private(_first: Runnable^, _other: Array[(Runnable^) @uncheckedCaptures], _size: Int) extends AbstractBatch(_first, _other, _size) with Runnable with BlockContext with (BlockContext^ -> Throwable) { this: AsyncBatch^ =>
private[this] final var parentBlockContext: (BlockContext^) @uncheckedCaptures = BatchingExecutorStatics.MissingParentBlockContext

final def this(runnable: Runnable) = this(runnable, BatchingExecutorStatics.emptyBatchArray, 1)
final def this(runnable: Runnable^) = this(runnable, BatchingExecutorStatics.emptyBatchArray, 1)

override final def run(): Unit = {
_tasksLocal.set(this) // This is later cleared in `apply` or `runWithoutResubmit`
Expand All @@ -158,7 +163,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
}

/* LOGIC FOR ASYNCHRONOUS BATCHES */
override final def apply(prevBlockContext: BlockContext): Throwable = try {
override final def apply(prevBlockContext: BlockContext^): Throwable = try {
parentBlockContext = prevBlockContext
runN(BatchingExecutorStatics.runLimit)
null
Expand Down Expand Up @@ -186,7 +191,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
}
} else cause // TODO: consider if NonFatals should simply be `reportFailure`:ed rather than rethrown

private[this] final def cloneAndClear(): AsyncBatch = {
private[this] final def cloneAndClear(): AsyncBatch^{this} = {
val newBatch = new AsyncBatch(this.first, this.other, this.size)
this.first = null
this.other = BatchingExecutorStatics.emptyBatchArray
Expand All @@ -203,7 +208,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
}
}

private[this] final class SyncBatch(runnable: Runnable) extends AbstractBatch(runnable, BatchingExecutorStatics.emptyBatchArray, 1) with Runnable {
private[this] final class SyncBatch(runnable: Runnable^) extends AbstractBatch(runnable, BatchingExecutorStatics.emptyBatchArray, 1) with Runnable {
@tailrec override final def run(): Unit = {
try runN(BatchingExecutorStatics.runLimit) catch {
case ie: InterruptedException =>
Expand All @@ -221,7 +226,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
* When implementing a sync BatchingExecutor, it is RECOMMENDED
* to implement this method as `runnable.run()`
*/
protected def submitForExecution(runnable: Runnable): Unit
protected def submitForExecution(runnable: Runnable^): Unit

/** Reports that an asynchronous computation failed.
* See `ExecutionContext.reportFailure(throwable: Throwable)`
Expand All @@ -232,7 +237,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
* WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
* implementation of `BatchingExecutor`
*/
protected final def submitAsyncBatched(runnable: Runnable): Unit = {
protected final def submitAsyncBatched(runnable: Runnable^): Unit = {
val b = _tasksLocal.get
if (b.isInstanceOf[AsyncBatch]) b.asInstanceOf[AsyncBatch].push(runnable)
else submitForExecution(new AsyncBatch(runnable))
Expand All @@ -242,7 +247,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
* WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
* implementation of `BatchingExecutor`
*/
protected final def submitSyncBatched(runnable: Runnable): Unit = {
protected final def submitSyncBatched(runnable: Runnable^): Unit = {
Objects.requireNonNull(runnable, "runnable is null")
val tl = _tasksLocal
val b = tl.get
Expand Down
13 changes: 8 additions & 5 deletions tests/pos-special/stdlib/concurrent/BlockContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

package scala.concurrent

import language.experimental.captureChecking
import scala.annotation.unchecked.uncheckedCaptures

/**
* A context to be notified by [[scala.concurrent.blocking]] when
* a thread is about to block. In effect this trait provides
Expand Down Expand Up @@ -68,9 +71,9 @@ object BlockContext {
**/
final def defaultBlockContext: BlockContext = DefaultBlockContext

private[this] final val contextLocal = new ThreadLocal[BlockContext]()
private[this] final val contextLocal = new ThreadLocal[(BlockContext^) @uncheckedCaptures]()

private[this] final def prefer(candidate: BlockContext): BlockContext =
private[this] final def prefer(candidate: BlockContext^): BlockContext^ =
if (candidate ne null) candidate
else {
val t = Thread.currentThread
Expand All @@ -81,12 +84,12 @@ object BlockContext {
/**
* @return the `BlockContext` that would be used for the current `java.lang.Thread` at this point
**/
final def current: BlockContext = prefer(contextLocal.get)
final def current: BlockContext^ = prefer(contextLocal.get)

/**
* Installs a current `BlockContext` around executing `body`.
**/
final def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
final def withBlockContext[T](blockContext: BlockContext^)(body: => T): T = {
val old = contextLocal.get // can be null
if (old eq blockContext) body
else {
Expand All @@ -99,7 +102,7 @@ object BlockContext {
* Installs the BlockContext `blockContext` around the invocation to `f` and passes in the previously installed BlockContext to `f`.
* @return the value produced by applying `f`
**/
final def usingBlockContext[I, T](blockContext: BlockContext)(f: BlockContext => T): T = {
final def usingBlockContext[I, T](blockContext: BlockContext^)(f: BlockContext^ => T): T = {
val old = contextLocal.get // can be null
if (old eq blockContext) f(prefer(old))
else {
Expand Down
28 changes: 15 additions & 13 deletions tests/pos-special/stdlib/concurrent/ExecutionContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package scala.concurrent
import java.util.concurrent.{ ExecutorService, Executor }
import scala.annotation.implicitNotFound

import language.experimental.captureChecking

/**
* An `ExecutionContext` can execute program logic asynchronously,
* typically but not necessarily on a thread pool.
Expand All @@ -39,7 +41,7 @@ import scala.annotation.implicitNotFound
* `scala.concurrent.ExecutionContext.Implicits.global`.
* The recommended approach is to add `(implicit ec: ExecutionContext)` to methods,
* or class constructor parameters, which need an `ExecutionContext`.
*
*
* Then locally import a specific `ExecutionContext` in one place for the entire
* application or module, passing it implicitly to individual methods.
* Alternatively define a local implicit val with the required `ExecutionContext`.
Expand Down Expand Up @@ -69,13 +71,13 @@ consider using Scala's global ExecutionContext by defining
the following:
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global""")
trait ExecutionContext {
trait ExecutionContext { this: ExecutionContext^ =>

/** Runs a block of code on this execution context.
*
* @param runnable the task to execute
*/
def execute(runnable: Runnable): Unit
def execute(runnable: Runnable^): Unit

/** Reports that an asynchronous computation failed.
*
Expand All @@ -101,7 +103,7 @@ trait ExecutionContext {
*/
@deprecated("preparation of ExecutionContexts will be removed", "2.12.0")
// This cannot be removed until there is a suitable replacement
def prepare(): ExecutionContext = this
def prepare(): ExecutionContext^{this} = this
}

/**
Expand Down Expand Up @@ -212,25 +214,25 @@ object ExecutionContext {
*
* Do *not* call any blocking code in the `Runnable`s submitted to this `ExecutionContext`
* as it will prevent progress by other enqueued `Runnable`s and the calling `Thread`.
*
*
* Symptoms of misuse of this `ExecutionContext` include, but are not limited to, deadlocks
* and severe performance problems.
*
* Any `NonFatal` or `InterruptedException`s will be reported to the `defaultReporter`.
*/
object parasitic extends ExecutionContextExecutor with BatchingExecutor {
override final def submitForExecution(runnable: Runnable): Unit = runnable.run()
override final def execute(runnable: Runnable): Unit = submitSyncBatched(runnable)
override final def submitForExecution(runnable: Runnable^): Unit = runnable.run()
override final def execute(runnable: Runnable^): Unit = submitSyncBatched(runnable)
override final def reportFailure(t: Throwable): Unit = defaultReporter(t)
}

/**
* See [[ExecutionContext.global]].
*/
private[scala] lazy val opportunistic: ExecutionContextExecutor = new ExecutionContextExecutor with BatchingExecutor {
final override def submitForExecution(runnable: Runnable): Unit = global.execute(runnable)
final override def submitForExecution(runnable: Runnable^): Unit = global.execute(runnable)

final override def execute(runnable: Runnable): Unit =
final override def execute(runnable: Runnable^): Unit =
if ((!runnable.isInstanceOf[impl.Promise.Transformation[_,_]] || runnable.asInstanceOf[impl.Promise.Transformation[_,_]].benefitsFromBatching) && runnable.isInstanceOf[Batchable])
submitAsyncBatched(runnable)
else
Expand All @@ -253,7 +255,7 @@ object ExecutionContext {
* @param reporter a function for error reporting
* @return the `ExecutionContext` using the given `ExecutorService`
*/
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService^{reporter} =
impl.ExecutionContextImpl.fromExecutorService(e, reporter)

/** Creates an `ExecutionContext` from the given `ExecutorService` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]].
Expand All @@ -269,23 +271,23 @@ object ExecutionContext {
* @param e the `ExecutorService` to use. If `null`, a new `ExecutorService` is created with [[scala.concurrent.ExecutionContext$.global default configuration]].
* @return the `ExecutionContext` using the given `ExecutorService`
*/
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService^ = fromExecutorService(e, defaultReporter)

/** Creates an `ExecutionContext` from the given `Executor`.
*
* @param e the `Executor` to use. If `null`, a new `Executor` is created with [[scala.concurrent.ExecutionContext$.global default configuration]].
* @param reporter a function for error reporting
* @return the `ExecutionContext` using the given `Executor`
*/
def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor^{reporter} =
impl.ExecutionContextImpl.fromExecutor(e, reporter)

/** Creates an `ExecutionContext` from the given `Executor` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]].
*
* @param e the `Executor` to use. If `null`, a new `Executor` is created with [[scala.concurrent.ExecutionContext$.global default configuration]].
* @return the `ExecutionContext` using the given `Executor`
*/
def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
def fromExecutor(e: Executor): ExecutionContextExecutor^ = fromExecutor(e, defaultReporter)

/** The default reporter simply prints the stack trace of the `Throwable` to [[java.lang.System#err System.err]].
*
Expand Down
Loading

0 comments on commit 98cc204

Please sign in to comment.