package async_kernel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file time_source_intf.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
(** A time source holds a time (possibly wall-clock time, possibly simulated time) and
    gives the ability to schedule Async jobs to run when that time advances.

    There is a single wall-clock time source (returned by [wall_clock ()]) that the Async
    scheduler drives and uses for the [Clock_ns] module.  One can also create a
    user-controlled time source via [create], and advance its clock as desired.  This is
    useful so that state machines can depend on a notion of time that is distinct from
    wall-clock time. *)

open! Core
open! Import
module Deferred = Deferred1

module type Time_source = sig
  (** A time source has a phantom read-write parameter, where [write] gives permission to
      call [advance] and [fire_past_alarms]. *)
  module T1 : sig
    type -'rw t [@@deriving sexp_of]
  end

  module Read_write : sig
    type t = read_write T1.t [@@deriving sexp_of]

    include Invariant.S with type t := t

    val invariant_with_jobs : job:Job.t Invariant.t -> t Invariant.t
  end

  module Id : Unique_id.Id

  type t = read T1.t [@@deriving sexp_of]

  (** [id t] returns a unique, consistent identifier which can be used e.g. as a map or
      hash table key. *)
  val id : _ T1.t -> Id.t

  include Invariant.S with type t := t

  val invariant_with_jobs : job:Job.t Invariant.t -> t Invariant.t
  val read_only : [> read ] T1.t -> t

  val create
    :  ?timing_wheel_config:Timing_wheel.Config.t
    -> now:Time_ns.t
    -> unit
    -> read_write T1.t

  (** A time source with [now t] given by wall-clock time (i.e., [Time_ns.now]) and that
      is advanced automatically as time passes (specifically, at the start of each Async
      cycle).  There is only one wall-clock time source; every call to [wall_clock ()]
      returns the same value.  The behavior of [now] is special for [wall_clock ()]; it
      always calls [Time_ns.now ()], so it can return times that the time source has not
      yet been advanced to. *)
  val wall_clock : unit -> t

  (** Accessors.  [now (wall_clock ())] behaves specially; see [wall_clock] above. *)

  val alarm_precision : [> read ] T1.t -> Time_ns.Span.t
  val is_wall_clock : [> read ] T1.t -> bool
  val next_alarm_fires_at : [> read ] T1.t -> Time_ns.t option
  val now : [> read ] T1.t -> Time_ns.t

  (** Removes the special behavior of [now] for [wall_clock]; it always returns the
      timing_wheel's notion of now. *)
  val timing_wheel_now : [> read ] T1.t -> Time_ns.t


  (** Instead of [advance_directly], you probably should use [advance_by_alarms].
      [advance_directly t ~to_] advances the clock directly to [to_], whereas
      [advance_by_alarms] advances the clock in steps, to each intervening alarm.
      [advance_directly] approximately determines the set of events to fire, up to
      timing-wheel alarm precision, whereas [advance_by_alarms] fires all alarms whose
      time is [<= to_].  With [advance_directly], you must call [fire_past_alarms] if you
      want that behavior (see docs for [Timing_wheel.advance_clock] vs.
      [Timing_wheel.fire_past_alarms]). *)
  val advance_directly : [> write ] T1.t -> to_:Time_ns.t -> unit

  val advance : [> write ] T1.t -> to_:Time_ns.t -> unit
  [@@deprecated
    "[since 2019-06] Use [advance_directly] (to preserve behavior) or \
     [advance_by_alarms]"]

  val advance_directly_by : [> write ] T1.t -> Time_ns.Span.t -> unit

  val advance_by : [> write ] T1.t -> Time_ns.Span.t -> unit
  [@@deprecated
    "[since 2019-06] Use [advance_directly_by] (to preserve behavior) or \
     [advance_by_alarms_by]"]

  val fire_past_alarms : [> write ] T1.t -> unit

  (** [advance_by_alarms t] repeatedly calls [advance t] to drive the time forward in
      steps, where each step is the minimum of [to_] and the next alarm time. After each
      step, [advance_by_alarms] waits for the result of [wait_for] to become determined
      before advancing. By default, [wait_for] will be [Scheduler.yield ()] to allow the
      triggered timers to execute and potentially rearm for subsequent steps. The returned
      deferred is filled when [to_] is reached.

      [advance_by_alarms] is useful in simulation when one wants to efficiently advance to
      a time in the future while giving periodic timers (e.g., resulting from [every]) a
      chance to fire with approximately the same timing as they would live. *)
  val advance_by_alarms
    :  ?wait_for:(unit -> unit Deferred.t)
    -> [> write ] T1.t
    -> to_:Time_ns.t
    -> unit Deferred.t

  val advance_by_max_alarms_in_each_timing_wheel_interval
    :  ?wait_for:(unit -> unit Deferred.t)
    -> [> write ] T1.t
    -> to_:Time_ns.t
    -> unit Deferred.t
  [@@deprecated
    "[since 2021-12] This is the old implementation of [advance_by_alarms], kept in \
     case the new implementation causes problems."]

  (** [advance_by_alarms_by ?wait_for t by] is equivalent to:
      [advance_by_alarms ?wait_for t ~to_:(Time_ns.add (now t) by)] *)
  val advance_by_alarms_by
    :  ?wait_for:(unit -> unit Deferred.t)
    -> [> write ] T1.t
    -> Time_ns.Span.t
    -> unit Deferred.t

  module Continue : sig
    type t

    val immediately : t
  end

  (** See {{!Async_kernel.Clock_intf.Clock.every'}[Clock.every]} for documentation. *)
  val run_repeatedly
    :  ?start:unit Deferred.t (** default is [return ()] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> ?finished:unit Ivar.t
    -> [> read ] T1.t
    -> f:(unit -> unit Deferred.t)
    -> continue:Continue.t
    -> unit

  (** The functions below here are the same as in clock_intf.ml, except they take an
      explicit [t] argument.  See {{!Async_kernel.Clock_intf}[Clock_intf]} for
      documentation. *)

  val run_at : [> read ] T1.t -> Time_ns.t -> ('a -> unit) -> 'a -> unit
  val run_after : [> read ] T1.t -> Time_ns.Span.t -> ('a -> unit) -> 'a -> unit


  val at : [> read ] T1.t -> Time_ns.t -> unit Deferred.t
  val after : [> read ] T1.t -> Time_ns.Span.t -> unit Deferred.t

  val with_timeout
    :  [> read ] T1.t
    -> Time_ns.Span.t
    -> 'a Deferred.t
    -> [ `Timeout | `Result of 'a ] Deferred.t

  val duration_of
    :  [> read ] T1.t
    -> (unit -> 'a Deferred.t)
    -> ('a * Time_ns.Span.t) Deferred.t

  module Event : sig
    type ('a, 'h) t [@@deriving sexp_of]
    type t_unit = (unit, unit) t [@@deriving sexp_of]

    include Invariant.S2 with type ('a, 'b) t := ('a, 'b) t

    val scheduled_at : (_, _) t -> Time_ns.t

    module Status : sig
      type ('a, 'h) t =
        | Aborted of 'a
        | Happened of 'h
        | Scheduled_at of Time_ns.t
      [@@deriving sexp_of]
    end

    val status : ('a, 'h) t -> ('a, 'h) Status.t
    val run_at : [> read ] T1.t -> Time_ns.t -> ('z -> 'h) -> 'z -> (_, 'h) t
    val run_after : [> read ] T1.t -> Time_ns.Span.t -> ('z -> 'h) -> 'z -> (_, 'h) t

    module Abort_result : sig
      type ('a, 'h) t =
        | Ok
        | Previously_aborted of 'a
        | Previously_happened of 'h
      [@@deriving sexp_of]
    end

    val abort : ('a, 'h) t -> 'a -> ('a, 'h) Abort_result.t
    val abort_exn : ('a, 'h) t -> 'a -> unit
    val abort_if_possible : ('a, _) t -> 'a -> unit

    module Fired : sig
      type ('a, 'h) t =
        | Aborted of 'a
        | Happened of 'h
      [@@deriving sexp_of]
    end

    val fired : ('a, 'h) t -> ('a, 'h) Fired.t Deferred.t

    module Reschedule_result : sig
      type ('a, 'h) t =
        | Ok
        | Previously_aborted of 'a
        | Previously_happened of 'h
      [@@deriving sexp_of]
    end

    val reschedule_at : ('a, 'h) t -> Time_ns.t -> ('a, 'h) Reschedule_result.t
    val reschedule_after : ('a, 'h) t -> Time_ns.Span.t -> ('a, 'h) Reschedule_result.t
    val at : [> read ] T1.t -> Time_ns.t -> (_, unit) t
    val after : [> read ] T1.t -> Time_ns.Span.t -> (_, unit) t
  end

  val at_varying_intervals
    :  ?stop:unit Deferred.t
    -> [> read ] T1.t
    -> (unit -> Time_ns.Span.t)
    -> unit Async_stream.t

  val at_intervals
    :  ?start:Time_ns.t
    -> ?stop:unit Deferred.t
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> unit Async_stream.t

  (** See {!Clock.every'} for documentation. *)
  val every'
    :  ?start:unit Deferred.t (** default is [return ()] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> ?finished:unit Ivar.t
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> (unit -> unit Deferred.t)
    -> unit

  val every
    :  ?start:unit Deferred.t (** default is [return ()] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> (unit -> unit)
    -> unit

  val run_at_intervals'
    :  ?start:Time_ns.t (** default is [now t] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> (unit -> unit Deferred.t)
    -> unit

  val run_at_intervals
    :  ?start:Time_ns.t (** default is [now t] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> (unit -> unit)
    -> unit

  (** [Time_source] and [Synchronous_time_source] are the same data structure and use the
      same underlying timing wheel.  The types are freely interchangeable. *)
  val of_synchronous : 'a Synchronous_time_source0.T1.t -> 'a T1.t

  val to_synchronous : 'a T1.t -> 'a Synchronous_time_source0.T1.t
end
OCaml

Innovation. Community. Security.