Skip to content

Commit 63f739a

Browse files
committed
rewrote Cmd.batchedThrottle to Dispatch.batchThrottled extension
because it feels more natural to use it that way with a dispatch inside an ofEffect that produces values rapidly
1 parent 0cc2a6a commit 63f739a

File tree

2 files changed

+72
-71
lines changed

2 files changed

+72
-71
lines changed

src/Fabulous.Tests/CmdTests.fs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ type ``Cmd tests``() =
153153
}
154154

155155
[<Test>]
156-
member _.``Cmd.batchedThrottle dispatches all undispatched values on interval expiry``() =
156+
member _.``Dispatch.batchThrottled dispatches all undispatched values on interval expiry``() =
157157
async {
158158
let mutable messageCount = 0
159159
let mutable dispatched = [] // records dispatched messages latest first
@@ -162,12 +162,12 @@ type ``Cmd tests``() =
162162
messageCount <- messageCount + 1
163163
dispatched <- msg :: dispatched
164164

165-
let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues
165+
let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues)
166166

167-
batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch
168-
batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch
169-
batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch
170-
batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch
167+
batchedThrottleCmd 1
168+
batchedThrottleCmd 2
169+
batchedThrottleCmd 3
170+
batchedThrottleCmd 4
171171

172172
do! Async.Sleep 200 // Wait longer than the throttle interval
173173

@@ -177,7 +177,7 @@ type ``Cmd tests``() =
177177
}
178178

179179
[<Test>]
180-
member _.``Cmd.batchedThrottle dispatches messages immediately if interval not expired``() =
180+
member _.``Dispatch.batchThrottled dispatches messages immediately if interval not expired``() =
181181
async {
182182
let mutable messageCount = 0
183183
let mutable dispatched = [] // records dispatched messages latest first
@@ -186,10 +186,10 @@ type ``Cmd tests``() =
186186
messageCount <- messageCount + 1
187187
dispatched <- msg :: dispatched
188188

189-
let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues
189+
let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues)
190190

191-
batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch
192-
batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch
191+
batchedThrottleCmd 1
192+
batchedThrottleCmd 2
193193

194194
// Only the first value should have been dispatched immediately
195195
Assert.AreEqual(1, messageCount)
@@ -199,8 +199,8 @@ type ``Cmd tests``() =
199199
giving second value time to dispatch and elapsing time until next dispatch *)
200200
do! Async.Sleep 210
201201

202-
batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch
203-
batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch
202+
batchedThrottleCmd 3
203+
batchedThrottleCmd 4
204204

205205
// Second value should have dispatched delayed, third immediately
206206
Assert.AreEqual(3, messageCount)
@@ -214,7 +214,7 @@ type ``Cmd tests``() =
214214
}
215215

216216
[<Test>]
217-
member _.``Cmd.batchedThrottle factory can be awaited for completion``() =
217+
member _.``Dispatch.batchThrottled factory can be awaited for completion``() =
218218
async {
219219
let mutable messageCount = 0
220220
let mutable dispatched = [] // records dispatched messages latest first
@@ -223,10 +223,10 @@ type ``Cmd tests``() =
223223
messageCount <- messageCount + 1
224224
dispatched <- msg :: dispatched
225225

226-
let createCmd, awaitNextDispatch = Cmd.batchedThrottle 100 NewValues
226+
let createCmd, awaitNextDispatch = dispatch.batchThrottled(100, NewValues)
227227

228-
createCmd 1 |> CmdTestsHelper.execute dispatch
229-
createCmd 2 |> CmdTestsHelper.execute dispatch
228+
createCmd 1
229+
createCmd 2
230230

231231
// Only the first value should have been dispatched immediately
232232
Assert.AreEqual(1, messageCount)

src/Fabulous/Cmd.fs

Lines changed: 56 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace Fabulous
22

3+
open System.Runtime.CompilerServices
34
open System.Threading
45
open System.Threading.Tasks
56

@@ -312,27 +313,28 @@ module Cmd =
312313
cts.Token
313314
)) ]
314315

316+
type DispatchExtensions =
317+
315318
/// <summary>
316-
/// Creates a factory for Commands that dispatch messages with a list of pending values at a fixed maximum rate,
317-
/// ensuring that all pending values are dispatched when the specified interval elapses.
318-
/// This function is similar to <see cref="bufferedThrottle"/>, but instead of dispatching only the last value,
319-
/// it remembers and dispatches all undispatched values within the specified interval.
320-
/// Helpful for scenarios where you want to throttle messages but cannot afford to lose any of the values they carry,
321-
/// ensuring all values are processed at a controlled rate.
319+
/// Creates a throttled dispatch factory that dispatches values in batches at a fixed minimum interval/maximum rate
320+
/// while ensuring that all values are dispatched eventually.
321+
/// This helps throttle the message dispatch of a rapid producer to avoid overloading the MVU loop
322+
/// without dropping any of the carried values - ensuring all values are processed in batches at a controlled rate.
322323
/// Note that this function creates an object with internal state and is intended to be used per Program
323324
/// or longer-running background process rather than once per message in the update function.
324325
/// </summary>
325-
/// <param name="interval">The minimum time interval between two consecutive Command executions in milliseconds.</param>
326-
/// <param name="fn">A function that maps a list of factory input values to a message for dispatch.</param>
326+
/// <param name="interval">The minimum time interval between two consecutive dispatches in milliseconds.</param>
327+
/// <param name="mapBatchToMsg">A function that maps a list of pending input values to a message for dispatch.</param>
327328
/// <returns>
328-
/// Two methods - the first being a Command factory function that maps a list of input values to a Command
329-
/// which dispatches a message (mapped from the pending values),
330-
/// either immediately or after a delay respecting the interval, while remembering and dispatching all remembered values
331-
/// when the interval has elapsed, ensuring no values are lost.
332-
/// The second can be used for awaiting the next dispatch from the outside while adding some buffer time.
329+
/// Two functions. The first has a Dispatch signature and is used to feed a single value into the factory,
330+
/// where it is either dispatched immediately or after a delay respecting the interval,
331+
/// batched with other pending values in the order they were fed in.
332+
/// The second can be used for awaiting the next dispatch from the outside
333+
/// - while optionally adding some buffer time (in milliseconds) to account for race condiditions.
333334
/// </returns>
334-
let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : ('value -> Cmd<'msg>) * (System.TimeSpan option -> Async<unit>) =
335-
let rateLimit = System.TimeSpan.FromMilliseconds(float interval)
335+
[<Extension>]
336+
static member batchThrottled((dispatch: Dispatch<'msg>), interval, (mapBatchToMsg: 'value list -> 'msg)) =
337+
let rateLimit = System.TimeSpan.FromMilliseconds(interval)
336338
let funLock = obj() // ensures safe access to resources shared across different threads
337339
let mutable lastDispatch = System.DateTime.MinValue
338340
let mutable pendingValues: 'value list = []
@@ -343,49 +345,48 @@ module Cmd =
343345
lastDispatch.Add(rateLimit) - System.DateTime.UtcNow
344346

345347
// dispatches all pendingValues and resets them while updating lastDispatch
346-
let dispatchBatch (dispatch: 'msg -> unit) =
348+
let dispatchBatch () =
347349
// Dispatch in the order they were received
348-
pendingValues |> List.rev |> mapValuesToMsg |> dispatch
350+
pendingValues |> List.rev |> mapBatchToMsg |> dispatch
349351

350352
lastDispatch <- System.DateTime.UtcNow
351353
pendingValues <- []
352354

353-
// a factory function mapping input values to sleeping Commands dispatching all pending messages
354-
let factory =
355+
// a function with the Dispatch signature for feeding a single value into the throttled batch factory
356+
let dispatchSingle =
355357
fun (value: 'value) ->
356-
[ fun dispatch ->
357-
lock funLock (fun () ->
358-
let untilNextDispatch = getTimeUntilNextDispatch()
359-
pendingValues <- value :: pendingValues
360-
361-
// If the interval has elapsed since the last dispatch, dispatch all pending messages
362-
if untilNextDispatch <= System.TimeSpan.Zero then
363-
dispatchBatch dispatch
364-
else // schedule dispatch
365-
366-
// if the the last sleeping dispatch can still be cancelled, do so
367-
if cts <> null then
368-
cts.Cancel()
369-
cts.Dispose()
370-
371-
// used to enable cancelling this dispatch if newer values come into the factory
372-
cts <- new CancellationTokenSource()
373-
374-
Async.Start(
375-
async {
376-
// wait only as long as we have to before next dispatch
377-
do! Async.Sleep(untilNextDispatch)
378-
379-
lock funLock (fun () ->
380-
dispatchBatch dispatch
381-
382-
// done; invalidate own cancellation
383-
if cts <> null then
384-
cts.Dispose()
385-
cts <- null)
386-
},
387-
cts.Token
388-
)) ]
358+
lock funLock (fun () ->
359+
let untilNextDispatch = getTimeUntilNextDispatch()
360+
pendingValues <- value :: pendingValues
361+
362+
// If the interval has elapsed since the last dispatch, dispatch all pending messages
363+
if untilNextDispatch <= System.TimeSpan.Zero then
364+
dispatchBatch()
365+
else // schedule dispatch
366+
367+
// if the the last sleeping dispatch can still be cancelled, do so
368+
if cts <> null then
369+
cts.Cancel()
370+
cts.Dispose()
371+
372+
// used to enable cancelling this dispatch if newer values come into the factory
373+
cts <- new CancellationTokenSource()
374+
375+
Async.Start(
376+
async {
377+
// wait only as long as we have to before next dispatch
378+
do! Async.Sleep(untilNextDispatch)
379+
380+
lock funLock (fun () ->
381+
dispatchBatch()
382+
383+
// done; invalidate own cancellation
384+
if cts <> null then
385+
cts.Dispose()
386+
cts <- null)
387+
},
388+
cts.Token
389+
))
389390

390391
// a function to wait until after the next async dispatch + some buffer time to ensure the dispatch is complete
391392
let awaitNextDispatch buffer =
@@ -395,12 +396,12 @@ module Cmd =
395396
let untilAfterNextDispatch =
396397
getTimeUntilNextDispatch()
397398
+ match buffer with
398-
| Some value -> value
399+
| Some value -> System.TimeSpan.FromMilliseconds(value)
399400
| None -> System.TimeSpan.Zero
400401

401402
if untilAfterNextDispatch > System.TimeSpan.Zero then
402403
do! Async.Sleep(untilAfterNextDispatch)
403404
})
404405

405-
// return both the factory and the await helper
406-
factory, awaitNextDispatch
406+
// return both the dispatch and the await helper
407+
dispatchSingle, awaitNextDispatch

0 commit comments

Comments
 (0)