Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 22 additions & 26 deletions src/Specular/FRP/Base.purs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ module Specular.FRP.Base

, map2
, mapN

, mapAsync

, module X.Incremental
) where

import Prelude
Expand All @@ -67,13 +71,12 @@ import Effect.Class (class MonadEffect, liftEffect)
import Effect.Uncurried (EffectFn2, mkEffectFn1, mkEffectFn2, runEffectFn1, runEffectFn2, runEffectFn3)
import Effect.Unsafe (unsafePerformEffect)
import Safe.Coerce (coerce)
import Specular.Internal.Incremental (AsyncComputation(..), AsyncState(..)) as X.Incremental
import Specular.Internal.Incremental as I
import Specular.Internal.Incremental.Node (Node)
import Specular.Internal.Incremental.Node as Node
import Specular.Internal.Incremental.Optional as Optional
import Specular.Internal.Profiling as Profiling
import Specular.Internal.Queue (Queue)
import Specular.Internal.Queue as Queue

-- | import Partial.Unsafe (unsafeCrashWith)
-- | import Unsafe.Coerce (unsafeCoerce)
Expand Down Expand Up @@ -112,8 +115,6 @@ readNode node = do

-------------------------------------------------------------

type Unsubscribe = Effect Unit

-- | A source of occurences.
-- |
-- | During a frame, an Event occurs at most once with a value of type a.
Expand Down Expand Up @@ -166,7 +167,7 @@ filterEvent f (Event node) = Event $ unsafePerformEffect do

subscribeNode :: forall m a. MonadEffect m => MonadCleanup m => (a -> Effect Unit) -> Node a -> m Unit
subscribeNode handler event = do
unsub <- liftEffect $ runEffectFn2 _subscribeNode handler event
unsub <- liftEffect $ runEffectFn2 I.subscribeNode handler event
onCleanup unsub

filterJustEvent :: forall a. Event (Maybe a) -> Event a
Expand All @@ -175,23 +176,9 @@ filterJustEvent = filterMapEvent identity
subscribeEvent_ :: forall m a. MonadEffect m => MonadCleanup m => (a -> Effect Unit) -> Event a -> m Unit
subscribeEvent_ handler (Event node) = subscribeNode handler node

globalEffectQueue :: Queue (Effect Unit)
globalEffectQueue = unsafePerformEffect Queue.new

drainEffects :: Effect Unit
drainEffects = runEffectFn2 Queue.drain globalEffectQueue (mkEffectFn1 \handler -> handler)

_subscribeEvent :: forall a. EffectFn2 (a -> Effect Unit) (Event a) Unsubscribe
_subscribeEvent :: forall a. EffectFn2 (a -> Effect Unit) (Event a) I.Unsubscribe
_subscribeEvent = mkEffectFn2 \handler (Event node) ->
runEffectFn2 _subscribeNode handler node

_subscribeNode :: forall a. EffectFn2 (a -> Effect Unit) (Node a) Unsubscribe
_subscribeNode = mkEffectFn2 \handler node -> do
let
h = mkEffectFn1 \value -> do
runEffectFn2 Queue.enqueue globalEffectQueue (handler value)
runEffectFn2 I.addObserver node h
pure (runEffectFn2 I.removeObserver node h)
runEffectFn2 I.subscribeNode handler node

-- | Create an Event that can be triggered externally.
-- | Each `fire` will run a frame where the event occurs.
Expand All @@ -210,11 +197,7 @@ newEvent = liftEffect do
}

stabilize :: Effect Unit
stabilize = do
mark <- runEffectFn1 Profiling.begin "Specular.stabilize"
I.stabilize
drainEffects
runEffectFn1 Profiling.end mark
stabilize = I.stabilize

-- | Create a new Behavior whose value can be modified outside a frame.
newBehavior :: forall m a. MonadEffect m => a -> m { behavior :: Behavior a, set :: a -> Effect Unit }
Expand Down Expand Up @@ -500,3 +483,16 @@ instance semigroupDynamic :: Semigroup a => Semigroup (Dynamic a) where

instance monoidDynamic :: Monoid a => Monoid (Dynamic a) where
mempty = pure mempty

-- | Map a possibly-asynchronous function over a Dynamic.
-- |
-- | When the source dynamic changes, the mapping function is re-evaluated. If it returns `Sync`,
-- | this works like `map` - the change is propagated in the same cycle.
-- |
-- | If it returns `Async`, then the dynamic will first transition to `InProgress` and start the async computation.
-- | After it finished, it will transition to `Finished` (which might contain an error).
mapAsync :: forall a b. (a -> I.AsyncComputation b) -> Dynamic a -> Dynamic (I.AsyncState b)
mapAsync f (Dynamic node) = Dynamic $ unsafePerformEffect do
n <- runEffectFn2 I.mapAsync f node
runEffectFn2 Node.annotate n ("mapAsync " <> Node.name node)
pure n
6 changes: 0 additions & 6 deletions src/Specular/FRP/List.js

This file was deleted.

9 changes: 4 additions & 5 deletions src/Specular/FRP/List.purs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ module Specular.FRP.List

import Prelude

import Effect (Effect)
import Effect.Class (liftEffect)
import Control.Monad.Replace (class MonadReplace, Slot, newSlot, appendSlot, replaceSlot, destroySlot)
import Data.Array as Array
import Data.Maybe (Maybe(..))
import Data.Traversable (traverse)
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Ref (Ref, new, read, write)
import Specular.FRP (Dynamic, holdDyn, subscribeDyn_)
import Specular.FRP.Base (class MonadFRP, holdUniqDynBy, newEvent)
import Effect.Ref (Ref, new, read, write)
import Specular.Internal.Effect (nextMicrotask)
import Unsafe.Reference (unsafeRefEq)

-- | `dynamicListWithIndex dynArray handler`
Expand Down Expand Up @@ -83,8 +84,6 @@ updateList latestRef mainSlot handler newArray = do
write newLatest latestRef
pure $ map _.result newLatest

foreign import nextMicrotask :: Effect Unit -> Effect Unit

dynamicList
:: forall m a b
. MonadFRP m
Expand Down
7 changes: 7 additions & 0 deletions src/Specular/Internal/Effect.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ export function sequenceEffects(effects) {
}
};
}

// nextMicrotask :: Effect Unit -> Effect Unit
export function nextMicrotask(eff) {
return function () {
Promise.resolve().then(eff);
};
}
3 changes: 3 additions & 0 deletions src/Specular/Internal/Effect.purs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Specular.Internal.Effect
, pushDelayed
, unsafeFreezeDelayed
, sequenceEffects
, nextMicrotask
) where

import Prelude
Expand All @@ -19,3 +20,5 @@ foreign import pushDelayed :: DelayedEffects -> Effect Unit -> Effect Unit
foreign import unsafeFreezeDelayed :: DelayedEffects -> Effect (Array (Effect Unit))

foreign import sequenceEffects :: Array (Effect Unit) -> Effect Unit

foreign import nextMicrotask :: Effect Unit -> Effect Unit
53 changes: 53 additions & 0 deletions src/Specular/Internal/ExclusiveTask.purs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module Specular.Internal.ExclusiveTask where

import Prelude

import Control.Monad.Error.Class (class MonadError, catchJust)
import Data.Maybe (Maybe(..))
import Effect (Effect)
import Effect.Aff (Aff, Error, Fiber, error, joinFiber, killFiber, launchAff_, launchSuspendedAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Ref as ERef
import Unsafe.Reference (unsafeRefEq)

data State
= Idle
| Running (Fiber Unit)

newtype ExclusiveTask = ExclusiveTask
{ state :: ERef.Ref State
}

new :: forall m. MonadEffect m => m ExclusiveTask
new = do
state_ <- liftEffect $ ERef.new Idle
pure $ ExclusiveTask { state: state_ }

-- | Run an Aff action in this exclusive task slot.
-- | If there was a previous task running, it is first cancelled.
run :: ExclusiveTask -> Aff Unit -> Effect Unit
run (ExclusiveTask self) block = do
newFiber <- launchSuspendedAff block
launchAff_ $ catchCancelled do
state <- liftEffect $ ERef.read self.state
case state of
Idle ->
pure unit
Running fiber ->
killFiber cancelledError fiber

liftEffect $ ERef.write (Running newFiber) self.state
-- Only now resume the new fiber
joinFiber newFiber

liftEffect $ ERef.write Idle self.state

cancelledError :: Error
cancelledError = error "Cancelled"

isCancelledError :: Error -> Boolean
isCancelledError e = e `unsafeRefEq` cancelledError

catchCancelled :: forall m. MonadError Error m => m Unit -> m Unit
catchCancelled block =
catchJust (\e -> if isCancelledError e then Just e else Nothing) block (\_ -> pure unit)
96 changes: 94 additions & 2 deletions src/Specular/Internal/Incremental.purs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,38 @@ module Specular.Internal.Incremental where

import Prelude

import Data.Either (Either(..))
import Data.Function.Uncurried (Fn2, runFn2)
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow)
import Effect (Effect)
import Effect.Aff (Aff, Error)
import Effect.Class (liftEffect)
import Effect.Console as Console
import Effect.Uncurried (EffectFn1, EffectFn2, EffectFn3, mkEffectFn1, mkEffectFn2, mkEffectFn3, runEffectFn1, runEffectFn2, runEffectFn3, runEffectFn4)
import Effect.Unsafe (unsafePerformEffect)
import Partial.Unsafe (unsafeCrashWith)
import Specular.Internal.ExclusiveTask as ExclusiveTask
import Specular.Internal.Effect (nextMicrotask)
import Specular.Internal.Incremental.Array as Array
import Specular.Internal.Incremental.Effect (foreachUntil)
import Specular.Internal.Incremental.Global (globalCurrentStabilizationNum, globalTotalRefcount, globalLastStabilizationNum, stabilizationIsNotInProgress)
import Specular.Internal.Incremental.Global (globalCurrentStabilizationNum, globalLastStabilizationNum, globalTotalRefcount, stabilizationIsNotInProgress)
import Specular.Internal.Incremental.Mutable (Field(..))
import Specular.Internal.Incremental.MutableArray as MutableArray
import Specular.Internal.Incremental.Node (Node, SomeNode, Observer, toSomeNode, toSomeNodeArray)
import Specular.Internal.Incremental.Node (Node, Observer, SomeNode, toSomeNode, toSomeNodeArray)
import Specular.Internal.Incremental.Node as Node
import Specular.Internal.Incremental.Optional (Optional)
import Specular.Internal.Incremental.Optional as Optional
import Specular.Internal.Incremental.PriorityQueue as PQ
import Specular.Internal.Incremental.Ref as Ref
import Specular.Internal.Profiling as Profiling
import Specular.Internal.Queue (Queue)
import Specular.Internal.Queue as Queue
import Unsafe.Coerce (unsafeCoerce)

type Unsubscribe = Effect Unit

-- | Priority queue for propagating node changes in dependency order.
globalRecomputeQueue :: PQ.PQ SomeNode
globalRecomputeQueue = unsafePerformEffect $
Expand Down Expand Up @@ -49,6 +61,10 @@ newVar = mkEffectFn1 \val -> do
setVar :: forall a. EffectFn2 (Var a) a Unit
setVar = mkEffectFn2 \(Var node) val -> do
runEffectFn2 Node.set_value node (Optional.some val)
runEffectFn1 addToRecomputeQueue node

addToRecomputeQueue :: forall a. EffectFn1 (Node a) Unit
addToRecomputeQueue = mkEffectFn1 \node -> do
_ <- runEffectFn2 PQ.add globalRecomputeQueue (toSomeNode node)
pure unit

Expand Down Expand Up @@ -175,12 +191,30 @@ disconnect = mkEffectFn1 \node -> do

runEffectFn1 Profiling.end mark

-- * Effect queue

globalEffectQueue :: Queue (Effect Unit)
globalEffectQueue = unsafePerformEffect Queue.new

subscribeNode :: forall a. EffectFn2 (a -> Effect Unit) (Node a) Unsubscribe
subscribeNode = mkEffectFn2 \handler node -> do
let
h = mkEffectFn1 \value -> do
runEffectFn2 Queue.enqueue globalEffectQueue (handler value)
runEffectFn2 addObserver node h
pure (runEffectFn2 removeObserver node h)

-- * Recompute

stabilize :: Effect Unit
stabilize = do
mark <- runEffectFn1 Profiling.begin "stabilize"

stabilizationNum <- runEffectFn1 Ref.read globalCurrentStabilizationNum
if stabilizationNum /= stabilizationIsNotInProgress then
unsafeCrashWith "Specular: stabilize called when stabilization already in progress"
else pure unit

oldStabilizationNum <- runEffectFn1 Ref.read globalLastStabilizationNum
let currentStabilizationNum = oldStabilizationNum + 1
runEffectFn2 Ref.write globalLastStabilizationNum currentStabilizationNum
Expand All @@ -191,6 +225,10 @@ stabilize = do
runEffectFn2 Ref.write globalCurrentStabilizationNum stabilizationIsNotInProgress
runEffectFn1 Profiling.end mark

mark2 <- runEffectFn1 Profiling.begin "drainEffects"
runEffectFn2 Queue.drain globalEffectQueue (mkEffectFn1 \handler -> handler)
runEffectFn1 Profiling.end mark2

recomputeNode :: EffectFn1 SomeNode Unit
recomputeNode = mkEffectFn1 \node -> do
height <- runEffectFn1 Node.get_height node
Expand Down Expand Up @@ -282,6 +320,60 @@ mapOptional = mkEffectFn2 \fn a -> do
, dependencies: pure deps
}

data AsyncComputation a = Sync a | Async (Aff a)

data AsyncState a
= InProgress (Maybe (Either Error a))
| Finished (Either Error a)

derive instance Generic (AsyncState a) _
instance Show a => Show (AsyncState a) where
show = genericShow

mapAsync :: forall a b. EffectFn2 (a -> AsyncComputation b) (Node a) (Node (AsyncState b))
mapAsync = mkEffectFn2 \fn a -> do
let deps = [ toSomeNode a ]
task <- ExclusiveTask.new
finishedRef <- runEffectFn1 Ref.new Nothing
runEffectFn1 Node.create
{ compute: mkEffectFn1 \self -> do
-- Need to determine why we're updating - because the input changed, or because the computation finished?
finished <- runEffectFn1 Ref.read finishedRef
case finished of
Nothing -> do
value_a <- runEffectFn1 Node.valueExc a
case fn value_a of
Sync x ->
pure (Optional.some (Finished (Right x)))
Async aff -> do
nextMicrotask do
ExclusiveTask.run task do
newValue <- aff
liftEffect do
runEffectFn2 Ref.write finishedRef (Just (Right newValue))
runEffectFn1 addToRecomputeQueue self
stabilize
previous <- runEffectFn1 Node.get_value self
pure (Optional.some (InProgress (getPreviousValue previous)))
Just x -> do
-- Hmm. Can the input also be changing at the same time we're reporting the result of async computation?
-- Currently not, because we always stabilize after changing a node.
-- But if we introduced some batching later on, it could happen,
-- in which case we'd need to check `isChangingInCurrentStabilization` of our dependency.

runEffectFn2 Ref.write finishedRef Nothing
pure (Optional.some (Finished x))
, dependencies: pure deps
}

where
getPreviousValue opt
| Optional.isSome opt =
case Optional.fromSome opt of
InProgress x -> x
Finished x -> Just x
| otherwise = Nothing

map2 :: forall a b c. EffectFn3 (Fn2 a b c) (Node a) (Node b) (Node c)
map2 = mkEffectFn3 \fn a b -> do
let deps = [ toSomeNode a, toSomeNode b ]
Expand Down
16 changes: 8 additions & 8 deletions test/Test/Utils.purs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ module Test.Utils

import Prelude hiding (append)

import Effect.Aff (Aff, Milliseconds(..), delay)
import Effect (Effect)
import Effect.Class (liftEffect)
import Data.Array (snoc)
import Effect (Effect)
import Effect.Aff (Aff, Milliseconds(..), delay)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Ref (Ref, modify_, new, read, write)
import Test.Spec.Assertions (fail, shouldEqual)
import Type.Prelude (Proxy)
import Effect.Uncurried (runEffectFn1)
import Specular.Internal.Incremental.Global (globalTotalRefcount)
import Specular.Internal.Incremental.Ref as Ref
import Test.Spec.Assertions (fail, shouldEqual)
import Type.Prelude (Proxy)

append :: forall a. Ref (Array a) -> a -> Effect Unit
append ref value = modify_ (\a -> snoc a value) ref
append :: forall m a. MonadEffect m => Ref (Array a) -> a -> m Unit
append ref value = liftEffect $ modify_ (\a -> snoc a value) ref

clear :: forall a. Ref (Array a) -> Aff Unit
clear :: forall m a. MonadEffect m => Ref (Array a) -> m Unit
clear ref = liftEffect $ write [] ref

shouldHaveValue :: forall a. Eq a => Show a => Ref a -> a -> Aff Unit
Expand Down
Loading