package async_extra

Extends Core.Schedule_v5 with functions for asynchronously handling events in the schedule.

include module type of struct include Core.Schedule_v5 end

Overview

A Schedule.t describes a (potentially repeating) schedule by selecting a subset of all seconds using the set operations in t. For example:

  • 5 minutes past every hour:

    Mins [ 5 ]
  • 9am to 10am every day:

    Between (Time.Ofday.create ~hr:9 (), Time.Ofday.create ~hr:10 ())
  • Every weekday at 3pm:

    And
      [ Weekdays [ Mon; Tue; Wed; Thu; Fri ]
      ; At [ Time.Ofday.create ~hr:15 () ] ]
  • On the 15th of every month at midnight:

    And
      [ Days [ 15 ]
      ; At [ Time.Ofday.start_of_day ] ]
  • 9:30 am on weekends, and 5 am on weekdays:

    Or
      [ And
          [ Weekdays [ Sat; Sun ]
          ; At [ Time.Ofday.create ~hr:9 ~min:30 () ] ]
      ; And
          [ Weekdays [ Mon; Tue; Wed; Thu; Fri ]
          ; At [ Time.Ofday.create ~hr:5 () ] ]
      ]
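To make the set semantics of these examples concrete, here is a minimal sketch that evaluates a miniature version of the selection language using only the OCaml standard library. It is not the library's implementation: the moment record, the cut-down schedule type, and the includes function are all hypothetical stand-ins, At takes (hour, minute) pairs instead of Time.Ofday.t values, and an At instant is assumed to be second 0 of the given minute.

```ocaml
(* Simplified stand-in for a point in time; NOT the library's Time.t. *)
type weekday = Mon | Tue | Wed | Thu | Fri | Sat | Sun

type moment = { weekday : weekday; hour : int; minute : int; second : int }

(* Hypothetical miniature of the selection language. *)
type schedule =
  | Mins of int list
  | Weekdays of weekday list
  | At of (int * int) list (* (hour, minute) pairs, an assumption of this sketch *)
  | And of schedule list
  | Or of schedule list

let rec includes schedule m =
  match schedule with
  | Mins mins -> List.mem m.minute mins (* every second of the listed minutes *)
  | Weekdays days -> List.mem m.weekday days
  | At times -> m.second = 0 && List.mem (m.hour, m.minute) times
  | And parts -> List.for_all (fun s -> includes s m) parts (* intersection *)
  | Or parts -> List.exists (fun s -> includes s m) parts (* union *)

(* 9:30 am on weekends, and 5 am on weekdays. *)
let alarm =
  Or
    [ And [ Weekdays [ Sat; Sun ]; At [ (9, 30) ] ]
    ; And [ Weekdays [ Mon; Tue; Wed; Thu; Fri ]; At [ (5, 0) ] ]
    ]

let saturday_0930 = { weekday = Sat; hour = 9; minute = 30; second = 0 }
let monday_0930 = { weekday = Mon; hour = 9; minute = 30; second = 0 }
let monday_0500 = { weekday = Mon; hour = 5; minute = 0; second = 0 }
```

In this model, includes alarm saturday_0930 and includes alarm monday_0500 hold, while includes alarm monday_0930 does not.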

Zones and Tags

On top of this selection language there are two labeling branches of the variant that are important.

In_zone (zone, t) expresses that all of t should be evaluated relative to the time zone given.

Tag (tag, t) tags anything matching t with tag.

Combining these we can express something complex like the on-call groups across three offices:

let weekdays         = Weekdays Day_of_week.weekdays in
let working_hours    = Between Time.Ofday.((create ~hr:8 (), create ~hr:18 ())) in
let working_schedule = And [ weekdays; working_hours ] in
let offices =
  let (!!) = Time.Zone.find_exn in
  Location.Abbrev.([
    tot, !!"America/New_York"
  ; hkg, !!"Asia/Hong_Kong"
  ; ldn, !!"Europe/London" ])
in
List.map offices ~f:(fun (office, zone) ->
  In_zone (zone, Tag (office, working_schedule)))

after which we can use the tags function to extract the groups on call at any moment.

Daylight Saving Time

Schedules are expressed in terms of wall clock time, and as such have interesting behavior around daylight saving time boundaries. There are two circumstances that might affect a schedule. The first is a repeated time, which occurs when the clock jumps back (e.g., 2:30 may happen twice in one day). The second is a skipped time, which occurs when the clock jumps forward by an hour (e.g., 2:30 may not happen at all on that day).

In both cases Schedule does the naive thing: if the time happens twice and is included in the schedule, it is included twice; if it never happens, Schedule makes no special attempt to artificially include it.
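The repeated and skipped cases can be illustrated with a small stdlib-only model. This is a sketch under stated assumptions, not how the library handles zones: time is counted in whole minutes over a single UTC day, the zone is reduced to one transition with hypothetical offsets, and occurrences simply counts how many UTC minutes map to a given wall clock time.

```ocaml
let minutes_per_day = 24 * 60

(* Wall clock minute-of-day for a UTC minute, given a single offset change
   at [transition]. All three parameters are hypothetical inputs. *)
let wall_clock ~transition ~offset_before ~offset_after utc =
  let offset = if utc < transition then offset_before else offset_after in
  (utc + offset) mod minutes_per_day

(* How many UTC minutes of the day show [wall_time] on the wall clock. *)
let occurrences ~transition ~offset_before ~offset_after ~wall_time =
  let count = ref 0 in
  for utc = 0 to minutes_per_day - 1 do
    if wall_clock ~transition ~offset_before ~offset_after utc = wall_time
    then incr count
  done;
  !count

let half_past_two = (2 * 60) + 30

(* Clock jumps back from UTC+2 to UTC+1 at 01:00 UTC: 02:30 is repeated. *)
let fall_back =
  occurrences ~transition:60 ~offset_before:120 ~offset_after:60
    ~wall_time:half_past_two

(* Clock jumps forward from UTC+1 to UTC+2 at 01:00 UTC: 02:30 is skipped. *)
let spring_forward =
  occurrences ~transition:60 ~offset_before:60 ~offset_after:120
    ~wall_time:half_past_two
```

Here fall_back is 2 and spring_forward is 0, which matches the naive behavior described above: a schedule containing 2:30 would fire twice on the first day and not at all on the second.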

Interface

type zoned = Core.Schedule_v5.zoned =
  | Zoned

These phantom types are concrete and exposed to help the compiler understand that zoned and unzoned cannot be the same type (which it could not know if they were abstract), which helps it infer the injectivity of the type t below.

val compare_zoned : zoned -> zoned -> int
type unzoned = Core.Schedule_v5.unzoned =
  | Unzoned
val compare_unzoned : unzoned -> unzoned -> int
  • In_zone: see the discussion under Zones and Tags above
  • Tag: see the discussion under Zones and Tags above
  • And, Or, Not: correspond to the set operations intersection, union, and complement.
  • If_then_else (A, B, C): corresponds to (A && B) || (NOT A && C), useful for dealing with schedules that change during certain times of the year (holidays, etc.)
  • At: the exact times given on every day
  • Shift: shifts an entire schedule forward or backwards by a known span. For example:

    • Shift (sec 3., Secs [10]) = Secs [13]
    • Shift (sec (-3.), Secs [10]) = Secs [7]
  • Between: the contiguous range between the start and end times given. The schedule can span the midnight boundary. If the start and end ofdays are the same then:

    | Exclusive, Exclusive -> Not At
    | Inclusive, Inclusive -> At
    | Exclusive, Inclusive
    | Inclusive, Exclusive -> Always
  • Zoned_between: As Between, but allows expressing the start and end times in different zones
  • Secs: the given seconds of a minute, repeated every minute
  • Mins: the given minutes of an hour, repeated every hour
  • Hours: the given hours of a day, repeated every day
  • Weekdays: the given weekdays, repeated every week
  • Days: the given calendar dates, repeated every month (if the day occurs in that month)
  • Weeks: the given weeks (numbered by ISO 8601), repeated every year
  • Months: the given months, repeated every year
  • On: the exact dates given
  • Before: all seconds before the given boundary (inclusive or exclusive)
  • After: all seconds after the given boundary (inclusive or exclusive)
  • Always: the set of all seconds
  • Never: the empty set
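The set-operation reading of If_then_else and Shift can be checked with a tiny predicate model. This is an illustrative sketch only: a schedule is modelled as a function from an integer number of seconds to bool, which is an assumption of this example and not the library's representation, and all function names are hypothetical.

```ocaml
(* A schedule is modelled as a predicate over an integer count of seconds. *)
type schedule = int -> bool

let and_ (a : schedule) (b : schedule) : schedule = fun s -> a s && b s
let or_ (a : schedule) (b : schedule) : schedule = fun s -> a s || b s
let not_ (a : schedule) : schedule = fun s -> not (a s)

(* If_then_else (A, B, C) as described above: (A && B) || (NOT A && C). *)
let if_then_else a b c : schedule = or_ (and_ a b) (and_ (not_ a) c)

(* Secs: the given seconds of a minute, repeated every minute. *)
let secs wanted : schedule =
  fun s -> List.mem (((s mod 60) + 60) mod 60) wanted

(* Shift by [span] seconds: [s] is selected when [s - span] was selected. *)
let shift span (t : schedule) : schedule = fun s -> t (s - span)

(* Compare two schedules on every second of the first hour. *)
let agree_on_first_hour (a : schedule) (b : schedule) =
  List.for_all (fun s -> a s = b s) (List.init 3600 Fun.id)

let shifted_forward_matches =
  agree_on_first_hour (shift 3 (secs [ 10 ])) (secs [ 13 ])

let shifted_backward_matches =
  agree_on_first_hour (shift (-3) (secs [ 10 ])) (secs [ 7 ])

(* Hypothetical "holiday" period: second 0 inside it, second 30 outside. *)
let in_holiday : schedule = fun s -> s >= 1000
let holiday_aware = if_then_else in_holiday (secs [ 0 ]) (secs [ 30 ])
```

Both shifted_forward_matches and shifted_backward_matches evaluate to true in this model, mirroring the two Shift equalities listed above.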

'a indicates whether the schedule currently has an established zone.

'b is the type of the tag used in this schedule. In many cases it can be unspecified. See tags for more.

Items that take int lists silently ignore ints outside of the viable range. E.g. Days [32] will never occur.

module Inclusive_exclusive = Core.Schedule_v5.Inclusive_exclusive
type ('a, 'b) t = ('a, 'b) Core.Schedule_v5.t =
  | Tag : 'b * ('a, 'b) t -> ('a, 'b) t
  | And : ('a, 'b) t list -> ('a, 'b) t
  | Or : ('a, 'b) t list -> ('a, 'b) t
  | Not : ('a, 'b) t -> ('a, 'b) t
  | If_then_else : (('a, 'b) t * ('a, 'b) t * ('a, 'b) t) -> ('a, 'b) t
  | Shift : Core_kernel.Core_kernel_private.Time_float0.Span.t * ('a, 'b) t -> ('a, 'b) t
  | Between : (Inclusive_exclusive.t * Core_kernel.Core_kernel_private.Time_float0.underlying) * (Inclusive_exclusive.t * Core_kernel.Core_kernel_private.Time_float0.underlying) -> (unzoned, 'b) t
  | At : Core_kernel.Core_kernel_private.Time_float0.underlying list -> (unzoned, 'b) t
  | Secs : int list -> (unzoned, 'b) t
  | Mins : int list -> (unzoned, 'b) t
  | Hours : int list -> (unzoned, 'b) t
  | Weekdays : Core_kernel.Day_of_week.t list -> (unzoned, 'b) t
  | Days : int list -> (unzoned, 'b) t
  | Weeks : int list -> (unzoned, 'b) t
  | Months : Core_kernel.Month.t list -> (unzoned, 'b) t
  | Always : ('a, 'b) t
  | Never : ('a, 'b) t
val compare : ('a -> 'a -> int) -> ('b -> 'b -> int) -> ('a, 'b) t -> ('a, 'b) t -> int
type 'b zoned_t = (zoned, 'b) t
val sexp_of_zoned_t : ('b -> Ppx_sexp_conv_lib.Sexp.t) -> 'b zoned_t -> Ppx_sexp_conv_lib.Sexp.t
val to_string_zoned : (zoned, 'b) t -> string_of_tag:('b -> string) -> string

Returns a string suitable for debugging purposes.

module Stable = Core.Schedule_v5.Stable
val includes : (zoned, 'b) t -> Core_kernel.Time.t -> bool

includes t time is true if the second represented by time falls within the schedule t.

val tags : (zoned, 'tag) t -> Core_kernel.Time.t -> [ `Not_included | `Included of 'tag list ]

tags t time = `Not_included iff not (includes t time). Otherwise, tags t time = `Included lst, where lst contains the tag of every tagged branch Tag (tag, t') of the schedule for which includes t' time is true. E.g., for some t equal to Tag (some_tag, t'), lst contains some_tag if and only if includes t' time is true.

For a more interesting use case, consider the per-office on-call schedule example given at the beginning of this module. Note that a schedule may have no tags, and therefore lst can be empty.
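The relationship between includes and tags can be sketched with a cut-down schedule type over hours of the day. This is a hypothetical model and not the library's code: the type, the use of a bare hour as the time, and in particular the choice to report no tags from underneath a Not branch are assumptions made for this illustration.

```ocaml
type 'tag schedule =
  | Always
  | Never
  | Hours of int list (* the given hours of a day *)
  | Tag of 'tag * 'tag schedule
  | And of 'tag schedule list
  | Or of 'tag schedule list
  | Not of 'tag schedule

let rec includes t hour =
  match t with
  | Always -> true
  | Never -> false
  | Hours hs -> List.mem hour hs
  | Tag (_, t') -> includes t' hour
  | And ts -> List.for_all (fun t -> includes t hour) ts
  | Or ts -> List.exists (fun t -> includes t hour) ts
  | Not t' -> not (includes t' hour)

(* Collect the tag of every Tag branch whose sub-schedule includes [hour]. *)
let rec matching_tags t hour =
  match t with
  | Always | Never | Hours _ -> []
  | Tag (tag, t') ->
    if includes t' hour then tag :: matching_tags t' hour else []
  | And ts | Or ts -> List.concat_map (fun t -> matching_tags t hour) ts
  | Not _ -> [] (* assumption of this sketch: tags under Not are not reported *)

let tags t hour =
  if includes t hour then `Included (matching_tags t hour) else `Not_included

(* Two tagged groups with overlapping hours, plus one untagged hour. *)
let on_call =
  Or
    [ Tag ("hkg", Hours [ 1; 2; 3 ])
    ; Tag ("ldn", Hours [ 3; 4; 5 ])
    ; Hours [ 6 ]
    ]
```

In this model, hour 3 yields both tags, hour 6 is included with an empty tag list, and hour 7 is not included at all.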

val all_tags : (zoned, 'tag) t -> tag_comparator:('tag, 'cmp) Core_kernel.Comparator.t -> ('tag, 'cmp) Core_kernel.Set.t
val fold_tags : (zoned, 'tag) t -> init:'m -> f:('m -> 'tag -> 'm) -> Core_kernel.Time.t -> 'm option

fold_tags t ~init ~f time is nearly behaviorally equivalent to (but more efficient than) List.fold ~init ~f (tags t time), with the exception that it returns None if includes t time is false. It is important that f be pure, as its results may be discarded.
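The "nearly equivalent" behavior can be written down as a reference function over an already computed tags result. This sketch only captures the observable result, not the efficiency gain; fold_tags_reference and the tags_result alias are hypothetical names introduced for illustration.

```ocaml
(* Mirrors the return type of tags shown above. *)
type 'tag tags_result = [ `Not_included | `Included of 'tag list ]

(* Reference behavior: fold over the tags when the time is included,
   and return None when it is not. *)
let fold_tags_reference (result : 'tag tags_result) ~init ~f =
  match result with
  | `Not_included -> None
  | `Included tags -> Some (List.fold_left f init tags)

(* Example fold: count the tags in effect. *)
let count_tags result = fold_tags_reference result ~init:0 ~f:(fun n _ -> n + 1)
```

Note that an included time with no tags gives Some init, which is distinct from the None returned for a time outside the schedule.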

val map_tags : ('a, 'b) t -> f:('b -> 'c) -> ('a, 'c) t

to_endless_sequence returns a sequence of schedule changes over time that will never end.

If your schedule ends, you will continue to receive `No_change_until_at_least with increasing times forever.

The return type indicates whether includes t start_time is true and delivers a sequence of subsequent changes over time.

The times returned by the sequence are strictly increasing and never less than start_time. That is, `No_change_until_at_least x can never be followed by `Enter x, only by (at least) `Enter (x + 1s).

If emit is set to Transitions_and_tag_changes then all changes in tags will be present in the resulting sequence. Otherwise only the tags in effect when a schedule is entered are available.

The `In_range | `Out_of_range flag in `No_change_until_at_least indicates whether the covered range is entirely within, or outside of the time covered by the schedule and is only there to help with bookkeeping for the caller. `In_range | `Out_of_range will never disagree with what could be inferred from the `Enter and `Leave events.

The sequence takes care to do only a small amount of work between each element, so that pulling the next element of the sequence is always cheap. This is the primary motivation behind including `No_change_until_at_least.

The Time.t returned by `No_change_until_at_least is guaranteed to be a reasonable amount of time in the future (at least 1 hour).
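The ordering guarantee can be expressed as a small checker over a list of events. This is a sketch with simplified payloads: times are plain integer seconds rather than Time.t, `Enter carries no tag list, and the event type and well_ordered are hypothetical names, not part of the library.

```ocaml
(* Simplified events; payloads are an assumption of this sketch. *)
type event =
  [ `Enter of int
  | `Leave of int
  | `No_change_until_at_least of [ `In_range | `Out_of_range ] * int
  ]

let time_of : event -> int = function
  | `Enter t | `Leave t -> t
  | `No_change_until_at_least (_, t) -> t

(* True when the first time is at least [start_time] and every later time
   is strictly greater than the one before it. *)
let well_ordered ~start_time (events : event list) =
  let rec go previous = function
    | [] -> true
    | e :: rest ->
      let t = time_of e in
      let ok =
        match previous with
        | None -> t >= start_time
        | Some p -> t > p
      in
      ok && go (Some t) rest
  in
  go None events
```

For example, a list in which `No_change_until_at_least (`Out_of_range, 10) is directly followed by `Enter 10 is rejected, while one followed by `Enter 11 is accepted.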

module Event = Core.Schedule_v5.Event
type ('tag, 'a) emit = ('tag, 'a) Core.Schedule_v5.emit =
  | Transitions : ('tag, [ Event.no_change | 'tag Event.transition ]) emit
  | Transitions_and_tag_changes : ('tag -> 'tag -> bool) -> ('tag, [ Event.no_change | 'tag Event.transition | 'tag Event.tag_change ]) emit

In Transitions_and_tag_changes, equality for the tag type must be given.

val to_endless_sequence : (zoned, 'tag) t -> start_time:Core_kernel.Time.t -> emit:('tag, 'a) emit -> [ `Started_in_range of 'tag list * 'a Core_kernel.Sequence.t | `Started_out_of_range of 'a Core_kernel.Sequence.t ]
val next_enter_between : (zoned, 'tag) t -> Core_kernel.Time.t -> Core_kernel.Time.t -> Core_kernel.Time.t option

next_enter_between t start end returns the time of the next enter event in t between start and end, if there is one. The range from start to end is inclusive on both ends. This function is useful for one-off events during the run of a program. If you want to track changes to a schedule over time it is better to call to_endless_sequence.

val next_leave_between : (zoned, 'tag) t -> Core_kernel.Time.t -> Core_kernel.Time.t -> Core_kernel.Time.t option

Like next_enter_between but for leave events.

type ('tag, 'output) pipe_emit =
  | Transitions : ('tag, 'tag Event.transition) pipe_emit
  | Transitions_and_tag_changes : ('tag -> 'tag -> bool) -> ('tag, [ 'tag Event.transition | 'tag Event.tag_change ]) pipe_emit

In Transitions_and_tag_changes, equality for the tag type must be given.

val to_pipe : (zoned, 'tag) t -> start_time:Core.Time.t -> emit:('tag, 'output) pipe_emit -> ?time_source:Async_kernel.Time_source.t -> unit -> [ `Started_in_range of 'tag list * 'output Async_kernel.Pipe.Reader.t | `Started_out_of_range of 'output Async_kernel.Pipe.Reader.t ]

to_pipe t ~start_time ~emit ?time_source () produces a pipe containing the events from to_endless_sequence ~start_time t ~emit, with `No_change_until_at_least filtered out and each event added only at or after its scheduled time.
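The filtering half of this description can be sketched over a standard library Seq.t. This model deliberately leaves out the timing half (delaying each event until its scheduled time) and the Async pipe itself; the event and transition types use integer seconds and are hypothetical simplifications of the library's Event types.

```ocaml
(* Simplified events; payloads are an assumption of this sketch. *)
type transition = [ `Enter of int | `Leave of int ]

type event =
  [ transition
  | `No_change_until_at_least of [ `In_range | `Out_of_range ] * int
  ]

(* Drop the bookkeeping events and keep only real transitions. *)
let transitions_only (events : event Seq.t) : transition Seq.t =
  Seq.filter_map
    (function
      | `No_change_until_at_least _ -> None
      | #transition as e -> Some e)
    events

let example : event list =
  [ `Enter 0
  ; `No_change_until_at_least (`In_range, 3600)
  ; `Leave 3601
  ; `No_change_until_at_least (`Out_of_range, 7200)
  ]

let filtered = List.of_seq (transitions_only (List.to_seq example))
```

Here filtered contains only the `Enter 0 and `Leave 3601 events, in their original order.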

val next_event : (zoned, 'tag) t -> event:[ `Enter | `Leave ] -> stop:unit Async_kernel.Deferred.t -> ?time_source:Async_kernel.Time_source.t -> ?after:Core.Time.t -> unit -> Core.Time.t Async_kernel.Deferred.t

next_event t ~event ~stop ~after () waits for the time of the next event matching event in t starting at time after. At that time, the resulting deferred is determined and filled with the time of the event.

If stop becomes determined before the next event, the resulting deferred is never filled and the computation to find the next event stops. If the caller intends to never use the returned deferred, stop should be filled or the background computation will continue to keep the deferred alive until the event occurs.

This function is a good choice for handling a single event during the run of a program, like scheduling shutdown. If the intention is to follow along with all events in a schedule, it is preferable to call to_pipe or to_endless_sequence (in the non-async module).

type 'a every_enter_callback = enter:Core.Time.t -> leave:Core.Time.t Async_kernel.Deferred.t -> 'a
val every_enter_without_pushback : (zoned, _) t -> ?time_source:Async_kernel.Time_source.t -> ?start:Core.Time.t -> ?stop:unit Async_kernel.Deferred.t -> ?continue_on_error:bool -> ?start_in_range_is_enter:bool -> unit every_enter_callback -> unit

every_enter_without_pushback t ~start ~stop ~continue_on_error ~start_in_range_is_enter f calls f for each contiguous block of time in t starting at start and continuing until stop becomes determined.

For each block of time with start time enter and end time leave_time, f is called with f ~enter ~leave, where leave is a deferred that becomes determined at leave_time with the value leave_time.

If includes t start && start_in_range_is_enter, then f will be called as soon as possible after start with enter = start. Otherwise, f will not be called for any block of time that includes start.
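The effect of start_in_range_is_enter can be sketched by listing the enter values the callback would receive for a given set of blocks. This is a model of the rule as stated above, not the library's logic: blocks are hypothetical (enter, leave) pairs in integer seconds, assumed half-open so that a block includes start when enter <= start < leave.

```ocaml
(* For each block, the [enter] value passed to the callback, if any. *)
let callback_enters ~start ~start_in_range_is_enter blocks =
  List.filter_map
    (fun (enter, leave) ->
      if leave <= start then None (* block is already over at [start] *)
      else if enter > start then Some enter (* block begins after [start] *)
      else if start_in_range_is_enter then Some start (* [start] is inside *)
      else None (* block includes [start] but the flag is off *))
    blocks

let blocks = [ (0, 10); (20, 30) ]

let with_flag = callback_enters ~start:5 ~start_in_range_is_enter:true blocks
let without_flag = callback_enters ~start:5 ~start_in_range_is_enter:false blocks
```

With start inside the first block, with_flag is [5; 20] and without_flag is [20]; when start falls between blocks the flag makes no difference.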

If continue_on_error = false and f (or any async job started by f) raises an error, f will no longer be called, and all undetermined leave deferreds will remain unfulfilled.

If stop is fulfilled then no further calls to f will be made and all undetermined leave deferreds will remain unfulfilled.

val every_enter : (zoned, _) t -> ?time_source:Async_kernel.Time_source.t -> ?start:Core.Time.t -> ?stop:unit Async_kernel.Deferred.t -> ?continue_on_error:bool -> ?start_in_range_is_enter:bool -> ?on_pushback:unit every_enter_callback -> unit Async_kernel.Deferred.t every_enter_callback -> unit

Like every_enter_without_pushback, except allows at most one call of f to be in flight at a time. If the schedule would cause f to be invoked again before the prior call has finished, then it invokes on_pushback instead (if provided).
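The pushback rule can be illustrated with a pure simulation. This sketch does not use Async: each block is a hypothetical (enter, finish) pair giving when the schedule is entered and when the call to f started at that time would finish, and an enter that coincides exactly with the previous finish is assumed not to push back.

```ocaml
(* Returns (enters where f ran, enters where on_pushback would run). *)
let simulate blocks =
  let rec go busy_until ran pushed = function
    | [] -> (List.rev ran, List.rev pushed)
    | (enter, finish) :: rest ->
      if enter < busy_until
      then go busy_until ran (enter :: pushed) rest (* prior call in flight *)
      else go finish (enter :: ran) pushed rest (* f is free to run *)
  in
  go min_int [] [] blocks

(* The call started at 0 runs until 5, so the enter at 3 is pushed back. *)
let ran, pushed = simulate [ (0, 5); (3, 4); (10, 12) ]
```

In this run, ran is [0; 10] and pushed is [3].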

val every_tag_change_without_pushback : (zoned, 'tag) t -> ?time_source:Async_kernel.Time_source.t -> ?start:Core.Time.t -> ?stop:unit Async_kernel.Deferred.t -> ?continue_on_error:bool -> ?start_in_range_is_enter:bool -> tag_equal:('tag -> 'tag -> bool) -> (tags:'tag list -> unit every_enter_callback) -> unit

every_tag_change_without_pushback t ~tag_equal f calls f for each contiguous block of time in t where the set of tags in effect remains stable (according to tag_equal).

Moving from a range where the schedule is not in effect to one where it is in effect with no tags is considered a tag change.

For each block of time tagged with tags and start time enter and end time leave_time, f is called with f ~tags ~enter ~leave, where leave is a deferred that becomes determined at leave_time with the value leave_time.

stop, continue_on_error, and start_in_range_is_enter act as documented in every_enter_without_pushback.

val every_tag_change : (zoned, 'tag) t -> ?time_source:Async_kernel.Time_source.t -> ?start:Core.Time.t -> ?stop:unit Async_kernel.Deferred.t -> ?continue_on_error:bool -> ?start_in_range_is_enter:bool -> ?on_pushback:(tags:'tag list -> unit every_enter_callback) -> tag_equal:('tag -> 'tag -> bool) -> (tags:'tag list -> unit Async_kernel.Deferred.t every_enter_callback) -> unit

Like every_tag_change_without_pushback, but pushes back in the same manner as every_enter.
