package moonpool-lwt

  1. Overview
  2. Docs
Event loop for moonpool based on Lwt-engine (experimental)

Install

Dune Dependency

Authors

Maintainers

Sources

moonpool-0.6.tbz
sha256=3efd095c82a37bba8c7ab6a2532aee3c445ebe1ecaed84ef3ffb560bc65e7633
sha512=e4bcab82e6638299c2d0beb1dbf204f7b43379a5387418c6edff85b9bf90c13ad1bdd8eb44b69cd421268d1bc45bcf918bcf77e1c924348211ac27d6643aac78

doc/src/moonpool-lwt/base.ml.html

Source file base.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
open Common_
module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls

(** Requests handed over to the Lwt loop by code running outside of it *)
module Action = struct
  type event = Lwt_engine.event
  type cb = event -> unit

  (** A single piece of work the Lwt loop is asked to carry out on behalf of
      an outside caller. See {!perform} for what each case does. *)
  type t =
    | Wait_readable of Unix.file_descr * cb
    | Wait_writable of Unix.file_descr * cb
    | Sleep of float * bool * cb
    (* TODO: provide actions with cancellation, alongside a "select" operation *)
    (* | Cancel of event *)
    | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
    | Wakeup : 'a Lwt.u * 'a -> t
    | Wakeup_exn : _ Lwt.u * exn -> t
    | Other of (unit -> unit)

  (** [perform a] executes [a]. Intended to be called from within the Lwt
      thread.

      The [Lwt_engine] event handles created for [Wait_readable],
      [Wait_writable] and [Sleep] are discarded, so these registrations
      cannot be stopped through this module (see the TODO on {!t}). *)
  let perform : t -> unit = function
    | Other thunk -> thunk ()
    | Wakeup (resolver, v) -> Lwt.wakeup resolver v
    | Wakeup_exn (resolver, exn) -> Lwt.wakeup_exn resolver exn
    | On_termination (promise, k) ->
      (* report the outcome of [promise] to [k] as a result; on rejection a
         callstack of depth 10 is captured at the point of notification *)
      let on_ok v = k (Ok v) in
      let on_error exn = k (Error (Exn_bt.get_callstack 10 exn)) in
      Lwt.on_any promise on_ok on_error
    | Wait_readable (fd, cb) ->
      let ev : event = Lwt_engine.on_readable fd cb in
      ignore ev
    | Wait_writable (fd, cb) ->
      let ev : event = Lwt_engine.on_writable fd cb in
      ignore ev
    | Sleep (delay, repeat, cb) ->
      (* | Cancel ev -> Lwt_engine.stop_event ev *)
      let ev : event = Lwt_engine.on_timer delay repeat cb in
      ignore ev
end

(** Lock-free multi-producer queue of actions, backed by an atomic list.

    Producers prepend with a compare-and-set loop; the consumer takes the
    whole batch at once with an atomic exchange. *)
module Action_queue = struct
  type t = { q: Action.t list Atomic.t } [@@unboxed]

  (** A fresh, empty queue. *)
  let create () : t = { q = Atomic.make [] }

  (** Atomically remove every queued action and return them in the order in
      which they were pushed (oldest first).

      The underlying list is stored newest-first because {!push} conses onto
      its head, so the batch is reversed here; otherwise the consumer would
      run actions in the reverse of their scheduling order. *)
  let pop_all (self : t) : _ list = List.rev (Atomic.exchange self.q [])

  (** Push the action and return whether the queue was previously empty *)
  let rec push (self : t) (a : Action.t) : bool =
    let old = Atomic.get self.q in
    if Atomic.compare_and_set self.q old (a :: old) then (
      (* we won the race: report whether we were the first element *)
      match old with
      | [] -> true
      | _ :: _ -> false
    ) else
      (* another producer or the consumer changed the list; retry *)
      push self a
end

(** Bridge used to hand {!Action.t} values over to the Lwt thread.

    Only {!schedule} is exposed; the queue, the drain function and the Lwt
    notification id live in an anonymous [open struct]. *)
module Perform_action_in_lwt = struct
  open struct
    (* single, module-wide queue of actions awaiting execution *)
    let pending_ : Action_queue.t = Action_queue.create ()

    (** Drain the queue and run every action that was waiting, in the order
        returned by [Action_queue.pop_all]. This is the callback attached to
        the Lwt notification below. *)
    let drain_pending_ () : unit =
      let@ _sp =
        Moonpool.Private.Tracing_.with_span
          "moonpool-lwt.perform-pending-actions"
      in
      Action_queue.pop_all pending_ |> List.iter Action.perform

    (* registered with [~once:false] so the same id can be signalled
       repeatedly *)
    let notif_id_ : int =
      Lwt_unix.make_notification ~once:false drain_pending_
  end

  (** [schedule a] enqueues [a]. The Lwt notification is sent only when the
      queue was empty before this push; when it was not, an earlier push
      already sent one and the pending drain will pick [a] up. *)
  let schedule (a : Action.t) : unit =
    let was_empty = Action_queue.push pending_ a in
    if was_empty then Lwt_unix.send_notification notif_id_
end

(** Runner reported by [M.Runner.get_current_runner].
    @raise Failure if there is no current runner. *)
let get_runner () : M.Runner.t =
  let current = M.Runner.get_current_runner () in
  match current with
  | None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
  | Some runner -> runner

(** [lwt_of_fut fut] is an Lwt promise that is resolved with the outcome of
    [fut]. The resolution itself is scheduled as an action for the Lwt
    thread; on failure only the exception is forwarded (the backtrace stored
    in the future is dropped). *)
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
  let lwt_fut, lwt_prom = Lwt.wait () in
  let forward_outcome res =
    let action =
      match res with
      | Ok x -> Action.Wakeup (lwt_prom, x)
      | Error (exn, _) -> Action.Wakeup_exn (lwt_prom, exn)
    in
    Perform_action_in_lwt.schedule action
  in
  M.Fut.on_result fut forward_outcome;
  lwt_fut

(** [fut_of_lwt lwt_fut] is a moonpool future mirroring [lwt_fut].

    - If [lwt_fut] is already fulfilled, an already-resolved future is
      returned.
    - Otherwise a fresh future is created and completed from an [Lwt.on_any]
      callback. On rejection, the error carries the exception together with
      a callstack (depth 10) captured when the callback runs.

    This function never raises because of the state of [lwt_fut]: an
    already-rejected promise yields a failed future. The state is inspected
    with [Lwt.state] rather than [Lwt.poll], because [Lwt.poll] re-raises the
    exception of a rejected promise. *)
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
  match Lwt.state lwt_fut with
  | Lwt.Return x -> M.Fut.return x
  | Lwt.Fail _ | Lwt.Sleep ->
    (* for [Fail], [Lwt.on_any] invokes the rejection callback right away,
       so the returned future is failed with the promise's exception *)
    let fut, prom = M.Fut.make () in
    Lwt.on_any lwt_fut
      (fun x -> M.Fut.fulfill prom (Ok x))
      (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
    fut

(** [await_lwt fut] returns the value of [fut], suspending the caller through
    [M.Private.Suspend_.suspend] while [fut] is still pending.

    The result is read with [Lwt.poll], both on the fast path and after the
    caller has been resumed. *)
let await_lwt (fut : _ Lwt.t) =
  match Lwt.poll fut with
  | Some x -> x
  | None ->
    (* not resolved yet: park the caller and ask the Lwt thread to resume it
       once [fut] terminates, whatever the outcome *)
    M.Private.Suspend_.suspend
      {
        handle =
          (fun ~run:_ ~resume sus ->
            let wake_up _outcome = resume sus (Ok ()) in
            Perform_action_in_lwt.schedule
              (Action.On_termination (fut, wake_up)));
      };

    (* we are only resumed from the termination callback, so [fut] can no
       longer be pending at this point *)
    (match Lwt.poll fut with
    | None -> assert false
    | Some x -> x)

(** [run_in_lwt f] schedules [f ()] to be called as an action on the Lwt
    thread and returns a moonpool future for its outcome.

    The future is fulfilled with the value of the Lwt promise returned by
    [f], or failed with its rejection exception. If [f ()] itself raises
    before returning a promise, the future is failed with that exception as
    well. Without this, the exception would escape the action and the future
    would never be resolved. *)
let run_in_lwt f : _ M.Fut.t =
  let fut, prom = M.Fut.make () in
  Perform_action_in_lwt.schedule
    (Action.Other
       (fun () ->
         match f () with
         | lwt_fut ->
           Lwt.on_any lwt_fut
             (fun x -> M.Fut.fulfill prom @@ Ok x)
             (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))
         | exception exn ->
           (* synchronous failure of [f]: report it through the future
              instead of letting it propagate into the action runner *)
           M.Fut.fulfill prom @@ Error (Exn_bt.get exn)));
  fut

(** [run_in_lwt_and_await f] is [run_in_lwt f] followed by [M.Fut.await] on
    the resulting future. *)
let run_in_lwt_and_await f =
  let fut = run_in_lwt f in
  M.Fut.await fut

(** [detach_in_runner ~runner f] submits [f] to [runner] with
    [M.Runner.run_async] and returns an Lwt promise for its outcome. The
    promise is resolved, through an action scheduled for the Lwt thread,
    with the value returned by [f ()] or with the exception it raised. *)
let detach_in_runner ~runner f : _ Lwt.t =
  let lwt_fut, lwt_prom = Lwt.wait () in
  let task () =
    let action =
      match f () with
      | x -> Action.Wakeup (lwt_prom, x)
      | exception exn -> Action.Wakeup_exn (lwt_prom, exn)
    in
    Perform_action_in_lwt.schedule action
  in
  M.Runner.run_async runner task;
  lwt_fut

(** [main_with_runner ~runner f] spawns [f] as a top-level fiber on [runner]
    and runs [Lwt_main.run] in the calling thread until [f] is done. It
    returns the result of [f ()], or raises the exception that ended it. *)
let main_with_runner ~runner (f : unit -> 'a) : 'a =
  let lwt_fut, lwt_prom = Lwt.wait () in

  (* body of the fiber: run [f] and hand its outcome to the Lwt thread *)
  let fiber_body () =
    try
      let res = f () in
      Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, res)
    with exn ->
      Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn)
  in
  let _fiber = Fiber.spawn_top ~on:runner fiber_body in

  Lwt_main.run lwt_fut

(** [main f] is {!main_with_runner} applied to a runner obtained from
    [M.Ws_pool.with_ ()], which is scoped to this call. *)
let main f =
  let@ pool = M.Ws_pool.with_ () in
  main_with_runner ~runner:pool f
OCaml

Innovation. Community. Security.