diff --git a/std/async/async.kk b/std/async/async.kk index 9325843..899e30a 100644 --- a/std/async/async.kk +++ b/std/async/async.kk @@ -31,19 +31,239 @@ import std/num/ddouble // for C# backend import std/time/duration import std/async/null import std/core/unsafe +import std/core/undiv pub import std/num/ddouble pub import std/num/float64 import std/time/timestamp +import std/core-extras - // js is just using primitives +/* +# Cancellation scopes -// A type alias for asynchronous operations that can raise exceptions non-deterministically. -// This is common for almost all `:async` operations since `cancel` and `timeout` can -// cancel operations non-deterministically which raises the `Cancel` exception and cancels -// outstanding asynchronous requests. -pub alias asyncx = +Each scope has a unique ID (int). + +When we await any async action, we associate it with its full scope chain. If +scope a spawns scope b which spawns scope c which spawns an action, that +action's scope is Scope([c, b, a]) - a scope is just a list of scope IDs, +childmost first. + +When we decide to cancel a scope, we cancel it by ID, e.g. `b`. This will +cancel the following scopes: + +`[b, _]` -> clearly in b `[_, b, _])` -> a child scope of b + +It will not cancel something in `[a, d]` (i.e. within `a` but not `b`) + +# Cancelation: up and down + +When cancelation is triggered, it is generally triggered from _within_ the +scope to be canceled, and affects all transitive child scopes. There are many +reasons for cancelation: + - returning normally, cancelation prevents dangling callbacks + - returning abnormally (e.g. exn), we cancel any pending callbacks + - within a `firstof`, we have decided to cancel all remaining strands after + the first is complete + +After cancelation is triggered, the canceler's execution continues as normal - +calling `cancel-scope` simply returns `()`. + +Triggering affects actions within the scope - so when observing cancelation, it +being triggered from somewhere "up" the stack. It's like a reverse of an +exception (which is coming from "down" the stack and observed "up" the stack). + +The way that cancelation is observed is that all outstanding callbacks beneath +the canceled scope receive a `Cancel` instead of a `Result`. Aside from +low-level code, you won't observe this - the higher level utilities for +awaiting a result will automatically invoke `discontinue()` on the execution +which is awaiting the result. + +It's not intended to be possible to "catch" a `discontinue` - cancelation can +be deferred to protect some critical section (using `defer-cancelation`), but +that only means cancelation will take effect at the end of that critical +section - it cannot be stopped. + +(Internally `interleave` does "catch" cancelation, but it's a special case and +is careful to maintain these semantics) + +# Strands (interleave) + +Multiple strands can be cooperatively executed, by running each in a custom +`async` handler. Instead of awaiting a result, the handler returns immediately +and the callback triggers an enqueue onto an internal channel. + +Continuations are consumed from this channel until all strands have completed. + +When one strand returns abnormally (e.g. exception), the entire scope is +canceled. This will cause any pending callbacks to be resolved with a Cancel +result. + +# Interleave and cancelation + +In order to control cancelation explicitly, `interleave-strands` creates a root +scope instead of a child scope to spawn operations in. No actions spawned +within this scope will be seen as children of the parent scope, preventing +automatic cancelation when a parent scope is canceled. + +Instead, `interleave-strands` _observes_ when the parent scope is canceled, and +triggers cancelation on the child strands. Crucially this doesn't cancel the +interleaving bookkeeping itself, which ensures that: + - `interleave-raw` always observes the final states of all strands, even if + all of them are `Cancel` + - We drive each strand to completion, including finalizers. If we simply + abandoned the bookeeping, we'd potentially leave finalize actions sitting in + an unconsumed channel instead of running them. + +*/ + +// Asynchronous operations have the `:async` effect. +// **Note**: technically, async is capable of embedding arbitrary `io-noexn` +// actions, as this is typically required to interface with the event loop. +// +// Code which makes use of this low-level capability should take care to +// represent the appropriate semantic effects in addition to `async`. +pub effect async + val async-scope: scope + + // setup a callback and await its completion + ctl do-await( setup : await-setup, scope : scope) : await-result + + // setup a callback to execute `f` on completion, returning control to the caller immediately + // after the setup is performed + ctl no-await( setup : await-setup, scope : scope, f : await-result -> io-noexn () ) : () + + // Internal: evaluate io in async + ctl async-iox( action : () -> io-noexn a ) : a + + // trigger cancelation of a specific scope (and all its children) + ctl cancel-scope( scope : scope-id ) : () + + // React to cancelation of this expression + final ctl discontinue(): a + +// A type alias for asynchronous operations which are both `ndet` and `div` (may never return). +// This is common for almost all `:async` operations due to concurrency and cancelation. +// TODO: remove st from this alias +pub alias asyncx = > + +pub alias async-exn = + +pub fun on-cancel(fin: () -> (), action: () -> a): a + handle({ mask behind(action) }) + final ctl discontinue() + fin() + discontinue() + + // passthrough others + val async-scope = async-scope + fun do-await(setup, scope) do-await(setup,scope) + fun no-await(setup, scope, f) no-await(setup,scope, f) + fun async-iox(x) async-iox(x) + fun cancel-scope(scope) cancel-scope(scope) + +// ---------------------------------------------------------------------------- +// Async effect +// ---------------------------------------------------------------------------- +pub type await-result + Cancel + Result(value: a) + +fun await-result/show(r: await-result) + match r + Cancel -> "Cancel" + Result(_) -> "Result(_)" + +type strand-result + Finalize(y: yield-info) + Complete(value: a) + +fun strand-result/show(s: strand-result) + match s + Finalize(_) -> "Finalize(...)" + Complete(_) -> "Complete(_)" + +fun continue-await(v: await-result): asyncx a + match v + Result(value) -> value + Cancel -> discontinue() + +fun continue-strand(v: strand-result): total a + match v + Complete(value) -> value + Finalize(yield-info) -> unsafe-reyield(yield-info) + +fun finalize-strand(v: strand-result): total () + match v + Complete(_) -> () + Finalize(yield-info) -> unsafe-reyield(yield-info) + +alias dispose-fn = () -> io-noexn () + +alias maybe-dispose-fn = (maybe) + +alias await-setup = (cb : (await-result, bool) -> io-noexn ()) -> io-noexn maybe-dispose-fn + +// Prevents cancelation, but invokes a hook immediately for notification +fun unsafe-override-cancel(on-cancel-hook: () -> (), action: () -> a): a + val dangling-cb: ref = ref(Nothing) + with finally { + // ensure dangling-cb gets invoked so we don't leak uninvoked and uncancelled callbacks + match !dangling-cb + Just(cb) -> async-iox { cb(Result(()), True) } + Nothing -> () + } + // within the outer scope, spawn a canary action + // to capture cancelation triggered from above. + val setup-cb = fn(cb) + dangling-cb.set(Just(cb)) + Just(on-cancel-hook) // cleanup fn + + no-await(setup-cb, async-scope, fn(_) ()) + + // create a new scope root, so that any callbacks registered + // underneath this scope will not be seen as children of + // any scopes above us (and therefore not canceled if an outer + // scope is canceled) + val cid = async-iox { unique() } + val new-scope = empty-scope.child(cid) + + with finally { + cancel-scope(cid) // don't leave anything dangling in case of exn etc + } + + with handler + val async-scope = new-scope + fun do-await(setup,scope) do-await(setup,scope) + fun no-await(setup,scope,f) no-await(setup,scope,f) + + // We notice cancelation from a parent via the canary. We also + // need to modify the `cancel-scope` hook so that we notice (and defer) + // cancelation triggered from within this scope, but only if it's + // targeted at exactly this scope. + fun cancel-scope(scope-id) + if scope-id == cid then + async-iox(on-cancel-hook) + else + // pass it along, it's presumably a child scope + // nested under this uncancelable one + cancel-scope(scope-id) + + fun async-iox(f) async-iox(f) + fun discontinue() discontinue() + + with mask behind + action() + +// defer cancelation of the child-scoped `action` until the end of `action`. Note that +// cancelation cannot be ignored, only deferred. +pub fun defer-cancelation(action: () -> a): a + val canceled: ref = ref(False) + fun on-cancel() + canceled := True + val result = unsafe-override-cancel(on-cancel, action) + if !canceled then + discontinue() + result -pub alias async-exn = // ---------------------------------------------------------------------------- // Promises @@ -56,7 +276,6 @@ pub alias async-exn = abstract struct promise state : ref> - abstract value type promise-state Resolved( value : a ) Awaiting( listeners : list io ()> ) @@ -68,11 +287,11 @@ pub fun promise() : async promise // Await a promise; returns immediately if the promise was already resolved and otherwise // waits asynchronously. pub fun promise/await( p : promise ) : asyncx a - fun setup(cb : _ -> io-noexn ()) + fun setup(cb : _ -> io-noexn ()): io-noexn () val r = p.state match (!r) Awaiting(listeners) -> r := Awaiting(Cons(cb,listeners)) - Resolved(value) -> io-noexn1(cb,value) // resume right away; should not happen due to try-await + Resolved(value) -> cb(value) // resume right away; should not happen due to try-await match p.try-await Just(v) -> v Nothing -> await1(setup) @@ -86,7 +305,7 @@ pub fun try-await( p : promise ) : maybe _ -> Nothing // Resolve a promise to `value`. Raises an exception if the promise was already resolved. -pub fun resolve( p : promise, value : a ) : asyncx () +pub fun resolve( p : promise, value : a ) : () async-io val r = p.state match !r @@ -96,6 +315,10 @@ pub fun resolve( p : promise, value : a ) : asyncx () cbx(value) // set-immediate1( cbx, value ) _ -> throw("Promise was already resolved") +// Perform an I/O operation at the outer level; exceptions are propagated back. +fun async-io( f : () -> io a ) : a + async-io-noexn( { try(f) } ).untry + // ---------------------------------------------------------------------------- // Channels // ---------------------------------------------------------------------------- @@ -130,12 +353,9 @@ pub fun channel() : async channel // Receive (and remove) a value from the channel: returns immediately if a value is available and otherwise // waits asynchronously until a value becomes available. -pub fun receive( ch : channel ) : asyncx a - ch.receivex.untry - -fun receivex( ch : channel, cancelable : bool = True ) : error - fun setup( cb : (_,_) -> io-noexn () ) - fun cbr(x) cb(Ok(x),True) +fun receive( ch : channel) : asyncx a + val setup: await-setup<_> = fn(cb : (_,_) -> io-noexn () ) + fun cbr(x) cb(Result(x),True) val r = ch.state match !r Empty -> r := Waiting(cbr,[]) @@ -146,8 +366,8 @@ fun receivex( ch : channel, cancelable : bool = True ) : error Ok(v) - Nothing -> do-await(setup,empty-scope,cancelable) + Just(v) -> v + Nothing -> do-await(setup,async-scope).continue-await // Return immediately if a value is available on the channel and otherwise returns `Nothing`. pub fun try-receive( ch : channel ) : maybe @@ -169,8 +389,8 @@ fun emit-io( ch : channel, value : a ) : io-noexn () l(value) // Emit a value asynchronously into a channel. -pub fun emit( ch : channel, value : a ) : asyncx () - async-io +pub fun emit( ch : channel, value : a ) : () + async-io-noexn emit-io(ch,value) fun trace-channel( msg : string, ch : channel ) : () @@ -196,21 +416,27 @@ fun trace-anyx( s : string, x : a ) : async () // `cancel` it (and return `Nothing`). Due to the generality of `cancel`, this `timeout` // abstraction can reliably time out over any composition of asynchronous operations // and is therefore quite expressive. - -pub fun timeout( secs : duration, action : () -> a, ?set-timeout: (unit-cb, int32) -> io-noexn any, ?clear-timeout: (any) -> io-noexn () ) : maybe +pub fun timeout( secs : duration, action : () -> a, ?set-timeout: (unit-cb, int32) -> io-noexn any, ?clear-timeout: (any) -> io-noexn () ) : maybe firstof { duration/wait(secs); Nothing} { Just(action()) } // Execute `a` and `b` interleaved. As soon as one of them finishes, // `cancel` the other one and return the result of the first. -pub fun firstof( a : () -> a, b : () -> a ) : a - cancelable - val (ra,rb) = interleavedx { val x = mask behind{ a() }; cancel(); x } - { val x = mask behind{ b() }; cancel(); x } - match ra - Error(exn) | exn.is-cancel -> rb.untry - _ -> ra.untry - - +pub fun firstof( a : () -> a, b : () -> a) : a + fun runner(f) + fn(scope) + with finally { scope.cancel() } // cancel regardless of exn or result + mask { f() } + + // note: when we cancel `scope`, interleaved-raw returns Cancel items, + // rather than actually canceling the entire call + val results = interleaved-raw([runner(a), runner(b)]) + val first = results.find-maybe fn(result) + match result + Result(x) -> Just(x) + Cancel -> Nothing + match first + Just(x) -> x + Nothing -> discontinue() // Wait (asynchronously) for `secs` seconds as a `:double`. // Use `yield()` to yield to other asynchronous operations. @@ -223,7 +449,7 @@ pub fun duration/wait( secs : duration = zero, ?set-timeout: (unit-cb, int32) -> if secs <= duration/zero then return yield() val msecs = max(zero:int32,secs.milli-seconds.int32) await fn(cb) - val tid = async/set-timeout( fn(){ cb(Ok(())) }, msecs ) + val tid = async/set-timeout( fn(){ cb(()) }, msecs ) Just( { async/clear-timeout(tid) } ) // Yield to other asynchronous operations. Same as `wait(0)`. @@ -251,58 +477,50 @@ fun async/clear-timeout( tid : timeout-id , ?clear-timeout: (any) -> io-noexn () // ---------------------------------------------------------------------------- // Interleave two actions around their asynchronous operations. -pub fun two/interleaved( action1 : () -> a, action2 : () -> b ) : (a,b) - val (ra,rb) = interleavedx( {mask behind(action1)}, {mask behind(action2)} ) - [ra.maybe-exn,rb.maybe-exn].ordered_throw - (ra.untry,rb.untry) - -// Interleave a list of actions around their asynchronous operations. -pub fun list/interleaved( xs : list<() -> a> ) : list - val ress = xs.map( fn(f) { return { mask behind(f) } } ).interleavedx - //ress.map(maybe).ordered_throw - ress.map(untry) - -fun maybe-exn( err : error ) : maybe - match err - Error(exn) -> Just(exn) - _ -> Nothing - -fun ordered_throw( xs : list> ) : exn () - var mexn := Nothing - xs.foreach fn(x) - match x - Nothing -> () - Just(exn) -> match mexn - Nothing -> mexn := x - Just(exnx) -> - if ((exn.is-finalize && !exnx.is-finalize) || (exnx.is-cancel && !exn.is-cancel)) - then mexn := x - match mexn - Just(exn) -> rethrow(exn) - Nothing -> () - -// Interleave two actions around their asynchronous operations and explicitly returning either -// their result or their exception. -pub fun interleavedx( action1 : () -> a, action2 : () -> b ) : (error,error) - fun act1() Left(action1()) - fun act2() Right(action2()) - match interleavedx([act1,act2]) - Cons(x,Cons(y)) -> (x.unleft,y.unright) - _ -> - // this will never happen.. - val exn = Exception("invalid interleaved result",ExnInternal("std/async/interleavedx(action1,action2)")) - (Error(exn),Error(exn)) - -fun unleft( x : error> ) : error - match x - Ok(Left(l)) -> Ok(l) - Error(exn) -> Error(exn) - _ -> Error(Exception("invalid left interleaved result",ExnInternal("std/async/interleavedx(action1,action2)"))) -fun unright( x : error> ) : error - match x - Ok(Right(r)) -> Ok(r) - Error(exn) -> Error(exn) - _ -> Error(Exception("invalid right interleaved result",ExnInternal("std/async/interleavedx(action1,action2)"))) +pub fun two/interleaved( action1 : () -> a, action2 : () -> b ) : (a,b) + fun act1() Left(action1()) + fun act2() Right(action2()) + match interleaved([act1, act2]) + [Left(l), Right(r)] -> (l, r) + other -> impossible("two/interleaved returned " ++ other.length.show ++ " values") + +value struct interleaving + incomplete: int + results: list<(int, strand-result>)> + +fun interleaving/show(i: interleaving<_>) + "Interleaving(incomplete=" ++ i.incomplete.show ++ ", " ++ i.results.show ++ ")" + +pub fun list/interleaved(xs : list<() -> a> ) : list + fun runner(f) + fn(_) + f() + xs.map(runner).interleaved-raw.map fn(result) + result.continue-await + +fun list/interleaved-raw(xs : list<(scope) -> a> ) : list> + var state : some interleaving := Interleaving(xs.length, []) + with handler> + finally() + state.results.foreach fn(pair) + pair.snd.finalize-strand + + return(_) + // Treat finalized as cancels, since we're already returning await-result. + state.results.map fn(pair) + match pair.snd + Finalize(_) -> Cancel + Complete(x) -> x + + fun strands-are-busy() + state.incomplete > 0 + + fun strand-done(idx,res) + state := Interleaving(state.incomplete - 1, state.results.insert(idx,res)) + + // with unsafe-pretend-stateless + with mask + interleave-strands(xs) // Private effect to keep track of when a strand in an interleaving is done. // Preferred over using built-in state as this works well if there is an outer handler @@ -311,8 +529,9 @@ fun unright( x : error> ) : error effect strands // Are there still strands that need to be resumed? fun strands-are-busy() : bool + // Call this when a strand is done. - fun strand-done(idx : int, result : error) : () + fun strand-done(idx : int, result : strand-result>) : () // Insert in order with an accumulating list. fun insert-acc( xs : list<(int,a)>, idx : int, value : a, acc : list<(int,a)> ) : list<(int,a)> @@ -328,96 +547,119 @@ fun insert( xs : list<(int,a)>, idx : int, value : a, n : int = 0 ) : list<(int Cons(x,xx) | x.fst < idx -> Cons(x, insert(xx, idx, value, n + 1)) _ -> Cons((idx,value),xs) - -// Interleave a list actions around their asynchronous operations and explicitly returning either -// either their result or their exception. -pub fun list/interleavedx( xs : list<() -> a> ) : list> - val n = xs.length - if n==0 then [] - elif n==1 then xs.map(unsafe-try-all) - else interleavedn(n,xs) - -fun interleavedn( n : int, xs : list<() -> a> ) : list> - unsafe-no-ndet-div - var cr : some (int,list<(int,error)>) := (n,[]) - with handler - return(x) - cr.snd.map( snd ) - fun strands-are-busy() - cr.fst > 0 - fun strand-done(idx,res) - cr := (cr.fst - 1, cr.snd.insert(idx,res)) - mask - interleaved-div(xs) - - inline extern unsafe-no-ndet-div-cast : forall (() -> a) -> (() -> e a) inline "#1" fun unsafe-no-ndet-div( action : () -> a ) : e a unsafe-no-ndet-div-cast(action)() -inline extern inject-effects : forall (() -> e a) -> total (() -> ,ndet,div|e> a) +inline extern inject-effects : forall (() -> e a) -> total (() -> |e> a) inline "#1" -fun error/is-finalize( t : error ) : bool - match t - Error(exn) -> exn.is-finalize - _ -> False - -fun error/is-cancel( t : error ) : bool - match t - Error(exn) -> exn.is-cancel - _ -> False - -fun interleaved-div( xs : list<() -> a> ) : |e> () - val strands = xs.map-indexed fn(i,action) - return fn() - val res = unsafe-try-all(inject-effects(action)) - strand-done(i,res) - if res.is-finalize then cancel() // cancel others if finalization happens +// Warning: Cancelation should not be caught unless you're carefully maintaining +// cancelation semantics yourself. +pub fun unsafe-catch-cancel(a: () -> a): await-result + handle({ mask behind(a) }) + final ctl discontinue() + Cancel + return(x) + Result(x) + + // passthrough others + val async-scope = async-scope + fun do-await(setup, scope) do-await(setup,scope) + fun no-await(setup, scope, f) no-await(setup,scope, f) + fun async-iox(x) async-iox(x) + fun cancel-scope(scope) cancel-scope(scope) + +// fun unsafe-pretend-stateless(a: () -> |e> a): e a +// unsafe-total(a) + +// fun mask-global-state(a: () -> e a): |e> a +// with mask> +// with mask> +// with mask> +// a() + +// TODO is there a better way to inform the caller of scope than having every item in the list accept it? +fun interleave-strands(xs : list<(scope) -> a> ) : |e> () + val inner-scope: ref = ref(Nothing) + val keep-spawning: ref = ref(True) val ch : some channel<() -> a> = channel() - val handle-strand = handler - raw ctl do-await(setup,scope,c) - no-await( setup, scope, c) fn(res) - // emit a resumption of this strand into the channel - ch.emit-io( /* no-cps */ { rcontext.resume(res) } ) - () // stop the strand at this point - // redirect all other operations - fun no-await(setup,scope,c,f) - no-await(setup,scope,c,f) - fun async-iox(f) - async-iox(f) - fun cancel(scope) - cancel(scope) - - strands.foreach fn(strand) - handle-strand{ mask behind(strand) } - + + // interleave has custom cancelation handling. When the scope + // is canceled, we don't discontinue() but instead + // perform explicit cancelation of the child scope, and + // let that flow through into registering all outstanding scopes + // with a `Cancel` result. + with unsafe-override-cancel { + // Note we can't cancel() directly in this hook because cancel is async, + // but we can add it to the queue of actions to run + keep-spawning := False + ch.emit-io + (!inner-scope).foreach(cancel-scope) + } + + val sentinel-scope = async-scope + + child-scope-without-auto-cancel fn(cid) // introduce a child scope which doesn't cancel-on-finalize + inner-scope.set(Just(cid)) + val yield-on-await = fn(action) + (handler { + raw ctl do-await(setup,scope) + // turn awaits into no-awaits, allowing other strands to execute + val unique-id = unsafe-total(unique) + no-await(setup, scope) fn(res) + ch.emit-io({ rcontext.resume(res) }) + () + + // passthrough others + val async-scope = async-scope + fun no-await(setup,scope,f) no-await(setup,scope,f) + fun async-iox(f) async-iox(f) + fun cancel-scope(scope) cancel-scope(scope) + fun discontinue() discontinue() + }) { + with mask behind + action() + } + + xs.foreach-indexed fn(i, action) + if !(!keep-spawning) then + // Finalization has occurred synchronously, don't + // keep spawning actions. + strand-done(i, Complete(Cancel)) + else + with yield-on-await + val res = strand-exec(inject-effects { action(sentinel-scope) }) + strand-done(i, res) + if res.is-finalize then + keep-spawning := False + cancel-scope(cid) + () + + // this runs outside of the `child-scope` (unaffected by `cancel-scope(cid)`) while { strands-are-busy() } // while there are resumptions on the strands.. - // the only way receive can throw is through a cancelation -- but in that case - // we should not cancel but instead await all canceled strands; so keep listening on the channel. - match(ch.receivex(False)) - Error(_exn) -> () // ignore cancelation on receive - Ok(strand-resume) -> strand-resume() + val strand-resume = ch.receive + strand-resume() () - - // ---------------------------------------------------------------------------- // Await wrappers // ---------------------------------------------------------------------------- // Convenience function for awaiting a NodeJS style callback where the first argument is a possible exception. -pub fun await-exn0( setup : (cb : (null) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : asyncx () - await fn(cb) - setup( fn(nexn) cb(nexn.unnull(())) ) +pub fun await-exn0( setup : (cb : (null) -> io-noexn () ) -> io-noexn maybe<() -> io-noexn ()> ) : async-exn () + setup/await fn(cb) + setup( fn(nexn) cb(Result(nexn.unnull(()))) ) + .continue-await.untry // Convenience function for awaiting a NodeJS style callback where the first argument is a possible exception // and the second argument the possible result value. -pub fun await-exn1( setup : (cb : (null,a) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : asyncx a - await fn(cb) - setup( fn(nexn,x) cb(nexn.unnull(x)) ) +pub fun await-exn1( setup : (cb : (null,a) -> io-noexn () ) -> io-noexn maybe<() -> io-noexn ()> ) : async-exn a + setup/await fn(cb) + setup( fn(nexn,x) cb(Result(nexn.unnull(x))) ) + .continue-await.untry fun unnull( nexn : null, x : a ) : error match nexn.maybe @@ -425,200 +667,163 @@ fun unnull( nexn : null, x : a ) : error Just(exn) -> Error(exn) // Convenience function for awaiting a zero argument callback. -pub fun await0( setup : (cb : () -> io-noexn () ) -> io () ) : asyncx () - await fn(cb) - setup( fn() cb(Ok(())) ) - Nothing - -// Convenience function for awaiting a single argument callback. -pub fun await1( setup : (cb : (a) -> io-noexn () ) -> io () ) : asyncx a - await fn(cb) - setup( fn(x) cb(Ok(x)) ) - Nothing - -// Execute `setup` to set up an asynchronous callback with the host platform. Invoke `cb` as the callback: -// it takes either an exception or a result `a`. Usually `setup` returns `Nothing` but you can return a `Just(cleanup)` -// value where the `cleanup` functions is invoked on cancellation to dispose of any resources (see the implementation of `wait`). -// The callback should be invoked exactly once -- when that happens `await` is resumed with the result using `untry` -// either raise an exception or return the plain result. -pub fun setup/await( setup : (cb : error -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : asyncx a - await-exn(setup).untry - - - - -// ---------------------------------------------------------------------------- -// Async effect -// ---------------------------------------------------------------------------- -alias await-result = error -alias await-setup = (cb : (error,bool) -> io-noexn ()) -> io-noexn (maybe<() -> io-noexn ()>) - -// Asynchronous operations have the `:async` effect. -pub effect async - ctl do-await( setup : await-setup, scope : scope, cancelable : bool ) : error - ctl no-await( setup : await-setup, scope : scope, cancelable : bool, f : error -> io-noexn () ) : () - ctl async-iox( action : () -> io-noexn a ) : a - ctl cancel( scope : scope ) : () +pub fun await0( setup : (cb : () -> io-noexn () ) -> io-noexn () ) : asyncx () + setup/await fn(cb) + setup( fn() cb(Result(())) ) + Nothing + .continue-await +// // Convenience function for awaiting a single argument callback. +pub fun await1( setup : (cb : (a) -> io-noexn () ) -> io-noexn () ) : asyncx a + setup/await fn(cb) + setup( fn(x) cb(Result(x)) ) + Nothing + .continue-await -// The `cancel` operations cancels any outstanding asynchronous operation under the innermost -// `cancelable` handler by returning the `Cancel` exception. The `cancel` operation itself returns normally -// without raising a `Cancel` exception. -pub fun noscope/cancel() : async () - cancel(empty-scope) // Primitive: Execute `setup` to set up an asynchronous callback with the host platform. Invoke `cb` as the callback: // it takes either an exception or a result `a`. Usually `setup` returns `Nothing` but you can return a `Just(cleanup)` // value where the `cleanup` functions is invoked on cancellation to dispose of any resources (see the implementation of `wait`). // The callback should be invoked exactly once -- when that happens `await-exn` is resumed with the result. -pub fun await-exn( setup : (cb : (error) -> io-noexn ()) -> io (maybe<() -> io-noexn ()>) ) : async error - do-await(fn(cb) - match (try{ setup(fn(res) cb(res,True) ) }) - Ok(mcleanup) -> mcleanup - Error(exn) -> - cb(Error(exn),True) - Nothing - , empty-scope, True) - -// Primitive: Execute `setup` to set up an asynchronous callback with the host platform. Invoke `cb` as the callback: it takes either -// an exception or a result value, together with boolean parameter whether the callback is done. -// The callback `cb` will eventually emit the result into the given channel `ch` after applying the transformation `f` to the result.\ -// Note: once you exit the `cancelable` scope where `await-to-channel` was called, the callback is invoked with a `Cancel` exception. -// The channel should always be awaited within the same `cancelable` scope as the `await-to-channel` invokation. -pub fun await-to-channel( setup : (cb : (error,bool) -> io-noexn ()) -> io (maybe<() -> io-noexn ()>), ch : channel, f : error -> b ) : async channel - no-await(fn(cb) - match(try{setup(cb)}) - Ok(mcleanup) -> mcleanup - Error(exn) -> - cb(Error(exn),True) - Nothing - , empty-scope,True, fn(res) - ch.emit-io( f(res) ) - ) - ch +pub fun setup/await( setup : (cb : (a) -> io-noexn ()) -> io-noexn (maybe<() -> io-noexn ()>) ) : asyncx a + val result = do-await(fn(cb) { setup(fn(res) cb(Result(res),True)) }, async-scope) + result.continue-await + +// // Primitive: Execute `setup` to set up an asynchronous callback with the host platform. Invoke `cb` as the callback: it takes either +// // an exception or a result value, together with boolean parameter whether the callback is done. +// // The callback `cb` will eventually emit the result into the given channel `ch` after applying the transformation `f` to the result.\ +// // Note: once you exit the `child-scope` scope where `await-to-channel` was called, the callback is invoked with a `Cancel` exception. +// // The channel should always be awaited within the same `child-scope` scope as the `await-to-channel` invokation. +// pub fun await-to-channel( setup : (cb : (a,bool) -> io-noexn ()) -> io-noexn (maybe<() -> io-noexn ()>), ch : channel, f : a -> b) : async channel +// fun wrapped-setup(cb: (await-result<_>,bool) -> io-noexn ()) +// setup fn (value, b) +// cb(Result(value), b) + +// no-await(wrapped-setup, async-scope) fn(res) +// ch.emit-io(f(res.continue-await)) +// ch fun async-io-noexn( f : () -> io-noexn a ) : a async-iox(f) -// Perform an I/O operation at the outer level; exceptions are propagated back. -fun async-io( f : () -> io a ) : asyncx a - async-io-noexn( { try(f) } ).untry - +// // ---------------------------------------------------------------------------- +// // Async handlers: child-scope +// // ---------------------------------------------------------------------------- -// ---------------------------------------------------------------------------- -// Async handlers: cancelable -// ---------------------------------------------------------------------------- - -inline extern interject-async( action : () -> a) : total ( () -> a) - inline "#1" +// Execute `action` in a child-scope scope. Once `action` is complete +// (successfully or otherwise), all outstanding actions +// within this scope are canceled. +pub fun child-scope( action : () -> a ) : a + child-scope-without-auto-cancel fn(cid) + with finally + // cancel any outstanding operations still in our scope. + // this might be needed for `no-await` operations. + cancel-scope(cid) + action() -// Execute `action` in a cancelable scope. If `cancel` is called within `action`, -// any outstanding asynchronous operations started in the cancelable scope are canceled. -// (Outstanding operations outside the cancelable scope are not canceled). -pub fun cancelable( action : () -> a ) : a +// like child-scope, but the caller is responsible +// for canceling the created scope +fun child-scope-without-auto-cancel(action : (int) -> a ) : a val cid = async-iox{ unique() } - fun extend(scope : scope ) - parent-scope(cid,scope) + handle ({mask behind { action(cid) }}) + val async-scope = async-scope.child(cid) + fun do-await(setup,scope) do-await(setup,scope) + fun no-await(setup,scope,f) no-await(setup,scope,f) + fun cancel-scope(scope) cancel-scope(scope) + fun async-iox(f) async-iox(f) + fun discontinue() discontinue() - handle ({mask behind(action)}) - return(x) -> - // cancel any outstanding operations still in our scope. - // this might be needed for `no-await` operations. - cancel(empty-scope.extend) - x - fun do-await(setup,scope,c) -> do-await(setup,scope.extend,c) - fun no-await(setup,scope,c,f) -> no-await(setup,scope.extend,c,f) - fun cancel(scope) -> cancel(scope.extend) - fun async-iox(f) -> async-iox(f) -// ---------------------------------------------------------------------------- -// Async handle -// ---------------------------------------------------------------------------- +// // ---------------------------------------------------------------------------- +// // Async handle +// // ---------------------------------------------------------------------------- pub fun @default-async(action) async/handle(action) -fun nodispose() : io-noexn () +fun noop(): () () // The outer `:async` effect handler. This is automatically applied by the compiler // around the `main` function if it has an `:async` effect. -pub fun async/handle(action : () -> () ) : io-noexn () - val callbacks : ref io-noexn ())>> = unsafe-total{ref([])} - fun handle-await( setup : await-setup, scope : scope, f : error -> io-noexn (), cancelable : bool) : io-noexn () - val cscope = child-scope(unique(),scope) - val dispose = ref(nodispose) - fun cb( res : error<_>, is-done : bool ) : io-noexn () - if ((!callbacks).contains(cscope)) then - if is-done then - callbacks := (!callbacks).remove(cscope) - if res.is-error then try(!dispose).default(()) - f(res) - - // trace("register: " + cscope.show) - callbacks := Cons((cscope, if cancelable then fn(){ cb(Error(Exception("cancel",Cancel)),True) } else nodispose), !callbacks) - try { - // setup the callback which returns a possible dispose function - match(setup(cb)) - Just(d) -> dispose := d - Nothing -> () - } fn(exn) - // if setup fails, immediately resume with the exception - cb(Error(exn),True) - - fun handle-cancel( scope : scope ) : io-noexn () - (!callbacks).foreach fn(entry) - val (cscope,cb) = entry - if (cscope.in-scope-of(scope)) then cb() - - handle(action) - raw ctl do-await( setup, scope, c ) - handle-await(setup,scope, fn(x) rcontext.resume(x), c) // returns to outer event loop - fun no-await( setup, scope, c, f ) - handle-await(setup,scope,f,c) - fun cancel( scope ) +pub fun async/handle(action : () -> a ) : io-noexn () + val cancel-functions : ref ())>> = unsafe-total{ref([])} + fun handle-await( setup : await-setup, scope : scope, f : await-result -> io-noexn ()) : io-noexn () + // Each callback gets its own scope so we can tell when a given + // callback can be removed from `cancel-functions` + val cscope = scope.child(unique()) + + // `cancel-hook` is the event-system action which should happen + // on cancellation, e.g. clearTimeout(...) + val cancel-hook: ref = ref(noop) + + // TODO is there any code path where is-done is False? + fun cb( res : await-result<_>, is-done : bool ): io-noexn () + fun cleanup() + if is-done && ((!cancel-functions).contains(cscope)) then + cancel-functions := (!cancel-functions).remove(cscope) + if res.is-cancel then + (!cancel-hook)() + cleanup() + f(res) + + val trigger-cancel: () -> io-noexn () = fn() cb(Cancel, True) + cancel-functions := Cons((cscope, trigger-cancel), !cancel-functions) + match(setup(cb)) + Just(d) -> cancel-hook := d + Nothing -> () + + fun handle-cancel( scope-id) : io-noexn () + // TODO what happens if we try to cancel more than once? + (!cancel-functions).foreach fn(entry) + val (scope,trigger-cancel) = entry + if (scope.is-in-scope(scope-id)) then + trigger-cancel() + + with handler + val async-scope = empty-scope + raw ctl do-await(setup, scope) + handle-await(setup, scope, fn(x) { rcontext.resume(x); () }) + fun no-await(setup, scope, f) + handle-await(setup, scope, f) + fun cancel-scope(scope) handle-cancel(scope) - fun async-iox( f ) - f() + final ctl discontinue() + impossible("discontinue occurred at the toplevel scope") -fun io-noexn( f : () -> io-noexn a ) : io a - f() + fun async-iox( f ) + f() -fun io-noexn1( f : (a1) -> io-noexn a, x1 : a1 ) : io a - f(x1) + action() + () // ---------------------------------------------------------------------------- // Scope identifiers // ---------------------------------------------------------------------------- -abstract struct scope( : list ) +// A scope has an ID, plus a list of child IDs. +// Every created scope has a unique ID +alias scope-id = int +abstract value struct scope( : list ) +// A scope is the chain starting with the current scope, including parent scopes up to the root scope ID +// A scope is "in" a given scope-id if that scope-id appears in its id list val empty-scope = Scope([]) -fun parent-scope( cid : int, scope : scope ) : scope - match scope - Scope(cids) -> Scope(Cons(cid,cids)) - -fun child-scope( id : int, scope : scope ) : scope - match scope - Scope(cids) -> Scope(cids ++ [id]) - -fun ids/in-scope-of( child : list, parent : list ) : bool +fun child(parent : scope, child-id: scope-id) : scope match parent - Nil -> True - Cons(p,ps) -> match child - Cons(c,cs) -> (c == p && in-scope-of(cs,ps)) - Nil -> False + Scope(ids) -> Scope(Cons(child-id, ids)) -fun in-scope-of( child : scope, parent : scope ) : bool - match parent - Scope(pids) -> match child - Scope(cids) -> in-scope-of(cids,pids) +fun is-in-scope(child : scope, parent : scope-id ) : bool + match child + Scope(ids) -> ids.find(fn(x) x == parent).is-just -fun scope/(==)(scope1 : scope, scope2 : scope ) : bool +// every scope with the same ID must implicitly contain the same +// parent chain, so a simple comparison on the head is sufficient +pub fun scope/(==)(scope1 : scope, scope2 : scope ) : bool match scope1 Scope(ids1) -> match scope2 - Scope(ids2) -> ids1==ids2 + Scope(ids2) -> maybe/(==)(ids1.head, ids2.head, ?(==) = int/(==)) // Convenience functions for scope maps fun remove( xs : list<(scope,a)>, scope : scope ) : list<(scope,a)> @@ -630,35 +835,21 @@ fun lookup( xs : list<(scope,a)>, scope : scope ) : maybe fun contains( xs : list<(scope,a)>, scope : scope ) : bool xs.lookup(scope).bool -fun show( s : scope ) : string +pub fun scope/show( s : scope ) : string match s - Scope(ids) -> ids.map(show).join("-") - - + Scope(ids) -> " " ++ ids.map(fn(id) "{" ++ id.show ++ "}").join("<") ++ "<" -abstract extend type exception-info - con Cancel - con Finalize(yld:yield-info) - -// Was this a cancelation exception? -fun exn/is-cancel( exn : exception ) : bool - match exn.info - Cancel -> True - _ -> False - -// Was this a finalization exception? -fun exn/is-finalize(exn : exception) : bool - match exn.info - Finalize -> True - _ -> False +// TODO make private? +pub fun scope/cancel(s: scope) + match s + Scope(Cons(scope-id, _)) -> cancel-scope(scope-id) -fun unsafe-try-all( action : () -> a ) : e error - val fin = unsafe-try-finalize{ try(action) } - match fin - Right(t) -> t - Left(yld) -> Error(Exception("finalize",Finalize(yld))) + // TODO: make it impossible by types to have an empty scope? + _ -> () -fun rethrow( exn : exception ) : exn a - match exn.info - Finalize(yld) -> unsafe-reyield(yld) - _ -> throw-exn(exn) +// execute an action, but observe whether it completed normally or +// needs to instead run a finalizer (due to cancellation, exception, etc) +fun strand-exec( action : () -> a ): strand-result> + match unsafe-try-finalize({ unsafe-catch-cancel(action) }) + Left(fin) -> Finalize(fin) + Right(v) -> Complete(v) diff --git a/test/async/async-test.kk b/test/async/async-test.kk new file mode 100644 index 0000000..ed14f26 --- /dev/null +++ b/test/async/async-test.kk @@ -0,0 +1,379 @@ +import std/test +import std/num/int32 +import std/num/int64 +import std/async/async +import std/async/timer +import std/time/duration +import std/core-extras +import std/core/unsafe +import std/data/linearset + +fun list/push(l, item) + l ++ [item] + +fun listref/push(lr: ref<_, list>, item: a, ?show: (a) -> string) + // println("PUSH: " ++ item.show) + lr.modify fn(l) + l := l.push(item) + +val short-pause = 1.milli-seconds +val long-pause = 100.milli-seconds + +fun register(pending, api, scope: async/scope) + val id = unique() + val entry = (id, scope) + // println("[track-leaks " ++ api ++ "] -> [" ++ entry.show ++ "]") + + pending := (!pending).add(entry) + fun done(result) + // unsafe-total + // println("[track-leaks " ++ api ++ "] <- [" ++ entry.show ++ "] cancel=" ++ result.is-cancel.show) + pending := (!pending).remove(entry) + done +fun noleaks(action: () -> ()): () + val pending: ref> = ref(linear-set([])) + with override + fun do-await(setup, scope) + val done = pending.register("do-await", scope) + val result = do-await(setup, scope) + done(result) + result + + fun no-await(setup,scope,f) + val done = pending.register("no-await", scope) + no-await(setup, scope) fn(result) + done(result) + f(result) + + fun async-iox(f) + async-iox(f) + + fun cancel-scope(scope) + cancel-scope(scope) + + val async-scope = async-scope + fun discontinue() discontinue() + + with finally + if ! (!pending).is-empty then + // println("[track-leaks] pending=" ++ (!pending).map(fst).show) + throw("Test leaked " ++ (!pending).length.show ++ " async callbacks") + action() + +// for testing arbitrary effects that are't async-aware +effect can-abort + final ctl abort(): a + +fun can-abort(f: () -> ()): e () + with handler + final ctl abort() + () + f() + +effect audit + fun record(value: a): () + fun records(): list + +// fun audit(action: () -> ,expect,div|e> b, ?show: (a) -> div string): b +fun audit(action: () -> ,expect,div,console|e> b, ?show: (a) -> div string): b + var log := [] + handle(action) + fun record(value) + // TODO `hint` instead of println + // println("record: " ++ value.show) + log := log.push(value) + () + fun records() -> log + +fun error-message(err: error<_>): maybe + match err + Error(ex) -> Just(ex.message) + Ok(_) -> Nothing + +fun noleak-test(name, body, ?kk-module, ?kk-line) + // effectful-test("[vanilla] " ++ name, body) // rule out interference by noleaks itself + effectful-test(name) { noleaks(body) } + + +effect flip + ctl flip(): bool + +fun both(f: () -> a): e list + handle(f) + return(x) [x] + ctl flip() + resume(False) ++ resume(True) + +fun main(): () + run-tests + group("noleaks") + effectful-test("fails if a block leaks a callback") + val result = try + with noleaks + no-await(fn (cb) { Nothing }, async-scope, fn(_) {}) + expect(Just("Test leaked 1 async callbacks")) { result.error-message } + + effectful-test("doesn't interfere with async execution") + val result = try + with noleaks + wait(short-pause) + throw("fail!") + expect(Just("fail!")) { result.error-message } + + noleak-test("simple sleep") + expect(123) + wait(short-pause) + 123 + + noleak-test("interleaved singleton") + expect([1]) + fun action(): int + wait(short-pause) + 1 + interleaved([action]) + + noleak-test("interleaved singleton which throws") + with audit + fun action(): ,io> int + with finally + record("finally") + wait(short-pause) + throw("error") + + expect(Just("error")) + try { interleaved([action]) }.error-message + + expect(["finally"]) { records() } + + noleak-test("interleaved pair") + val pair = + interleaved { + wait(short-pause) + 1 + } { + wait(short-pause) + 2 + } + expect((1,2)) { pair } + + noleak-test("cancel the scope above an interleave") + with audit + val result = unsafe-catch-cancel + with child-scope + val outer-scope = async-scope + interleaved { + wait(short-pause) + outer-scope.cancel() + } { + with finally + record("finally") + wait(long-pause) + "strand 2 complete" + } + expect(["finally"]) { records() } + expect(True) { result.is-cancel } + + noleak-test("firstof doesn't run more actions if the first is synchronous") + with audit + val result = firstof { + 1 + } { + wait(short-pause) + record("second branch executed") + 2 + } + expect(result) { 1 } + expect([]: list) { records() } + + noleak-test("firstof doesn't spawn further actions if the first raises synchronously") + with audit + val result = try + firstof { + throw("branch 1") + } { + wait(short-pause) + record("second branch executed") + } + expect(Just("branch 1")) { result.error-message } + expect([]: list) { records() } + + noleak-test("interleave doesn't spawn further actions if the first raises synchronously") + with audit + val result = try + interleaved([{ + throw("branch 1") + }, { + wait(short-pause) + record("second branch executed") + }]) + expect(Just("branch 1")) { result.error-message } + expect([]: list) { records() } + + noleak-test("interleave with finalization after exn") + with audit + val result = try + interleaved { + wait(short-pause) + throw("strand 1 failed") + } { + with finally + record("finally") + on-cancel { + record("cancel") + } { + wait(long-pause) + } + "strand 2 complete" + } + expect(["cancel", "finally"]) { records() } + expect(Just("strand 1 failed")) { result.error-message } + + + noleak-test("firstof with finalization in slower branch") + with audit + val result = firstof { + wait(short-pause) + 1 + } { + with finally + record("finally") + on-cancel { + record("cancel") + } { + wait(long-pause) + } + 2 + } + expect(["cancel", "finally"]) { records() } + expect(1) { result } + + noleak-test("finalization can be async") + val log = ref([]: list) + fun long-running(): string + fun cleanup() + wait(long-pause) + log.push("cleanup") + + with finally(cleanup) + log.push("start") + wait(long-pause) // <- will be canceled + log.push("wakeup") + "long" + + fun short-running(): string + wait(short-pause) + log.push("cancel") + "short" + + val result = firstof(long-running, short-running) + log.push("result: " ++ result) + val e = list/(==)(["a"], ["b"]) + expect(["start", "cancel", "cleanup", "result: short"]) { !log } + + noleak-test("cancellation and try") + val interruptible = fn() + with finally { record("finally") } + try { + record("start") + wait(long-pause) + } fn(e) { + record("catch") + } + record("after-catch") + wait(short-pause) + record("still running...") + "result" + + with audit + expect(Nothing: maybe) + timeout(short-pause, interruptible) + + expect(["start", "finally"]) { records() } + + noleak-test("error in cancelation is raised") + val interruptible = fn() + with finally { throw("finalization exn") } + wait(long-pause) + "result" + + expect("finalization exn") + try { + timeout(short-pause, interruptible) + "success" + } fn(e) { e.message } + + noleak-test("exception in main code takes priority over error in cancelation") + val interruptible = fn() + with finally { throw("finalization exn") } + wait(long-pause) + "main exn" + + expect("finalization exn") + try { + firstof(interruptible) + wait(short-pause) + throw("main exn") + "success" + } fn(e) { e.message } + + noleak-test("defer-cancelation defers cancelation signal until block is complete") + val log = ref([]: list) + val interruptible = fn() + defer-cancelation + wait(long-pause) + log.push("in critical section") + log.push("after critical section") + + expect(Nothing) { timeout(short-pause, interruptible) } + expect(["in critical section"]) { !log } + + noleak-test("a cancelable block within a defered-cancelation block is canceled immediately") + with audit + val interruptible = fn() + defer-cancelation() + val inner = fn() + on-cancel { record("inner canceled") } + wait(long-pause) + record("inner still running after cancel") + timeout(short-pause, inner) + record("after-inner") + record("after-defer") + timeout(short-pause, interruptible) + expect(["inner canceled", "after-inner"]) { records() } + + noleak-test("cancelation and finalization both occur with aborting effect") + with audit + can-abort { + val _ = list/interleaved([ + fn() { + with finally + record("finally") + with on-cancel + record("cancel") + wait(long-pause) + record("first-branch continue") + }, + fn() { + wait(short-pause) + abort() + record("second-branch continue") + } + ]) + () + } + expect(["cancel", "finally"]) { records() } + + // TODO: this currently fails at runtime with `ev_10958.hnd._ctl_flip is not a function` + // noleak-test("multiple resumption works within async") + // with audit + // val result = both + // wait(short-pause) + // if flip() then + // wait(short-pause) + // record(True) + // "heads" + // else + // record(False) + // "tails" + // expect([True, False]) { records() } + // expect(["heads", "tails"]) { result }