package lockfree

Source file mpmc_relaxed_queue.ml

(*
  # General idea
  
  The general idea is easiest to explain on an array of infinite size, so
  let's start with that. Each element in such an array constitutes a
  single-use exchange slot. An enqueuer increments [tail] and treats the
  prior value as the index of its slot. A dequeuer does the same with
  [head]. This effectively creates pairs (enqueuer, dequeuer) assigned to
  the same slot. The enqueuer leaves the value in the slot; the dequeuer
  copies it out.

  An enqueuer never fails. It always gets a brand-new slot and places the
  item in it. A dequeuer, on the other hand, may witness an empty slot.
  That's because [head] may overtake [tail]. Remember, the indices are
  incremented blindly. For now, assume the dequeuer simply spins on the
  empty slot until an item appears (see the sketch after the list below).

  That's it. A few things follow from this construction:
  * Slots are atomic. This is where a paired enqueuer and dequeuer
  communicate.
  * [head] overshooting [tail] is a normal condition and that's good - we
  want to keep operations on [head] and [tail] independent.
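
  A minimal sketch of this idealized version, assuming a hypothetical record
  with an infinite [slots : 'a option Atomic.t array] next to the [head] and
  [tail] indices (slots are single-use, so plain get/set suffice):

    let push t item =
      let ticket = Atomic.fetch_and_add t.tail 1 in
      Atomic.set t.slots.(ticket) (Some item)

    let pop t =
      let ticket = Atomic.fetch_and_add t.head 1 in
      let rec wait () =
        match Atomic.get t.slots.(ticket) with
        | Some item -> item
        | None -> Domain.cpu_relax (); wait ()
      in
      wait ()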

  # Finite array

  Now, to make this work in the real world, simply treat the finite array
  as circular, i.e. wrap around when the end is reached. Slots are now
  re-used, so we need to be more careful.

  Firstly, if there are too many items, an enqueuer may witness a full
  slot. Let's assume the enqueuer simply spins on the full slot until some
  dequeuer comes along and takes the old value.

  Secondly, in the case of overlap, there can be more than 2 threads
  (1x enqueuer, 1x dequeuer) assigned to a single slot (imagine 10
  enqueuers spinning on an 8-slot array). In fact, it could be any number.
  Thus, all operations on a slot have to use CAS to ensure that no item is
  overwritten on store and no item is dequeued by two threads at once.
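
  To make the overlap concrete: with [size_exponent = 3] (size 8, mask 7),
  tickets 3 and 11 collide, since 3 land 7 = 11 land 7 = 3. Hence every
  slot access goes through CAS; the enqueuer's inner loop looks like this
  (it is exactly what [Spin.push] below does):

    let cell = Array.get array (ticket land mask) in
    while not (Atomic.compare_and_set cell None (Some item)) do
      Domain.cpu_relax ()
    done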

  The above works okay in practice, and there is some relevant literature;
  e.g. (DOI: 10.1145/3437801.3441583) analyzes this particular design.
  There are also plenty of older papers looking at similar approaches
  (e.g. DOI: 10.1145/2851141.2851168).

  Note that this design may violate FIFO ordering (on overlap). The risk
  can be minimized by ensuring the array size is much larger than the
  number of threads, but it is never zero.
  (github.com/rigtorp/MPMCQueue has a nice way of fixing this; we could
  add it. A rough sketch follows.)
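
  For reference, the fix there is a per-slot sequence ("turn") counter. A
  rough, hypothetical adaptation - not part of this module; assume a slot
  record [{ turn : int Atomic.t; mutable value : 'a option }] and a stored
  [size_exponent]:

    let push t item =
      let ticket = Atomic.fetch_and_add t.tail 1 in
      let slot = t.array.(ticket land t.mask) in
      (* wait until the slot's counter reaches our write turn *)
      let turn = 2 * (ticket lsr t.size_exponent) in
      while Atomic.get slot.turn <> turn do Domain.cpu_relax () done;
      slot.value <- Some item;
      Atomic.set slot.turn (turn + 1)

  A dequeuer symmetrically waits for [turn + 1], so the threads assigned to
  one slot are served strictly in ticket order.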

  # Blocking (non-lockfree paths on full, empty)

  Up until now, [push] and [pop] were allowed to block indefinitely on an
  empty or full queue. Overall, what can be done in those states?

  1. Busy-wait until able to finish.
  2. Roll back our own index with CAS (unassign ourselves from the slot).
  3. Move the other index forward with CAS (assign ourselves to the same
  slot as the opposite action).
  4. Mark the slot as burned - dequeue only.

  Which one then?

  Let's optimize for stability, i.e. some reasonable latency that won't get
  much worse under heavy load. Busy-waiting is great because it does not
  cause any contention in the hotspots ([head], [tail]). Thus, start with
  busy-waiting (1). If the queue is busy and moving fast, there is a fair
  chance that within, say, 30 spins we'll manage to complete the action
  without having to add contention elsewhere.
  
  Once N busy-loops have happened and nothing has changed, we probably want
  to return even if it costs us something. Both (2) and (3) allow that, but
  (2) doesn't add contention to the other index like (3) does. Say there
  are a lot more dequeuers than enqueuers: if all dequeuers did (3), they
  would add a fair amount of contention to the [tail] index and slow the
  already-outnumbered enqueuers down further. So, (2) > (3) for that
  reason. The snippet below spells both out.
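
  In terms of the [ccas] helper defined below, an enqueuer holding [ticket]
  (the pre-increment value of [tail]) performs:

    (* (2): undo our own increment of [tail] *)
    ccas tail (ticket + 1) ticket
    (* (3): complete the opposite action by advancing [head] past our slot *)
    ccas head ticket (ticket + 1)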

  However, with just (2), some dequeuers will struggle to return. If many
  dequeuers constantly try to pop an element and fail, they will form a
  chain.

   tl                 hd
   |                  |
  [.]-[A]-[B]-[C]-..-[X]

  For A to roll back, B has to roll back first. For B to roll back, C has to
  roll back first.

  [A] is likely to experience a large latency spike. In such a case, it is
  easier for [A] to do (3) rather than hope that all the other active
  dequeuers will unblock it at some point. Thus, it's worthwhile to also
  try (3) periodically.

  Thus, the current policy does (1) for a bit, then keeps trying (1) and
  (2), with periodic attempts at (3).

  What about burned slots (4)?

  It's present in the literature, but I'm weakly against it. If dequeuers
  are faster to remove items than enqueuers are to supply them, slots
  burned by dequeuers are going to make the enqueuers do even more work.

  # Resizing 

  The queue does not support resizing, but it can be simulated by wrapping it in a 
  lockfree list. 
*)

type 'a t = {
  array : 'a Option.t Atomic.t Array.t;
  head : int Atomic.t;
  tail : int Atomic.t;
  mask : int;
}

let create ~size_exponent () : 'a t =
  let size = 1 lsl size_exponent in
  let array = Array.init size (fun _ -> Atomic.make None) in
  let mask = size - 1 in
  let head = Atomic.make 0 in
  let tail = Atomic.make 0 in
  { array; head; tail; mask }
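
(* Example (illustrative): [create ~size_exponent:10 ()] allocates
   [1 lsl 10 = 1024] slots with [mask = 1023], so a ticket [t] maps to slot
   [t land 1023]. *)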

(* [ccas] A slightly nicer CAS: reads first and only attempts the actual
   (bus-locking) compare-and-set if the value still matches. Use on indices. *)
let ccas cell seen v =
  if Atomic.get cell != seen then false else Atomic.compare_and_set cell seen v

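(* [Spin] The simplest variant: [push] and [pop] busy-wait indefinitely on a
   full (respectively, empty) slot - strategy (1) above, with no exit paths. *)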
module Spin = struct
  let push { array; tail; mask; _ } item =
    let tail_val = Atomic.fetch_and_add tail 1 in
    let index = tail_val land mask in
    let cell = Array.get array index in
    while not (ccas cell None (Some item)) do
      Domain.cpu_relax ()
    done

  let pop { array; head; mask; _ } =
    let head_val = Atomic.fetch_and_add head 1 in
    let index = head_val land mask in
    let cell = Array.get array index in
    let item = ref (Atomic.get cell) in
    while Option.is_none !item || not (ccas cell !item None) do
      Domain.cpu_relax ();
      item := Atomic.get cell
    done;
    Option.get !item
end

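(* [Not_lockfree] Variant with bounded spinning: when an operation cannot
   complete within [spin_threshold] attempts, it fixes up the indices
   (rollback, with periodic push-forward) and returns [false] / [None]. *)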
module Not_lockfree = struct
  (* [spin_threshold] Number of times to spin on a slot before trying an exit strategy. *)
  let spin_threshold = 30

  (* [try_other_exit_every_n] There are two strategies that push/pop can take
     to fix up the state (to be able to return without completing). Generally,
     we want to try "rollback" more often than "push forward", as the latter
     adds contention to the side that might already not be keeping up. *)
  let try_other_exit_every_n = 10
  let time_to_try_push_forward n = n mod try_other_exit_every_n == 0

  let push { array; tail; head; mask; _ } item =
    let tail_val = Atomic.fetch_and_add tail 1 in
    let index = tail_val land mask in
    let cell = Array.get array index in

    (* spin for a bit *)
    let i = ref 0 in
    while
      !i < spin_threshold && not (Atomic.compare_and_set cell None (Some item))
    do
      i := !i + 1
    done;

    (* define clean up function *)
    let rec take_or_rollback nth_attempt =
      if Atomic.compare_and_set cell None (Some item) then
        (* succeeded to push *)
        true
      else if ccas tail (tail_val + 1) tail_val then (* rolled back tail *)
        false
      else if
        time_to_try_push_forward nth_attempt && ccas head tail_val (tail_val + 1)
      then (* pushed forward head *)
        false
      else (* retry *)
        take_or_rollback (nth_attempt + 1)
    in

    (* if succeeded return true otherwise clean up *)
    if !i < spin_threshold then true else take_or_rollback 0

  let take_item cell =
    let value = Atomic.get cell in
    if Option.is_some value && Atomic.compare_and_set cell value None then value
    else None

  let pop queue =
    let ({ array; head; tail; mask; _ } : 'a t) = queue in
    let head_value = Atomic.get head in
    let tail_value = Atomic.get tail in
    if head_value - tail_value >= 0 then None
    else
      let old_head = Atomic.fetch_and_add head 1 in
      let cell = Array.get array (old_head land mask) in

      (* spin for a bit *)
      let i = ref 0 in
      let item = ref None in
      while !i < spin_threshold && not (Option.is_some !item) do
        item := take_item cell;
        i := !i + 1
      done;

      (* define clean up function *)
      let rec take_or_rollback nth_attempt =
        let value = Atomic.get cell in
        if Option.is_some value && Atomic.compare_and_set cell value None then
          (* dequeued an item, return it *)
          value
        else if ccas head (old_head + 1) old_head then
          (* rolled back head *)
          None
        else if
          time_to_try_push_forward nth_attempt
          && ccas tail old_head (old_head + 1)
        then (* pushed tail forward *)
          None
        else take_or_rollback (nth_attempt + 1)
      in

      (* return if got item, clean up otherwise *)
      if Option.is_some !item then !item else take_or_rollback 0

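  (* [CAS_interface] Alternative that advances [tail]/[head] with CAS instead
     of fetch-and-add. This lets it check for a full/empty queue before
     committing to a ticket, at the cost of extra contention on the indices. *)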
  module CAS_interface = struct
    let rec push ({ array; tail; head; mask; _ } as t) item =
      let tail_val = Atomic.get tail in
      let head_val = Atomic.get head in
      let size = mask + 1 in
      if tail_val - head_val >= size then false
      else if ccas tail tail_val (tail_val + 1) then (
        let index = tail_val land mask in
        let cell = Array.get array index in
        (*
          Given that the code above checks for overlap, is this CAS needed?

          Yes. Even though a thread cannot explicitly enter overlap, it can
          still occur simply because an enqueuer may theoretically be
          unscheduled for an unbounded amount of time between incrementing
          the index and filling the slot.

          I doubt we'd observe that case in real life (outside of some
          extreme circumstances), but skipping this CAS is an optimization
          that has to be left for the user to decide. After all, the
          algorithm would not pass model-checking without it.

          Incidentally, the CAS also makes this method interoperable with
          the standard interface.
        *)
        while not (Atomic.compare_and_set cell None (Some item)) do
          ()
        done;
        true)
      else push t item

    let rec pop ({ array; tail; head; mask; _ } as t) =
      let tail_val = Atomic.get tail in
      let head_val = Atomic.get head in
      if head_val - tail_val >= 0 then None
      else if ccas head head_val (head_val + 1) then (
        let index = head_val land mask in
        let cell = Array.get array index in
        let item = ref (Atomic.get cell) in
        while
          not (Option.is_some !item && Atomic.compare_and_set cell !item None)
        do
          item := Atomic.get cell
        done;
        !item)
      else pop t
  end
end