August 24, 2024
Disclaimer: I'm new to ZIO Schedule, and I have a sneaking suspicion that I missed something in the documentation.
I am working on a project that requires multiple processes to continually chew through relational data, forever. The processes need to fire fast when there is data, and use an exponential backoff when there is no data.
Since the codebase already uses the ZIO library, I started exploring the documentation for recommended ways of working with long-running processes. I discovered that ZIO can elegantly schedule recurring effects with Schedule, and this appeared to be the best way to tackle the problem.
My first attempt at a naive, forever-looping process looked like this:
import zio._
import zio.Console.printLine

// Recur forever, with no delay between repetitions.
val schedule = Schedule.forever

val loop = for {
  _                 <- printLine("Hello World")
  quantityProcessed <- ZIO.succeed(10)
} yield quantityProcessed

loop.repeat(schedule)
When executed, the code produced a never-ending stream of Hello World. Certainly a step in the right direction, but it needed to be refined.
The Schedule type provides many predefined schedules and combinators, such as recurs, spaced, exponential, zip, whileInput, and andThen. Mixing these methods together produces a wide range of scheduling behaviours. My second attempt was almost exactly what I needed.
// While the input (data size) is greater than zero, run forever.
val hasDataSchedule = Schedule.forever.whileInput[Int](_ > 0)

// While the output duration of the exponential is less than 30s, continue to recur.
val exponentialBackoff = Schedule
  .exponential(50.milliseconds)
  .whileOutput(_ < 30.seconds)

// Space recurrences at 30s.
val maxSpacing = Schedule.spaced(30.seconds)

// Once the exponential backoff stops then the max spacing schedule takes over.
val noDataSchedule = exponentialBackoff andThen maxSpacing

// Union the two schedules (||): recurs if either recurs, and the smaller
// of the two delays is used.
val schedule = hasDataSchedule || noDataSchedule
Note: the above code excludes the for-comprehension and loop code.
If you run the above code, at first glance it produces the desired scheduling behaviour, but there's a subtle issue with the exponential backoff: it doesn't reset when it encounters input data. Instead, the backoff resumes from the last duration held in its state. That is a sensible default for exponential backoff, but in the context of a loop, the state should reset whenever the input predicate is true.
The sample data and comments below illustrate where the exponential backoff should reset and increment.
val sampleData = Seq(
  Set(4, 5, 6), // Data, fire immediately
  Set(7, 8, 9), // Data, fire immediately
  Set(),        // No data, start exponential backoff
  Set(),        // No data, increment backoff
  Set(),        // No data, increment backoff
  Set(),        // No data, increment backoff
  Set(1, 2, 3), // Data, reset backoff, fire immediately
  Set(2, 3, 7), // Data, fire immediately
  Set(8, 2, 1), // Data, fire immediately
  Set(),        // No data, start exponential backoff
  Set(),        // No data, increment backoff
)
The Schedule methods that would reset the exponential backoff on input data eluded me. The methods that jumped out as potential candidates were resetWhen(f: Out => Boolean) and resetAfter(duration: Duration). However, resetting based on the output, or after an arbitrary duration, wastes time.
There is probably a way to pass the input data through using built-in methods, but it wasn't clear to me how to do that. A reset based on the input data would be perfect, but unfortunately I couldn't find one.
I felt the need to write a function that would reset the exponential backoff when input data was present. Scala 3 has an amazing feature called extension methods, which replace the implicit classes used for this purpose in Scala 2. With extension methods, any type already defined can have new methods added to it; this allows a method like resetWhenInput to be added to the Schedule type:
// Sketch only: the type parameters and the predicate type are still missing.
extension (currentSchedule: Schedule)
  def resetWhenInput(predicate): Schedule = ???
When extension methods are imported into a file they can be called using the dot operator on the instance of the type extended, for example: noDataSchedule.resetWhenInput().
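As a minimal illustration of the syntax, independent of ZIO, here is a sketch that adds a method to the standard List type. The method name secondOption is made up for this example.

```scala
// Hypothetical example: add a method to the existing List type.
extension [A](xs: List[A])
  // Returns the second element, if there is one.
  def secondOption: Option[A] = xs.drop(1).headOption

// The new method is called with the dot operator, like any other method.
val second: Option[Int] = List(4, 5, 6).secondOption
val none: Option[Int]   = List(1).secondOption
```

The extended type is left untouched; the compiler simply resolves the call to the extension when it is in scope.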
The argument should be a predicate function that takes a value of the input type and returns a boolean, f: A => Boolean. It should also accept a predicate over any type that is a subtype of (conforms to, or extends) that type, [B <: A]. The new method signature looks like this: resetWhenInput[B <: A](f: B => Boolean).
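To make the type bound concrete, here is a small sketch in plain Scala. Consumer and onlyWhen are hypothetical names invented for this example; Consumer only stands in for a type that consumes inputs, the way a Schedule consumes values of its input type.

```scala
// Hypothetical type that consumes values of type In.
trait Consumer[-In] {
  def accept(in: In): String
}

extension [In](self: Consumer[In])
  // B may be In itself or any subtype of In, so the predicate can be
  // written against a more specific type than the consumer accepts.
  def onlyWhen[B <: In](f: B => Boolean): Consumer[B] =
    new Consumer[B] {
      def accept(in: B): String =
        if f(in) then self.accept(in) else "skipped"
    }

val anyConsumer: Consumer[Any] = new Consumer[Any] {
  def accept(in: Any): String = s"got $in"
}

// The predicate is written against Int, a subtype of Any.
val positiveInts: Consumer[Int] = anyConsumer.onlyWhen[Int](_ > 0)
```

Because every B is also an In, passing the value on to self.accept is always safe, which is the same reason the bound works for the schedule.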
A Schedule[Env, In, Out] defines a recurring schedule, which consumes values of type In, and which returns values of type Out.
The extension method must also incorporate the generic Schedule types. Those generic Schedule types are [Env, In, Out]. Merging the predicate types with the Schedule generic types produces the following method signature:
extension [Env, In, Out](self: Schedule[Env, In, Out])
  def resetWhenInput[B <: In](f: B => Boolean): Schedule.WithState[self.State, Env, B, Out] = ???
The final stretch requires digging into the inner workings of Schedule. There is a State type, an initial state, and a step function. The step function is the most important part, because it determines whether to continue (step) or not. Essentially, if the predicate is true, step from the initial state (a "reset"); otherwise, step from the current state.
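import java.time.OffsetDateTime
import zio._
import zio.Schedule.Decision

extension [Env, In, Out](self: Schedule[Env, In, Out])
  def resetWhenInput[In1 <: In](f: In1 => Boolean) =
    new Schedule[Env, In1, Out] {
      // Reuse the state type and initial state of the wrapped schedule.
      override type State = self.State
      override final val initial: State = self.initial

      override final def step(now: OffsetDateTime, in: In1, state: State)(
        implicit trace: Trace
      ): ZIO[Env, Nothing, (State, Out, Decision)] = {
        // Predicate matched: step from the initial state ("reset").
        // Otherwise: step from the current state.
        if (f(in)) then self.step(now, in, self.initial)
        else self.step(now, in, state)
      }
    }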
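The reset logic can also be looked at in isolation with a simplified model that has no effects and no clock. This is only a sketch: MiniSchedule, doubling, and run are hypothetical names invented for the example and are not part of ZIO.

```scala
// Hypothetical, simplified stand-in for a schedule: no effects, no clock.
trait MiniSchedule[-In, +Out] {
  type State
  def initial: State
  def step(in: In, state: State): (State, Out)
}

// Outputs base * 2^n, where n is the number of steps taken so far.
def doubling(base: Long): MiniSchedule[Any, Long] =
  new MiniSchedule[Any, Long] {
    type State = Int
    val initial: State = 0
    def step(in: Any, state: State): (State, Long) = (state + 1, base << state)
  }

extension [In, Out](self: MiniSchedule[In, Out])
  def resetWhenInput[In1 <: In](f: In1 => Boolean): MiniSchedule[In1, Out] =
    new MiniSchedule[In1, Out] {
      type State = self.State
      val initial: State = self.initial
      // Predicate matched: step from the initial state, otherwise continue.
      def step(in: In1, state: State): (State, Out) =
        if f(in) then self.step(in, self.initial) else self.step(in, state)
    }

// Feed the inputs through the schedule one by one and collect the outputs.
def run[In, Out](schedule: MiniSchedule[In, Out], inputs: List[In]): List[Out] = {
  var state = schedule.initial
  inputs.map { in =>
    val (next, out) = schedule.step(in, state)
    state = next
    out
  }
}

val backoff = doubling(50L).resetWhenInput[Int](_ > 0)
val outputs = run(backoff, List(0, 0, 0, 5, 0, 0))
// outputs == List(50, 100, 200, 50, 100, 200)
```

The fourth input (5) satisfies the predicate, so the model steps from the initial state and the doubling starts over from the base value.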
To verify this method works as expected:
// TODO: TESTS
Adding the new resetWhenInput method to the exponential backoff produces the desired scheduling outcome. For convenience, I wrapped the schedule in a function for quick usage and easy testing.
def foreverWithBackoff(
  base: Duration = 50.milliseconds,
  max: Duration = 30.seconds,
) = {
  val hasDataSchedule    = Schedule.forever.whileInput[Int](_ > 0)
  val exponentialBackoff = Schedule.exponential(base).whileOutput(_ < max)
  val noDataSchedule     = (exponentialBackoff andThen Schedule.spaced(max))
    .resetWhenInput[Int](_ > 0)

  hasDataSchedule || noDataSchedule
}
Testing functions that deal with the passage of time can be a slow and painful process, but ZIO only describes the effects, so the passage of time can be adjusted. For example, to advance the clock by 2 minutes in tests, use TestClock.adjust(2.minutes). This gives instant results, but the code behaves as if 2 minutes had passed. The clock adjustment is really helpful when testing long-running processes.
In the sample data below, the first repeat doesn't take place until after the loop function has been executed once. Also important to know: the ZIO exponential function is defined as base * factor ^ n. With a base of 50ms and the default factor of 2, the first no-data repeat will be 100ms (50 * 2 ^ 1 == 100), or 0.1s.
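The arithmetic can be checked with a small model in plain Scala. This is a sketch of the expected delays as described in this post, not ZIO's implementation; nextDelays is a hypothetical helper, and it assumes the exponent restarts at 1 after every batch that contains data.

```scala
// Hypothetical model: the delay (in milliseconds) before the next repeat,
// given the size of each batch that was just processed.
def nextDelays(
  batchSizes: Seq[Int],
  baseMs: Long = 50L,
  factor: Double = 2.0,
  maxMs: Long = 30000L
): Seq[Long] =
  batchSizes
    // State: (exponent for the next no-data delay, delay just produced).
    .scanLeft((1, 0L)) { case ((n, _), size) =>
      if (size > 0) (1, 0L) // data: fire immediately and reset the exponent
      else {
        val raw   = baseMs * math.pow(factor, n.toDouble)
        val delay = if (raw < maxMs) raw.toLong else maxMs // cap at the max spacing
        (n + 1, delay)
      }
    }
    .tail // drop the seed state
    .map(_._2)

val delays = nextDelays(Seq(3, 3, 0, 0, 0, 3, 0))
// delays == Seq(0, 0, 100, 200, 400, 0, 100)
```

Feeding it one batch with data followed by a long run of empty batches yields 100, 200, 400 and so on up to 25600, then 30000 for every batch after that.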
import java.util.concurrent.TimeUnit
import zio._
import zio.Clock.currentTime
import zio.Console.printLine
import zio.test._

object ScheduleSpec extends ZIOSpecDefault {
  def spec = suiteAll("Schedule") {

    val sampleData: Array[Set[Int]] = Array(
                    // Next Repeat   Elapsed Time
                    // ===========================
      Set(4, 5, 6), //   0.0s          0.0s
      Set(7, 8, 9), //   0.0s          0.0s  <- first repeat executed
      Set(),        //   0.1s          0.0s
      Set(),        //   0.2s          0.1s
      Set(),        //   0.4s          0.3s
      Set(),        //   0.8s          0.7s
      Set(),        //   1.6s          1.5s
      Set(),        //   3.2s          3.1s
      Set(),        //   6.4s          6.3s
      Set(),        //  12.8s         12.7s
      Set(),        //  25.6s         25.5s
      Set(),        //  30.0s         51.1s
      Set(),        //  30.0s         81.1s
      Set(1, 2, 3), //   0.0s        111.1s
      Set(),        //   0.1s        111.1s
      Set(),        //   0.2s        111.2s
      Set(),        //   0.4s        111.4s
      Set(),        //   0.8s        111.8s
      Set(),        //   1.6s        112.6s
      Set(),        //   3.2s        114.2s
      Set(),        //   6.4s        117.4s
      Set(),        //  12.8s        123.8s
    )

    // Increment the index but respect the length of sample data.
    val incrementIndex = (currentIndex: Int) => {
      val newIndex = currentIndex + 1
      if newIndex > sampleData.length - 1 then 0 else newIndex
    }

    /**
     * This is an impure function, it is mutating indexRef. This is only
     * to make the tests more readable, don't do this.
     */
    val loop = (indexRef: Ref[Int]) =>
      for {
        index <- indexRef.get
        time  <- currentTime(TimeUnit.MILLISECONDS)
        data   = sampleData(index)
        _     <- printLine(s"$index: $data, $time")
        _     <- indexRef.update(incrementIndex)
      } yield data.size

    test("Exponential backoff") {
      for {
        indexRef <- Ref.make(0)
        fiber    <- loop(indexRef).repeat(foreverWithBackoff()).fork
        _        <- TestClock.adjust(13.seconds)
        index    <- indexRef.get
      } yield assertTrue(index == 10)
    }

    test("Exponential backoff with input data reset") {
      for {
        indexRef <- Ref.make(0)
        fiber    <- loop(indexRef).repeat(foreverWithBackoff()).fork
        _        <- TestClock.adjust(115.seconds)
        index    <- indexRef.get
      } yield assertTrue(index == 20)
    }
  }
}
… and voilà! Green tests across the board. Writing this function didn't take very long, thanks to the great ZIO documentation, both on the web and in the code. In fact, this blog post took 1,000,000 times longer to write than the function.
I hope some part of this code journey was helpful for you.