package core

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file nano_mutex.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
open! Core
open! Import

let ok_exn = Or_error.ok_exn

(* A [Blocker.t] is an ordinary mutex and conditional variable used to implement blocking
   when there is lock contention. *)
module Blocker : sig
  type t [@@deriving sexp_of]

  val create : unit -> t
  val critical_section : t -> f:(unit -> 'a) -> 'a
  val wait : t -> unit
  val signal : t -> unit
  val save_unused : t -> unit
end = struct
  (* Our use of mutexes is always via [Mutex.critical_section], so that we always lock
     them and unlock them from a single thread.  So, we use [Core.Mutex], which is
     error-checking mutexes, which will catch any use that is not what we expect. *)
  module Condition = Condition
  module Mutex = Error_checking_mutex

  type t =
    { mutex : (Mutex.t[@sexp.opaque])
    ; condition : (Condition.t[@sexp.opaque])
    }
  [@@deriving sexp_of]

  (* We keep a cache of unused blockers, since they are relatively costly to create, and
     we should never need very many simultaneously.  We should never need more blockers
     than the number of nano mutexes being simultaneously blocked on, which of course is
     no more than the total number of simultaneous threads. *)
  let unused : t Thread_safe_queue.t = Thread_safe_queue.create ()

  (* [save_unused t] should be called when [t] is no longer in use, so it can be returned
     by a future call of [create]. *)
  let save_unused t = Thread_safe_queue.enqueue unused t

  let create () =
    if Thread_safe_queue.length unused > 0
    then Thread_safe_queue.dequeue_exn unused
    else { mutex = Mutex.create (); condition = Condition.create () }
  ;;

  let critical_section t ~f = Mutex.critical_section t.mutex ~f
  let wait t = Condition.wait t.condition t.mutex
  let signal t = Condition.signal t.condition
end

(* We represent a nano mutex using an OCaml record.  The [id_of_thread_holding_lock] field
   represents whether the mutex is locked or not, and if it is locked, which thread holds
   the lock.  We do not use an [int option] for performance reasons (doing so slows down
   lock+unlock by a factor of almost two).  Instead, we have [id_of_thread_holding_lock =
   bogus_thread_id] when the mutex is unlocked.

   The mutex record has an optional [blocker] field for use when the mutex is contended.
   We use the OS-level condition variable in [blocker] to [wait] in a thread that desires
   the lock and to [signal] from a thread that is releasing it.

   When thinking about the implementation, it is helpful to remember the following
   desiderata:

 * Safety -- only one thread can acquire the lock at a time.  This is accomplished
   usng a test-and-set to set [id_of_thread_holding_lock].

 * Liveness -- if the mutex is unlocked and some threads are waiting on it, then one of
   those threads will be woken up and given a chance to acquire it.  This is accomplished
   by only waiting when we can ensure that there will be a [signal] of the condition
   variable in the future.  See the more detailed comment in [lock].

 * Performance -- do not spin trying to acquire the lock.  This is accomplished by
   waiting on a condition variable if a lock is contended. *)
type t =
  { mutable id_of_thread_holding_lock : int
  ; mutable num_using_blocker : int
  ; mutable blocker : Blocker.t option
  }
[@@deriving fields, sexp_of]

let invariant t =
  try
    assert (t.num_using_blocker >= 0);
    (* It is the case that if [t.num_using_blocker = 0] then [Option.is_none t.blocker],
       however the converse does not necessarily hold.  The code in [with_blocker] doesn't
       take care to atomically increment [t.num_using_blocker] and set [t.blocker] to
       [Some].  It could, but doing so is not necessary for the correctness of of
       [with_blocker], which only relies on test-and-set of [t.blocker] to make sure
       there is an agreed-upon winner in the race to create a blocker. *)
    if t.num_using_blocker = 0 then assert (Option.is_none t.blocker)
  with
  | exn -> failwiths "invariant failed" (exn, t) [%sexp_of: exn * t]
;;

let equal (t : t) t' = phys_equal t t'
let bogus_thread_id = -1

let create () =
  { id_of_thread_holding_lock = bogus_thread_id; num_using_blocker = 0; blocker = None }
;;

let is_locked t = t.id_of_thread_holding_lock <> bogus_thread_id
let current_thread_id () = Thread.id (Thread.self ())
let current_thread_has_lock t = t.id_of_thread_holding_lock = current_thread_id ()

let recursive_lock_error t =
  Error.create
    "attempt to lock mutex by thread already holding it"
    (current_thread_id (), t)
    [%sexp_of: int * t]
;;

let try_lock t =
  (* The following code relies on an atomic test-and-set of [id_of_thread_holding_lock],
     so that there is a definitive winner in a race between multiple lockers and everybody
     agrees who acquired the lock. *)
  let current_thread_id = current_thread_id () in
  (* BEGIN ATOMIC *)
  if t.id_of_thread_holding_lock = bogus_thread_id
  then (
    t.id_of_thread_holding_lock <- current_thread_id;
    (* END ATOMIC *)
    Ok `Acquired)
  else if current_thread_id = t.id_of_thread_holding_lock
  then Error (recursive_lock_error t)
  else Ok `Not_acquired
;;

let try_lock_exn t = ok_exn (try_lock t)

(* [with_blocker t f] runs [f blocker] in a critical section.  It allocates a blocker for
   [t] if [t] doesn't already have one. *)
let with_blocker t f =
  t.num_using_blocker <- t.num_using_blocker + 1;
  let blocker =
    match t.blocker with
    | Some blocker -> blocker
    | None ->
      let new_blocker = Blocker.create () in
      (* We allocate [new_blocker_opt] here because one cannot allocate inside an atomic
         region. *)
      let new_blocker_opt = Some new_blocker in
      let blocker =
        (* We need the following test-and-set to be atomic so that there is a definitive
           winner in a race between multiple calls to [with_blocker], so that everybody
           agrees what the underlying [blocker] is. *)
        (* BEGIN ATOMIC *)
        match t.blocker with
        | Some blocker -> blocker
        | None ->
          t.blocker <- new_blocker_opt;
          new_blocker
          (* END ATOMIC *)
      in
      if not (phys_equal blocker new_blocker) then Blocker.save_unused new_blocker;
      blocker
  in
  protect
    ~f:(fun () -> Blocker.critical_section blocker ~f:(fun () -> f blocker))
    ~finally:(fun () ->
      (* We need the following decrement-test-and-set to be atomic so that we're sure that
         the last user of blocker clears it. *)
      (* BEGIN ATOMIC *)
      t.num_using_blocker <- t.num_using_blocker - 1;
      if t.num_using_blocker = 0
      then (
        t.blocker <- None;
        (* END ATOMIC *)
        Blocker.save_unused blocker))
;;

let rec lock t =
  (* The following code relies on an atomic test-and-set of [id_of_thread_holding_lock],
     so that there is a definitive winner in a race between multiple [lock]ers, and
     everybody agrees who acquired the lock.

     If [is_locked t], we block the locking thread using [Blocker.wait], until some
     unlocking thread [Blocker.signal]s us.  There is a race between the [wait] and the
     [signal].  If the unlocking thread signals in between our test of
     [t.id_of_thread_holding_lock] and our [wait], then our [wait] could miss the signal
     and block forever.  We avoid this race by committing to waiting inside a
     [with_blocker], which increments [t.num_using_blocker].  If the [signal] occurs
     before the [with_blocker], then it will have cleared [t.id_of_thread_holding_lock],
     which we will notice as [not (is_locked t)], and then not [wait], and loop trying to
     [lock] again.  Otherwise, when an [unlock] occurs, it will see that [is_some
     t.blocker], and will enter a critical section on [blocker].  But then it must wait
     until our critical section on [blocker] finishes, and hence until our call to [wait]
     finishes.  Hence, the [signal] will occur after the [wait].

     The recursive call to [lock] will not spin.  It happens either because we just lost
     the race with an unlocker, in which case the subsequent [lock] will succeed, or
     we actually had to block because someone is holding the lock.  The latter is the
     overwhelmingly common case.

     Other threads can change [t.id_of_thread_holding_lock] concurrently with this code.
     However, no other thread can set it to our [current_thread_id], since threads only
     ever set [t.id_of_thread_holding_lock] to their current thread id, or clear it. *)
  let current_thread_id = current_thread_id () in
  (* BEGIN ATOMIC *)
  if t.id_of_thread_holding_lock = bogus_thread_id
  then (
    t.id_of_thread_holding_lock <- current_thread_id;
    (* END ATOMIC *)
    Ok ())
  else if current_thread_id = t.id_of_thread_holding_lock
  then Error (recursive_lock_error t)
  else (
    with_blocker t (fun blocker -> if is_locked t then Blocker.wait blocker);
    lock t)
;;

let lock_exn t = ok_exn (lock t)

type message =
  { current_thread_id : int
  ; mutex : t
  }
[@@deriving sexp_of]

let unlock t =
  let current_thread_id = current_thread_id () in
  (* We need the following test-and-set to be atomic so that there is a definitive
     winner in a race between multiple unlockers, so that one unlock succeeds and the
     rest fail. *)
  (* BEGIN ATOMIC *)
  if t.id_of_thread_holding_lock <> bogus_thread_id
  then
    if t.id_of_thread_holding_lock = current_thread_id
    then (
      t.id_of_thread_holding_lock <- bogus_thread_id;
      (* END ATOMIC *)
      if Option.is_some t.blocker then with_blocker t Blocker.signal;
      Ok ())
    else
      Error
        (Error.create
           "attempt to unlock mutex held by another thread"
           { current_thread_id; mutex = t }
           [%sexp_of: message])
  else
    Error
      (Error.create
         "attempt to unlock an unlocked mutex"
         { current_thread_id; mutex = t }
         [%sexp_of: message])
;;

let unlock_exn t = ok_exn (unlock t)

let critical_section t ~f =
  lock_exn t;
  protect ~f ~finally:(fun () -> unlock_exn t)
;;
OCaml

Innovation. Community. Security.