Skip to content

Commit 6b35d58

Browse files
committed
added Cmd.batchedThrottle
1 parent 01a105d commit 6b35d58

File tree

2 files changed

+133
-1
lines changed

2 files changed

+133
-1
lines changed

src/Fabulous.Tests/CmdTests.fs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ namespace Fabulous.Tests
33
open Fabulous
44
open NUnit.Framework
55

6-
type CmdTestsMsg = NewValue of int
6+
type CmdTestsMsg =
7+
| NewValue of int
8+
| NewValues of int list
79

810
module CmdTestsHelper =
911
let execute dispatch (cmd: Cmd<'msg>) =
@@ -149,3 +151,64 @@ type ``Cmd tests``() =
149151
Assert.AreEqual(2, messageCount)
150152
Assert.AreEqual(Some(NewValue 2), actualValue)
151153
}
154+
155+
[<Test>]
156+
member _.``Cmd.batchedThrottle dispatches all undispatched values on interval expiry``() =
157+
async {
158+
let mutable messageCount = 0
159+
let mutable dispatched = [] // records dispatched messages latest first
160+
161+
let dispatch msg =
162+
messageCount <- messageCount + 1
163+
dispatched <- msg :: dispatched
164+
165+
let batchedThrottleCmd = Cmd.batchedThrottle 100 NewValues
166+
167+
batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch
168+
batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch
169+
batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch
170+
batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch
171+
172+
do! Async.Sleep 200 // Wait longer than the throttle interval
173+
174+
// All three values should have been dispatched
175+
Assert.AreEqual(2, messageCount)
176+
Assert.AreEqual([ NewValues [ 2; 3; 4 ]; NewValues [ 1 ] ], dispatched)
177+
}
178+
179+
[<Test>]
180+
member _.``Cmd.batchedThrottle dispatches messages immediately if interval not expired``() =
181+
async {
182+
let mutable messageCount = 0
183+
let mutable dispatched = [] // records dispatched messages latest first
184+
185+
let dispatch msg =
186+
messageCount <- messageCount + 1
187+
dispatched <- msg :: dispatched
188+
189+
let batchedThrottleCmd = Cmd.batchedThrottle 100 NewValues
190+
191+
batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch
192+
batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch
193+
194+
// Only the first value should have been dispatched immediately
195+
Assert.AreEqual(1, messageCount)
196+
Assert.AreEqual([ NewValues[1] ], dispatched)
197+
198+
(* Wait for longer than twice the throttle interval,
199+
giving second value time to dispatch and elapsing time until next dispatch *)
200+
do! Async.Sleep 210
201+
202+
batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch
203+
batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch
204+
205+
// Second value should have dispatched delayed, third immediately
206+
Assert.AreEqual(3, messageCount)
207+
Assert.AreEqual([ NewValues[3]; NewValues[2]; NewValues[1] ], dispatched)
208+
209+
do! Async.Sleep 110 // Wait longer than the throttle interval
210+
211+
// All values should have been dispatched eventually
212+
Assert.AreEqual(4, messageCount)
213+
Assert.AreEqual([ NewValues[4]; NewValues[3]; NewValues[2]; NewValues[1] ], dispatched)
214+
}

src/Fabulous/Cmd.fs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,3 +311,72 @@ module Cmd =
311311
},
312312
cts.Token
313313
)) ]
314+
315+
/// <summary>
316+
/// Creates a factory for Commands that dispatch messages with a list of pending values at a fixed maximum rate,
317+
/// ensuring that all pending values are dispatched when the specified interval elapses.
318+
/// This function is similar to <see cref="bufferedThrottle"/>, but instead of dispatching only the last value,
319+
/// it remembers and dispatches all undispatched values within the specified interval.
320+
/// Helpful for scenarios where you want to throttle messages but cannot afford to lose any of the values they carry,
321+
/// ensuring all values are processed at a controlled rate.
322+
/// Note that this function creates an object with internal state and is intended to be used per Program
323+
/// or longer-running background process rather than once per message in the update function.
324+
/// </summary>
325+
/// <param name="interval">The minimum time interval between two consecutive Command executions in milliseconds.</param>
326+
/// <param name="fn">A function that maps a list of factory input values to a message for dispatch.</param>
327+
/// <returns>
328+
/// A Command factory function that maps a list of input values to a Command which dispatches a message (mapped from the pending values),
329+
/// either immediately or after a delay respecting the interval, while remembering and dispatching all remembered values
330+
/// when the interval has elapsed, ensuring no values are lost.
331+
/// </returns>
332+
let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : 'value -> Cmd<'msg> =
333+
let rateLimit = System.TimeSpan.FromMilliseconds(float interval)
334+
let funLock = obj() // ensures safe access to resources shared across different threads
335+
let mutable lastDispatch = System.DateTime.MinValue
336+
let mutable pendingValues: 'value list = []
337+
let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command
338+
339+
// dispatches all pendingValues and resets them while updating lastDispatch
340+
let dispatchBatch (dispatch: 'msg -> unit) =
341+
// Dispatch in the order they were received
342+
pendingValues |> List.rev |> mapValuesToMsg |> dispatch
343+
344+
lastDispatch <- System.DateTime.UtcNow
345+
pendingValues <- []
346+
347+
// Return a factory function mapping input values to sleeping Commands dispatching all pending messages
348+
fun (value: 'value) ->
349+
[ fun dispatch ->
350+
lock funLock (fun () ->
351+
let now = System.DateTime.UtcNow
352+
let elapsedSinceLastDispatch = now - lastDispatch
353+
pendingValues <- value :: pendingValues
354+
355+
// If the interval has elapsed since the last dispatch, dispatch all pending messages
356+
if elapsedSinceLastDispatch >= rateLimit then
357+
dispatchBatch dispatch
358+
else // schedule dispatch
359+
360+
// if the the last sleeping dispatch can still be cancelled, do so
361+
if cts <> null then
362+
cts.Cancel()
363+
cts.Dispose()
364+
365+
// used to enable cancelling this dispatch if newer values come into the factory
366+
cts <- new CancellationTokenSource()
367+
368+
Async.Start(
369+
async {
370+
// wait only as long as we have to before next dispatch
371+
do! Async.Sleep(rateLimit - elapsedSinceLastDispatch)
372+
373+
lock funLock (fun () ->
374+
dispatchBatch dispatch
375+
376+
// done; invalidate own cancellation
377+
if cts <> null then
378+
cts.Dispose()
379+
cts <- null)
380+
},
381+
cts.Token
382+
)) ]

0 commit comments

Comments
 (0)