package moonpool

  1. Overview
  2. Docs

Source file moonpool_dpool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
module Bb_queue = struct
  type 'a t = {
    mutex: Mutex.t;
    cond: Condition.t;
    q: 'a Queue.t;
  }

  let create () : _ t =
    { mutex = Mutex.create (); cond = Condition.create (); q = Queue.create () }

  let push (self : _ t) x : unit =
    Mutex.lock self.mutex;
    let was_empty = Queue.is_empty self.q in
    Queue.push x self.q;
    if was_empty then Condition.broadcast self.cond;
    Mutex.unlock self.mutex

  let pop (self : 'a t) : 'a =
    Mutex.lock self.mutex;
    let rec loop () =
      if Queue.is_empty self.q then (
        Condition.wait self.cond self.mutex;
        (loop [@tailcall]) ()
      ) else (
        let x = Queue.pop self.q in
        Mutex.unlock self.mutex;
        x
      )
    in
    loop ()
end

module Lock = struct
  type 'a t = {
    mutex: Mutex.t;
    mutable content: 'a;
  }

  let create content : _ t = { mutex = Mutex.create (); content }

  let with_ (self : _ t) f =
    Mutex.lock self.mutex;
    try
      let x = f self.content in
      Mutex.unlock self.mutex;
      x
    with e ->
      Mutex.unlock self.mutex;
      raise e

  let[@inline] update_map l f =
    with_ l (fun x ->
        let x', y = f x in
        l.content <- x';
        y)

  let get l =
    Mutex.lock l.mutex;
    let x = l.content in
    Mutex.unlock l.mutex;
    x
end

type domain = Domain_.t

type event =
  | Run of (unit -> unit)  (** Run this function *)
  | Decr  (** Decrease count *)

(* State for a domain worker. It should not do too much except for starting
    new threads for pools. *)
type worker_state = {
  q: event Bb_queue.t;
  th_count: int Atomic_.t;  (** Number of threads on this *)
}

(** Array of (optional) workers.

    Workers are started/stop on demand. For each index we have
    the (currently active) domain's state
    including a work queue and a thread refcount; and the domain itself,
    if any, in a separate option because it might outlive its own state. *)
let domains_ : (worker_state option * Domain_.t option) Lock.t array =
  let n = max 1 (Domain_.recommended_number ()) in
  Array.init n (fun _ -> Lock.create (None, None))

(** main work loop for a domain worker.

   A domain worker does two things:
   - run functions it's asked to (mainly, to start new threads inside it)
   - decrease the refcount when one of these threads stops. The thread
     will notify the domain that it's exiting, so the domain can know
     how many threads are still using it. If all threads exit, the domain
     polls a bit (in case new threads are created really shortly after,
     which happens with a [Pool.with_] or [Pool.create() … Pool.shutdown()]
     in a tight loop), and if nothing happens it tries to stop to free resources.
*)
let work_ idx (st : worker_state) : unit =
  let main_loop () =
    let continue = ref true in
    while !continue do
      match Bb_queue.pop st.q with
      | Run f -> (try f () with _ -> ())
      | Decr ->
        if Atomic_.fetch_and_add st.th_count (-1) = 1 then (
          continue := false;

          (* wait a bit, we might be needed again in a short amount of time *)
          try
            for _n_attempt = 1 to 50 do
              Thread.delay 0.001;
              if Atomic_.get st.th_count > 0 then (
                (* needed again! *)
                continue := true;
                raise Exit
              )
            done
          with Exit -> ()
        )
    done
  in

  while
    main_loop ();

    (* exit: try to remove ourselves from [domains]. If that fails, keep living. *)
    let is_alive =
      Lock.update_map domains_.(idx) (function
        | None, _ -> assert false
        | Some _st', dom ->
          assert (st == _st');

          if Atomic_.get st.th_count > 0 then
            (* still alive! *)
            (Some st, dom), true
          else
            (None, dom), false)
    in

    is_alive
  do
    ()
  done;
  ()

(* special case for main domain: we start a worker immediately *)
let () =
  assert (Domain_.is_main_domain ());
  let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
  (* thread that stays alive *)
  ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t);
  domains_.(0) <- Lock.create (Some w, None)

let[@inline] max_number_of_domains () : int = Array.length domains_

let run_on (i : int) (f : unit -> unit) : unit =
  assert (i < Array.length domains_);
  let w =
    Lock.update_map domains_.(i) (function
      | (Some w, _) as st ->
        Atomic_.incr w.th_count;
        st, w
      | None, dying_dom ->
        (* join previous dying domain, to free its resources, if any *)
        Option.iter Domain_.join dying_dom;
        let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
        let worker : domain = Domain_.spawn (fun () -> work_ i w) in
        (Some w, Some worker), w)
  in
  Bb_queue.push w.q (Run f)

let decr_on (i : int) : unit =
  assert (i < Array.length domains_);
  match Lock.get domains_.(i) with
  | Some st, _ -> Bb_queue.push st.q Decr
  | None, _ -> ()

let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
  let q = Bb_queue.create () in
  run_on i (fun () ->
      let x = f () in
      Bb_queue.push q x);
  Bb_queue.pop q
OCaml

Innovation. Community. Security.