package core_unix
Unix-specific portions of Core
Install
Dune Dependency
Authors
Maintainers
Sources
v0.17.1.tar.gz
md5=9370dca36f518fcea046d2752e3de22b
sha512=c4e8ce9d5885ac8fa8d554a97e1857f3a1c933e0eb5dfd4fe874412b9d09e6d0a2973b644733855553f33f5c859719228f0e6aaf3a2b7eb5befb46fc513750de
doc/src/core_unix.nano_mutex/nano_mutex.ml.html
Source file nano_mutex.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
open! Core open! Import let ok_exn = Or_error.ok_exn (* A [Blocker.t] is an ordinary mutex and conditional variable used to implement blocking when there is lock contention. *) module Blocker : sig type t [@@deriving sexp_of] val create : unit -> t val critical_section : t -> f:(unit -> 'a) -> 'a val wait : t -> unit val signal : t -> unit val save_unused : t -> unit end = struct (* Our use of mutexes is always via [Mutex.critical_section], so that we always lock them and unlock them from a single thread. So, we use [Core.Mutex], which is error-checking mutexes, which will catch any use that is not what we expect. *) module Condition = Condition module Mutex = Error_checking_mutex type t = { mutex : (Mutex.t[@sexp.opaque]) ; condition : (Condition.t[@sexp.opaque]) } [@@deriving sexp_of] (* We keep a cache of unused blockers, since they are relatively costly to create, and we should never need very many simultaneously. We should never need more blockers than the number of nano mutexes being simultaneously blocked on, which of course is no more than the total number of simultaneous threads. *) let unused : t Thread_safe_queue.t = Thread_safe_queue.create () (* [save_unused t] should be called when [t] is no longer in use, so it can be returned by a future call of [create]. *) let save_unused t = Thread_safe_queue.enqueue unused t let create () = if Thread_safe_queue.length unused > 0 then Thread_safe_queue.dequeue_exn unused else { mutex = Mutex.create (); condition = Condition.create () } ;; let critical_section t ~f = Mutex.critical_section t.mutex ~f let wait t = Condition.wait t.condition t.mutex let signal t = Condition.signal t.condition end module Thread_id_option : sig type t [@@deriving equal, sexp_of] [@@immediate] val none : t val some : int -> t val is_none : t -> bool val is_some : t -> bool end = struct type t = int [@@deriving equal, sexp_of] let none = -1 let[@inline always] is_none t = t = none let[@inline always] is_some t = t <> none let[@inline always] some int = int let sexp_of_t t = if t = none then [%sexp "None"] else [%sexp (t : t)] end (* We represent a nano mutex using an OCaml record. The [id_of_thread_holding_lock] field represents whether the mutex is locked or not, and if it is locked, which thread holds the lock. We use [Thread_id_option] instead of [int option] for performance reasons (using [int option] slows down lock+unlock by a factor of almost two). The mutex record has an optional [blocker] field for use when the mutex is contended. We use the OS-level condition variable in [blocker] to [wait] in a thread that desires the lock and to [signal] from a thread that is releasing it. When thinking about the implementation, it is helpful to remember the following desiderata: * Safety -- only one thread can acquire the lock at a time. This is accomplished usng a test-and-set to set [id_of_thread_holding_lock]. * Liveness -- if the mutex is unlocked and some threads are waiting on it, then one of those threads will be woken up and given a chance to acquire it. This is accomplished by only waiting when we can ensure that there will be a [signal] of the condition variable in the future. See the more detailed comment in [lock]. * Performance -- do not spin trying to acquire the lock. This is accomplished by waiting on a condition variable if a lock is contended. *) type t = { mutable id_of_thread_holding_lock : Thread_id_option.t ; mutable num_using_blocker : int ; mutable blocker : Blocker.t Uopt.t } [@@deriving sexp_of] let invariant t = try assert (t.num_using_blocker >= 0); (* It is the case that if [t.num_using_blocker = 0] then [Option.is_none t.blocker], however the converse does not necessarily hold. The code in [with_blocker] doesn't take care to atomically increment [t.num_using_blocker] and set [t.blocker] to [Some]. It could, but doing so is not necessary for the correctness of of [with_blocker], which only relies on test-and-set of [t.blocker] to make sure there is an agreed-upon winner in the race to create a blocker. *) if t.num_using_blocker = 0 then assert (Uopt.is_none t.blocker) with | exn -> failwiths ~here:[%here] "invariant failed" (exn, t) [%sexp_of: exn * t] ;; let equal (t : t) t' = phys_equal t t' let create () = { id_of_thread_holding_lock = Thread_id_option.none ; num_using_blocker = 0 ; blocker = Uopt.none } ;; let is_locked t = Thread_id_option.is_some t.id_of_thread_holding_lock let current_thread_id () = Thread.id (Thread.self ()) let current_thread_has_lock t = Thread_id_option.equal t.id_of_thread_holding_lock (current_thread_id () |> Thread_id_option.some) ;; let[@cold] error_recursive_lock t = Error (Error.create "attempt to lock mutex by thread already holding it" (current_thread_id (), t) [%sexp_of: int * t]) ;; let try_lock t = (* The following code relies on an atomic test-and-set of [id_of_thread_holding_lock], so that there is a definitive winner in a race between multiple lockers and everybody agrees who acquired the lock. *) let current_thread_id = current_thread_id () |> Thread_id_option.some in (* BEGIN ATOMIC *) if Thread_id_option.is_none t.id_of_thread_holding_lock then ( t.id_of_thread_holding_lock <- current_thread_id; (* END ATOMIC *) Ok `Acquired) else if Thread_id_option.equal current_thread_id t.id_of_thread_holding_lock then error_recursive_lock t else Ok `Not_acquired ;; let try_lock_exn t = ok_exn (try_lock t) (* Marked with attributes so the allocation of [new_blocker] at the call site cannot be sunk down into the atomic section (there exists no barrier in OCaml right now to prevent this) *) let[@inline never] [@specialise never] [@local never] with_blocker0 t ~new_blocker = (* BEGIN ATOMIC *) if Uopt.is_some t.blocker then Uopt.unsafe_value t.blocker else ( t.blocker <- Uopt.some new_blocker; new_blocker) ;; (* END ATOMIC *) (* [with_blocker t f] runs [f blocker] in a critical section. It allocates a blocker for [t] if [t] doesn't already have one. *) let with_blocker t f = t.num_using_blocker <- t.num_using_blocker + 1; let blocker = match%optional.Uopt t.blocker with | Some blocker -> blocker | None -> (* We allocate [new_blocker] here because one cannot allocate inside an atomic region. *) let new_blocker = Blocker.create () in let blocker = (* We need the following test-and-set to be atomic so that there is a definitive winner in a race between multiple calls to [with_blocker], so that everybody agrees what the underlying [blocker] is. *) with_blocker0 t ~new_blocker in if not (phys_equal blocker new_blocker) then Blocker.save_unused new_blocker; blocker in protect ~f:(fun () -> Blocker.critical_section blocker ~f:(fun () -> f blocker) [@nontail]) ~finally:(fun () -> (* We need the following decrement-test-and-set to be atomic so that we're sure that the last user of blocker clears it. *) (* BEGIN ATOMIC *) t.num_using_blocker <- t.num_using_blocker - 1; if t.num_using_blocker = 0 then ( t.blocker <- Uopt.none; (* END ATOMIC *) Blocker.save_unused blocker)) [@nontail] ;; let rec lock t = (* The following code relies on an atomic test-and-set of [id_of_thread_holding_lock], so that there is a definitive winner in a race between multiple [lock]ers, and everybody agrees who acquired the lock. If [is_locked t], we block the locking thread using [Blocker.wait], until some unlocking thread [Blocker.signal]s us. There is a race between the [wait] and the [signal]. If the unlocking thread signals in between our test of [t.id_of_thread_holding_lock] and our [wait], then our [wait] could miss the signal and block forever. We avoid this race by committing to waiting inside a [with_blocker], which increments [t.num_using_blocker]. If the [signal] occurs before the [with_blocker], then it will have cleared [t.id_of_thread_holding_lock], which we will notice as [not (is_locked t)], and then not [wait], and loop trying to [lock] again. Otherwise, when an [unlock] occurs, it will see that [is_some t.blocker], and will enter a critical section on [blocker]. But then it must wait until our critical section on [blocker] finishes, and hence until our call to [wait] finishes. Hence, the [signal] will occur after the [wait]. The recursive call to [lock] will not spin. It happens either because we just lost the race with an unlocker, in which case the subsequent [lock] will succeed, or we actually had to block because someone is holding the lock. The latter is the overwhelmingly common case. Other threads can change [t.id_of_thread_holding_lock] concurrently with this code. However, no other thread can set it to our [current_thread_id], since threads only ever set [t.id_of_thread_holding_lock] to their current thread id, or clear it. *) let current_thread_id = current_thread_id () |> Thread_id_option.some in (* BEGIN ATOMIC *) if Thread_id_option.is_none t.id_of_thread_holding_lock then ( t.id_of_thread_holding_lock <- current_thread_id; (* END ATOMIC *) Ok ()) else if Thread_id_option.equal current_thread_id t.id_of_thread_holding_lock then error_recursive_lock t else ( with_blocker t (fun blocker -> if is_locked t then Blocker.wait blocker); lock t) ;; let lock_exn t = ok_exn (lock t) type message = { current_thread_id : int ; mutex : t } [@@deriving sexp_of] let[@cold] error_attempt_to_unlock_mutex_held_by_another_thread t = Error (Error.create "attempt to unlock mutex held by another thread" { current_thread_id = current_thread_id (); mutex = t } [%sexp_of: message]) ;; let[@cold] error_attempt_to_unlock_an_unlocked_mutex t = Error (Error.create "attempt to unlock an unlocked mutex" { current_thread_id = current_thread_id (); mutex = t } [%sexp_of: message]) ;; let unlock t = let current_thread_id = current_thread_id () in (* We need the following test-and-set to be atomic so that there is a definitive winner in a race between multiple unlockers, so that one unlock succeeds and the rest fail. *) (* BEGIN ATOMIC *) if Thread_id_option.is_some t.id_of_thread_holding_lock then if Thread_id_option.equal t.id_of_thread_holding_lock (current_thread_id |> Thread_id_option.some) then ( t.id_of_thread_holding_lock <- Thread_id_option.none; (* END ATOMIC *) if Uopt.is_some t.blocker then with_blocker t Blocker.signal; Ok ()) else error_attempt_to_unlock_mutex_held_by_another_thread t else error_attempt_to_unlock_an_unlocked_mutex t ;; let unlock_exn t = ok_exn (unlock t) let critical_section t ~f = lock_exn t; protect ~f ~finally:(fun () -> unlock_exn t) ;;
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>