diff --git a/src/Specular/FRP/Base.purs b/src/Specular/FRP/Base.purs index 313f3cc..6771530 100644 --- a/src/Specular/FRP/Base.purs +++ b/src/Specular/FRP/Base.purs @@ -53,6 +53,10 @@ module Specular.FRP.Base , map2 , mapN + + , mapAsync + + , module X.Incremental ) where import Prelude @@ -67,13 +71,12 @@ import Effect.Class (class MonadEffect, liftEffect) import Effect.Uncurried (EffectFn2, mkEffectFn1, mkEffectFn2, runEffectFn1, runEffectFn2, runEffectFn3) import Effect.Unsafe (unsafePerformEffect) import Safe.Coerce (coerce) +import Specular.Internal.Incremental (AsyncComputation(..), AsyncState(..)) as X.Incremental import Specular.Internal.Incremental as I import Specular.Internal.Incremental.Node (Node) import Specular.Internal.Incremental.Node as Node import Specular.Internal.Incremental.Optional as Optional import Specular.Internal.Profiling as Profiling -import Specular.Internal.Queue (Queue) -import Specular.Internal.Queue as Queue -- | import Partial.Unsafe (unsafeCrashWith) -- | import Unsafe.Coerce (unsafeCoerce) @@ -112,8 +115,6 @@ readNode node = do ------------------------------------------------------------- -type Unsubscribe = Effect Unit - -- | A source of occurences. -- | -- | During a frame, an Event occurs at most once with a value of type a. @@ -166,7 +167,7 @@ filterEvent f (Event node) = Event $ unsafePerformEffect do subscribeNode :: forall m a. MonadEffect m => MonadCleanup m => (a -> Effect Unit) -> Node a -> m Unit subscribeNode handler event = do - unsub <- liftEffect $ runEffectFn2 _subscribeNode handler event + unsub <- liftEffect $ runEffectFn2 I.subscribeNode handler event onCleanup unsub filterJustEvent :: forall a. Event (Maybe a) -> Event a @@ -175,23 +176,9 @@ filterJustEvent = filterMapEvent identity subscribeEvent_ :: forall m a. MonadEffect m => MonadCleanup m => (a -> Effect Unit) -> Event a -> m Unit subscribeEvent_ handler (Event node) = subscribeNode handler node -globalEffectQueue :: Queue (Effect Unit) -globalEffectQueue = unsafePerformEffect Queue.new - -drainEffects :: Effect Unit -drainEffects = runEffectFn2 Queue.drain globalEffectQueue (mkEffectFn1 \handler -> handler) - -_subscribeEvent :: forall a. EffectFn2 (a -> Effect Unit) (Event a) Unsubscribe +_subscribeEvent :: forall a. EffectFn2 (a -> Effect Unit) (Event a) I.Unsubscribe _subscribeEvent = mkEffectFn2 \handler (Event node) -> - runEffectFn2 _subscribeNode handler node - -_subscribeNode :: forall a. EffectFn2 (a -> Effect Unit) (Node a) Unsubscribe -_subscribeNode = mkEffectFn2 \handler node -> do - let - h = mkEffectFn1 \value -> do - runEffectFn2 Queue.enqueue globalEffectQueue (handler value) - runEffectFn2 I.addObserver node h - pure (runEffectFn2 I.removeObserver node h) + runEffectFn2 I.subscribeNode handler node -- | Create an Event that can be triggered externally. -- | Each `fire` will run a frame where the event occurs. @@ -210,11 +197,7 @@ newEvent = liftEffect do } stabilize :: Effect Unit -stabilize = do - mark <- runEffectFn1 Profiling.begin "Specular.stabilize" - I.stabilize - drainEffects - runEffectFn1 Profiling.end mark +stabilize = I.stabilize -- | Create a new Behavior whose value can be modified outside a frame. newBehavior :: forall m a. MonadEffect m => a -> m { behavior :: Behavior a, set :: a -> Effect Unit } @@ -500,3 +483,16 @@ instance semigroupDynamic :: Semigroup a => Semigroup (Dynamic a) where instance monoidDynamic :: Monoid a => Monoid (Dynamic a) where mempty = pure mempty + +-- | Map a possibly-asynchronous function over a Dynamic. +-- | +-- | When the source dynamic changes, the mapping function is re-evaluated. If it returns `Sync`, +-- | this works like `map` - the change is propagated in the same cycle. +-- | +-- | If it returns `Async`, then the dynamic will first transition to `InProgress` and start the async computation. +-- | After it finished, it will transition to `Finished` (which might contain an error). +mapAsync :: forall a b. (a -> I.AsyncComputation b) -> Dynamic a -> Dynamic (I.AsyncState b) +mapAsync f (Dynamic node) = Dynamic $ unsafePerformEffect do + n <- runEffectFn2 I.mapAsync f node + runEffectFn2 Node.annotate n ("mapAsync " <> Node.name node) + pure n diff --git a/src/Specular/FRP/List.js b/src/Specular/FRP/List.js deleted file mode 100644 index 06053ef..0000000 --- a/src/Specular/FRP/List.js +++ /dev/null @@ -1,6 +0,0 @@ -// nextMicrotask :: Effect Unit -> Effect Unit -export function nextMicrotask(eff) { - return function () { - Promise.resolve().then(eff); - }; -} diff --git a/src/Specular/FRP/List.purs b/src/Specular/FRP/List.purs index 760deac..f824069 100644 --- a/src/Specular/FRP/List.purs +++ b/src/Specular/FRP/List.purs @@ -7,15 +7,16 @@ module Specular.FRP.List import Prelude -import Effect (Effect) -import Effect.Class (liftEffect) import Control.Monad.Replace (class MonadReplace, Slot, newSlot, appendSlot, replaceSlot, destroySlot) import Data.Array as Array import Data.Maybe (Maybe(..)) import Data.Traversable (traverse) +import Effect (Effect) +import Effect.Class (liftEffect) +import Effect.Ref (Ref, new, read, write) import Specular.FRP (Dynamic, holdDyn, subscribeDyn_) import Specular.FRP.Base (class MonadFRP, holdUniqDynBy, newEvent) -import Effect.Ref (Ref, new, read, write) +import Specular.Internal.Effect (nextMicrotask) import Unsafe.Reference (unsafeRefEq) -- | `dynamicListWithIndex dynArray handler` @@ -83,8 +84,6 @@ updateList latestRef mainSlot handler newArray = do write newLatest latestRef pure $ map _.result newLatest -foreign import nextMicrotask :: Effect Unit -> Effect Unit - dynamicList :: forall m a b . MonadFRP m diff --git a/src/Specular/Internal/Effect.js b/src/Specular/Internal/Effect.js index 64771eb..79cd5a8 100644 --- a/src/Specular/Internal/Effect.js +++ b/src/Specular/Internal/Effect.js @@ -29,3 +29,10 @@ export function sequenceEffects(effects) { } }; } + +// nextMicrotask :: Effect Unit -> Effect Unit +export function nextMicrotask(eff) { + return function () { + Promise.resolve().then(eff); + }; +} diff --git a/src/Specular/Internal/Effect.purs b/src/Specular/Internal/Effect.purs index 1dd8c05..aa3e2c3 100644 --- a/src/Specular/Internal/Effect.purs +++ b/src/Specular/Internal/Effect.purs @@ -4,6 +4,7 @@ module Specular.Internal.Effect , pushDelayed , unsafeFreezeDelayed , sequenceEffects + , nextMicrotask ) where import Prelude @@ -19,3 +20,5 @@ foreign import pushDelayed :: DelayedEffects -> Effect Unit -> Effect Unit foreign import unsafeFreezeDelayed :: DelayedEffects -> Effect (Array (Effect Unit)) foreign import sequenceEffects :: Array (Effect Unit) -> Effect Unit + +foreign import nextMicrotask :: Effect Unit -> Effect Unit diff --git a/src/Specular/Internal/ExclusiveTask.purs b/src/Specular/Internal/ExclusiveTask.purs new file mode 100644 index 0000000..2d69a39 --- /dev/null +++ b/src/Specular/Internal/ExclusiveTask.purs @@ -0,0 +1,53 @@ +module Specular.Internal.ExclusiveTask where + +import Prelude + +import Control.Monad.Error.Class (class MonadError, catchJust) +import Data.Maybe (Maybe(..)) +import Effect (Effect) +import Effect.Aff (Aff, Error, Fiber, error, joinFiber, killFiber, launchAff_, launchSuspendedAff) +import Effect.Class (class MonadEffect, liftEffect) +import Effect.Ref as ERef +import Unsafe.Reference (unsafeRefEq) + +data State + = Idle + | Running (Fiber Unit) + +newtype ExclusiveTask = ExclusiveTask + { state :: ERef.Ref State + } + +new :: forall m. MonadEffect m => m ExclusiveTask +new = do + state_ <- liftEffect $ ERef.new Idle + pure $ ExclusiveTask { state: state_ } + +-- | Run an Aff action in this exclusive task slot. +-- | If there was a previous task running, it is first cancelled. +run :: ExclusiveTask -> Aff Unit -> Effect Unit +run (ExclusiveTask self) block = do + newFiber <- launchSuspendedAff block + launchAff_ $ catchCancelled do + state <- liftEffect $ ERef.read self.state + case state of + Idle -> + pure unit + Running fiber -> + killFiber cancelledError fiber + + liftEffect $ ERef.write (Running newFiber) self.state + -- Only now resume the new fiber + joinFiber newFiber + + liftEffect $ ERef.write Idle self.state + +cancelledError :: Error +cancelledError = error "Cancelled" + +isCancelledError :: Error -> Boolean +isCancelledError e = e `unsafeRefEq` cancelledError + +catchCancelled :: forall m. MonadError Error m => m Unit -> m Unit +catchCancelled block = + catchJust (\e -> if isCancelledError e then Just e else Nothing) block (\_ -> pure unit) diff --git a/src/Specular/Internal/Incremental.purs b/src/Specular/Internal/Incremental.purs index 39f5865..d80ad20 100644 --- a/src/Specular/Internal/Incremental.purs +++ b/src/Specular/Internal/Incremental.purs @@ -2,26 +2,38 @@ module Specular.Internal.Incremental where import Prelude +import Data.Either (Either(..)) import Data.Function.Uncurried (Fn2, runFn2) +import Data.Generic.Rep (class Generic) +import Data.Maybe (Maybe(..)) +import Data.Show.Generic (genericShow) import Effect (Effect) +import Effect.Aff (Aff, Error) +import Effect.Class (liftEffect) import Effect.Console as Console import Effect.Uncurried (EffectFn1, EffectFn2, EffectFn3, mkEffectFn1, mkEffectFn2, mkEffectFn3, runEffectFn1, runEffectFn2, runEffectFn3, runEffectFn4) import Effect.Unsafe (unsafePerformEffect) import Partial.Unsafe (unsafeCrashWith) +import Specular.Internal.ExclusiveTask as ExclusiveTask +import Specular.Internal.Effect (nextMicrotask) import Specular.Internal.Incremental.Array as Array import Specular.Internal.Incremental.Effect (foreachUntil) -import Specular.Internal.Incremental.Global (globalCurrentStabilizationNum, globalTotalRefcount, globalLastStabilizationNum, stabilizationIsNotInProgress) +import Specular.Internal.Incremental.Global (globalCurrentStabilizationNum, globalLastStabilizationNum, globalTotalRefcount, stabilizationIsNotInProgress) import Specular.Internal.Incremental.Mutable (Field(..)) import Specular.Internal.Incremental.MutableArray as MutableArray -import Specular.Internal.Incremental.Node (Node, SomeNode, Observer, toSomeNode, toSomeNodeArray) +import Specular.Internal.Incremental.Node (Node, Observer, SomeNode, toSomeNode, toSomeNodeArray) import Specular.Internal.Incremental.Node as Node import Specular.Internal.Incremental.Optional (Optional) import Specular.Internal.Incremental.Optional as Optional import Specular.Internal.Incremental.PriorityQueue as PQ import Specular.Internal.Incremental.Ref as Ref import Specular.Internal.Profiling as Profiling +import Specular.Internal.Queue (Queue) +import Specular.Internal.Queue as Queue import Unsafe.Coerce (unsafeCoerce) +type Unsubscribe = Effect Unit + -- | Priority queue for propagating node changes in dependency order. globalRecomputeQueue :: PQ.PQ SomeNode globalRecomputeQueue = unsafePerformEffect $ @@ -49,6 +61,10 @@ newVar = mkEffectFn1 \val -> do setVar :: forall a. EffectFn2 (Var a) a Unit setVar = mkEffectFn2 \(Var node) val -> do runEffectFn2 Node.set_value node (Optional.some val) + runEffectFn1 addToRecomputeQueue node + +addToRecomputeQueue :: forall a. EffectFn1 (Node a) Unit +addToRecomputeQueue = mkEffectFn1 \node -> do _ <- runEffectFn2 PQ.add globalRecomputeQueue (toSomeNode node) pure unit @@ -175,12 +191,30 @@ disconnect = mkEffectFn1 \node -> do runEffectFn1 Profiling.end mark +-- * Effect queue + +globalEffectQueue :: Queue (Effect Unit) +globalEffectQueue = unsafePerformEffect Queue.new + +subscribeNode :: forall a. EffectFn2 (a -> Effect Unit) (Node a) Unsubscribe +subscribeNode = mkEffectFn2 \handler node -> do + let + h = mkEffectFn1 \value -> do + runEffectFn2 Queue.enqueue globalEffectQueue (handler value) + runEffectFn2 addObserver node h + pure (runEffectFn2 removeObserver node h) + -- * Recompute stabilize :: Effect Unit stabilize = do mark <- runEffectFn1 Profiling.begin "stabilize" + stabilizationNum <- runEffectFn1 Ref.read globalCurrentStabilizationNum + if stabilizationNum /= stabilizationIsNotInProgress then + unsafeCrashWith "Specular: stabilize called when stabilization already in progress" + else pure unit + oldStabilizationNum <- runEffectFn1 Ref.read globalLastStabilizationNum let currentStabilizationNum = oldStabilizationNum + 1 runEffectFn2 Ref.write globalLastStabilizationNum currentStabilizationNum @@ -191,6 +225,10 @@ stabilize = do runEffectFn2 Ref.write globalCurrentStabilizationNum stabilizationIsNotInProgress runEffectFn1 Profiling.end mark + mark2 <- runEffectFn1 Profiling.begin "drainEffects" + runEffectFn2 Queue.drain globalEffectQueue (mkEffectFn1 \handler -> handler) + runEffectFn1 Profiling.end mark2 + recomputeNode :: EffectFn1 SomeNode Unit recomputeNode = mkEffectFn1 \node -> do height <- runEffectFn1 Node.get_height node @@ -282,6 +320,60 @@ mapOptional = mkEffectFn2 \fn a -> do , dependencies: pure deps } +data AsyncComputation a = Sync a | Async (Aff a) + +data AsyncState a + = InProgress (Maybe (Either Error a)) + | Finished (Either Error a) + +derive instance Generic (AsyncState a) _ +instance Show a => Show (AsyncState a) where + show = genericShow + +mapAsync :: forall a b. EffectFn2 (a -> AsyncComputation b) (Node a) (Node (AsyncState b)) +mapAsync = mkEffectFn2 \fn a -> do + let deps = [ toSomeNode a ] + task <- ExclusiveTask.new + finishedRef <- runEffectFn1 Ref.new Nothing + runEffectFn1 Node.create + { compute: mkEffectFn1 \self -> do + -- Need to determine why we're updating - because the input changed, or because the computation finished? + finished <- runEffectFn1 Ref.read finishedRef + case finished of + Nothing -> do + value_a <- runEffectFn1 Node.valueExc a + case fn value_a of + Sync x -> + pure (Optional.some (Finished (Right x))) + Async aff -> do + nextMicrotask do + ExclusiveTask.run task do + newValue <- aff + liftEffect do + runEffectFn2 Ref.write finishedRef (Just (Right newValue)) + runEffectFn1 addToRecomputeQueue self + stabilize + previous <- runEffectFn1 Node.get_value self + pure (Optional.some (InProgress (getPreviousValue previous))) + Just x -> do + -- Hmm. Can the input also be changing at the same time we're reporting the result of async computation? + -- Currently not, because we always stabilize after changing a node. + -- But if we introduced some batching later on, it could happen, + -- in which case we'd need to check `isChangingInCurrentStabilization` of our dependency. + + runEffectFn2 Ref.write finishedRef Nothing + pure (Optional.some (Finished x)) + , dependencies: pure deps + } + + where + getPreviousValue opt + | Optional.isSome opt = + case Optional.fromSome opt of + InProgress x -> x + Finished x -> Just x + | otherwise = Nothing + map2 :: forall a b c. EffectFn3 (Fn2 a b c) (Node a) (Node b) (Node c) map2 = mkEffectFn3 \fn a b -> do let deps = [ toSomeNode a, toSomeNode b ] diff --git a/test/Test/Utils.purs b/test/Test/Utils.purs index d81deac..c14ff77 100644 --- a/test/Test/Utils.purs +++ b/test/Test/Utils.purs @@ -5,21 +5,21 @@ module Test.Utils import Prelude hiding (append) -import Effect.Aff (Aff, Milliseconds(..), delay) -import Effect (Effect) -import Effect.Class (liftEffect) import Data.Array (snoc) +import Effect (Effect) +import Effect.Aff (Aff, Milliseconds(..), delay) +import Effect.Class (class MonadEffect, liftEffect) import Effect.Ref (Ref, modify_, new, read, write) -import Test.Spec.Assertions (fail, shouldEqual) -import Type.Prelude (Proxy) import Effect.Uncurried (runEffectFn1) import Specular.Internal.Incremental.Global (globalTotalRefcount) import Specular.Internal.Incremental.Ref as Ref +import Test.Spec.Assertions (fail, shouldEqual) +import Type.Prelude (Proxy) -append :: forall a. Ref (Array a) -> a -> Effect Unit -append ref value = modify_ (\a -> snoc a value) ref +append :: forall m a. MonadEffect m => Ref (Array a) -> a -> m Unit +append ref value = liftEffect $ modify_ (\a -> snoc a value) ref -clear :: forall a. Ref (Array a) -> Aff Unit +clear :: forall m a. MonadEffect m => Ref (Array a) -> m Unit clear ref = liftEffect $ write [] ref shouldHaveValue :: forall a. Eq a => Show a => Ref a -> a -> Aff Unit diff --git a/test/node/AsyncSpec.purs b/test/node/AsyncSpec.purs index 74691cb..6a94b30 100644 --- a/test/node/AsyncSpec.purs +++ b/test/node/AsyncSpec.purs @@ -12,7 +12,7 @@ import Effect.Class (liftEffect) import Specular.FRP (current, newEvent, pull, subscribeEvent_) import Specular.FRP.Async (RequestState(..), asyncRequestMaybe, performEvent) import Specular.FRP.Base (readBehavior, subscribeDyn_, readDynamic) -import Effect.Ref (new) +import Effect.Ref as Ref import Test.Spec (Spec, describe, it) import Test.Spec.Assertions (shouldEqual) import Test.Utils (append, clear, shouldHaveValue, shouldReturn) @@ -22,7 +22,7 @@ spec = do describe "asyncRequestMaybe" $ do it "makes a request for initial value" $ do avar <- AVar.empty - log <- liftEffect $ new [] + log <- liftEffect $ Ref.new [] let request = AVar.take avar @@ -38,7 +38,7 @@ spec = do it "makes a request when the value changes" $ do avar <- AVar.empty - log <- liftEffect $ new [] + log <- liftEffect $ Ref.new [] let request = AVar.take avar @@ -61,7 +61,7 @@ spec = do it "ignores responses to requests older than the current" $ do avar1 <- AVar.empty avar2 <- AVar.empty - log <- liftEffect $ new [] + log <- liftEffect $ Ref.new [] Tuple dyn setDyn <- liftEffect $ newDynamic Nothing @@ -78,7 +78,7 @@ spec = do clear log AVar.put "result1" avar1 - log `shouldHaveValue` [] -- should be ignored, as new request is going on + log `shouldHaveValue` [] -- should be ignored, as Ref.new request is going on clear log AVar.put "result2" avar2 @@ -87,7 +87,7 @@ spec = do it "ignores out-of-order responses" $ do avar1 <- AVar.empty avar2 <- AVar.empty - log <- liftEffect $ new [] + log <- liftEffect $ Ref.new [] Tuple dyn setDyn <- liftEffect $ newDynamic Nothing @@ -128,7 +128,7 @@ spec = do -- In `log` we'll have pairs of (String, String) -- The first String is the request description, the second is the result. - log <- liftEffect $ new [] + log <- liftEffect $ Ref.new [] Tuple result _ <- runCleanupT do status <- asyncRequestMaybe $ map snd dyn @@ -172,7 +172,7 @@ spec = do describe "performEvent" $ do it "runs handler and pushes return value to event" $ do { event, fire } <- liftEffect newEvent - log <- liftEffect $ new [] + log <- liftEffect $ Ref.new [] _ <- liftEffect $ runCleanupT $ do result <- performEvent $ map (\x -> liftEffect (append log ("handler:" <> x)) *> pure x) diff --git a/test/node/Main.purs b/test/node/Main.purs index efbe286..0442a43 100644 --- a/test/node/Main.purs +++ b/test/node/Main.purs @@ -11,6 +11,7 @@ import RIOSpec as RIOSpec import Test.Spec.Reporter (consoleReporter) import Test.Spec.Runner (runSpec) import TraceSpec as TraceSpec +import MapAsyncSpec as MapAsyncSpec main :: Effect Unit main = launchAff_ $ runSpec [ consoleReporter ] do @@ -19,3 +20,4 @@ main = launchAff_ $ runSpec [ consoleReporter ] do RIOSpec.spec TraceSpec.spec AsyncSpec.spec + MapAsyncSpec.spec diff --git a/test/node/MapAsyncSpec.purs b/test/node/MapAsyncSpec.purs new file mode 100644 index 0000000..01b31cc --- /dev/null +++ b/test/node/MapAsyncSpec.purs @@ -0,0 +1,220 @@ +module MapAsyncSpec where + +import Prelude hiding (append) + +import Control.Monad.Cleanup (execCleanupT, runCleanupT) +import Data.Tuple (Tuple(..), fst, snd) +import Effect.Aff.AVar as AVar +import Effect.Class (liftEffect) +import Effect.Ref as ERef +import Specular.FRP.Base (AsyncComputation(..), mapAsync, readDynamic, subscribeDyn_) +import Specular.Ref as Ref +import Test.Spec (Spec, describe, it) +import Test.Utils (append, clear, shouldHaveValue, shouldReturn, yieldAff) + +spec :: Spec Unit +spec = describe "mapAsync" do + it "makes a request for initial value" do + avar <- AVar.empty + log <- liftEffect $ ERef.new [] + + let request = AVar.take avar + let d = mapAsync Async (pure request) + + _ <- execCleanupT do + subscribeDyn_ (append log <<< show) d + + log `shouldHaveValue` [ "(InProgress Nothing)" ] + + clear log + AVar.put 1 avar + yieldAff + + log `shouldHaveValue` [ "(Finished (Right 1))" ] + + it "doesn't make a request when not subscribed" do + log <- liftEffect $ ERef.new [] + + let request = liftEffect $ append log "requested" + let d = mapAsync Async (pure request) + + yieldAff + + log `shouldHaveValue` [] + + -- Do something with `d`, to prevent it being DCEd (I think this works, but not sure...) + _ <- execCleanupT do + subscribeDyn_ (append log <<< show) d + pure unit + + it "forces sync Affs to run after reporting Loading" do + log <- liftEffect $ ERef.new [] + + let request = liftEffect $ append log "requested" *> pure 1 + let d = mapAsync Async (pure request) + + _ <- execCleanupT do + subscribeDyn_ (append log <<< ("sub1: " <> _) <<< show) d + + yieldAff + + log `shouldHaveValue` + ["sub1: (InProgress Nothing)", "requested", "sub1: (Finished (Right 1))"] + + pure unit + + it "only requests once for two subscriptions" do + log <- liftEffect $ ERef.new [] + + let request = liftEffect $ append log "requested" + let d = mapAsync Async (pure request) + + _ <- execCleanupT do + subscribeDyn_ (append log <<< ("sub1: " <> _) <<< show) d + subscribeDyn_ (append log <<< ("sub2: " <> _) <<< show) d + + yieldAff + + log `shouldHaveValue` + [ "sub1: (InProgress Nothing)" + , "sub2: (InProgress Nothing)" + , "requested" + , "sub1: (Finished (Right unit))" + , "sub2: (Finished (Right unit))" + ] + + pure unit + + it "makes a request when the value changes" do + avar <- AVar.empty + log <- liftEffect $ ERef.new [] + + r <- Ref.new (Sync 1) + let result = mapAsync identity (Ref.value r) + + _ <- execCleanupT do + subscribeDyn_ (append log <<< show) result + + log `shouldHaveValue` [ "(Finished (Right 1))" ] + + clear log + Ref.write r $ Async do + append log "requested" + AVar.take avar + yieldAff + log `shouldHaveValue` [ "(InProgress (Just (Right 1)))", "requested" ] + + clear log + AVar.put 2 avar + log `shouldHaveValue` [ "(Finished (Right 2))" ] + + it "ignores responses to requests older than the current" do + avar1 <- AVar.empty + avar2 <- AVar.empty + log <- liftEffect $ ERef.new [] + + r <- Ref.new (Sync 1) + let result = mapAsync identity (Ref.value r) + + _ <- execCleanupT do + subscribeDyn_ (append log <<< show) result + + log `shouldHaveValue` [ "(Finished (Right 1))" ] + + clear log + Ref.write r $ Async $ AVar.take avar1 + yieldAff + Ref.write r $ Async $ AVar.take avar2 + yieldAff + + log `shouldHaveValue` + [ "(InProgress (Just (Right 1)))" + , "(InProgress (Just (Right 1)))" + ] + + clear log + AVar.put 11 avar1 + yieldAff + log `shouldHaveValue` [] -- should be ignored, as new request is going on + + clear log + AVar.put 12 avar2 + yieldAff + log `shouldHaveValue` [ "(Finished (Right 12))" ] + + it "ignores out-of-order responses" do + avar1 <- AVar.empty + avar2 <- AVar.empty + log <- liftEffect $ ERef.new [] + + r <- Ref.new (Sync 1) + let result = mapAsync identity (Ref.value r) + + _ <- execCleanupT do + subscribeDyn_ (append log <<< show) result + + Ref.write r $ Async $ AVar.take avar1 + yieldAff + Ref.write r $ Async $ AVar.take avar2 + yieldAff + + clear log + AVar.put 12 avar2 + yieldAff + log `shouldHaveValue` [ "(Finished (Right 12))" ] + + clear log + AVar.put 11 avar1 + yieldAff + log `shouldHaveValue` [] -- should be ignored, as this request was replaced by avar2 + + it "request dynamic and status dynamic are consistent" do + -- `let status = mapAsync identity request` + -- Some relations must hold between the values of `status` and `request`: + -- + -- - If `request` is `Sync x`, then `status` is immediately `Finished (Right x)` + -- - If `request` is `Just x`, then `status` is either `Loading` or `Loaded y`, + -- where `y` is the result of running `x`. + -- + -- A naive implementation would expose intermediate states where these + -- invariants don't hold. This test checks for this. + + avar <- AVar.empty + + -- In `r` we'll store pairs of (String, AsyncComputation Int). + -- The first string is a description, and goes to the log; + -- the action is the request. + r <- Ref.new $ Tuple "Sync 1" (Sync 1 :: AsyncComputation Int) + + -- In `log` we'll have pairs of (String, String) + -- The first String is the request description, the second is the result. + log <- liftEffect $ ERef.new [] + + let status = mapAsync snd (Ref.value r) + let result = Tuple <$> map fst (Ref.value r) <*> (show <$> status) + + _ <- runCleanupT do + subscribeDyn_ (append log) result + + log `shouldHaveValue` [ Tuple "Sync 1" "(Finished (Right 1))" ] + readDynamic result `shouldReturn` Tuple "Sync 1" "(Finished (Right 1))" + + -- Test with asynchronous action + clear log + Ref.write r $ Tuple "async 2" $ Async $ AVar.take avar + yieldAff + log `shouldHaveValue` [ Tuple "async 2" "(InProgress (Just (Right 1)))" ] + readDynamic result `shouldReturn` Tuple "async 2" "(InProgress (Just (Right 1)))" + + clear log + AVar.put 2 avar + yieldAff + log `shouldHaveValue` [ Tuple "async 2" "(Finished (Right 2))" ] + readDynamic result `shouldReturn` Tuple "async 2" "(Finished (Right 2))" + + -- Test with change to Sync + clear log + Ref.write r $ Tuple "Sync 3" (Sync 3) + yieldAff + log `shouldHaveValue` [ Tuple "Sync 3" "(Finished (Right 3))" ] + readDynamic result `shouldReturn` Tuple "Sync 3" "(Finished (Right 3))"