package eio

  1. Overview
  2. Docs

Source file lf_queue.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation.
   This makes a good data structure for a scheduler's run queue.

   See: "Implementing lock-free queues"
   https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf
   
   It is simplified slightly because we don't need multiple consumers.
   Therefore [head] is not atomic. *)

(* Raised when an operation finds the [Node.closed] marker in a [next] field,
   i.e. when the queue has been closed with [close].
   [Node.fold] raises it directly, so every function in this file that inspects
   a [next] field through [Node.fold] can raise it; [push_head] also raises it
   explicitly. *)
exception Closed

(* Linked-list nodes, together with a hand-rolled option-like type ['a opt]
   used for the [next] link.
   An ['a opt] is either one of two sentinels ([none], [closed]) or a node.
   A node is represented by itself: [some] is a plain cast and allocates no wrapper. *)
module Node : sig
  type 'a t = {
    next : 'a opt Atomic.t;     (* Link to the following node, or a sentinel. *)
    mutable value : 'a;         (* Payload carried by this node. *)
  }
  and +'a opt
  (* Abstract here: built only with [none], [closed] and [some],
     and inspected only with [fold] (or by physical comparison with the sentinels). *)

  val make : next:'a opt -> 'a -> 'a t
  (** [make ~next v] is a new node holding [v], whose [next] field is a fresh
      atomic initialised to [next]. *)

  val none : 'a opt
  (** [t.next = none] means that [t] is currently the last node. *)

  val closed : 'a opt
  (** [t.next = closed] means that [t] will always be the last node. *)

  val some : 'a t -> 'a opt
  (** [some t] is node [t] viewed as an ['a opt]. *)

  val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b
  (** [fold opt ~none ~some] is [none ()] if [opt] is {!none}, and [some n] if
      [opt] is the node [n].
      @raise Closed if [opt] is {!closed}. *)
end = struct
  (* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *)

  type +'a opt  (* special | 'a t *)

  type 'a t = {
    next : 'a opt Atomic.t;
    mutable value : 'a;
  }

  (* The two sentinel states of a [next] link. Both are constant constructors,
     which OCaml represents as immediate values rather than heap blocks. *)
  type special =
    | Nothing
    | Closed

  (* The casts below erase the difference between a sentinel and a node.
     In [Obj.magic Closed] there is no expected type, so [Closed] resolves to the
     most recent definition, i.e. the [special] constructor above. *)
  let none : 'a. 'a opt = Obj.magic Nothing
  let closed : 'a. 'a opt = Obj.magic Closed
  let some (t : 'a t) : 'a opt = Obj.magic t

  (* [==] is physical equality. A node is a heap-allocated record, so it is never
     physically equal to either sentinel; anything that is not a sentinel is
     therefore cast back to a node.
     In [raise Closed] the expected type is [exn], so [Closed] resolves to the
     exception declared before this module rather than to the [special] constructor. *)
  let fold (opt : 'a opt) ~none:n ~some =
    if opt == none then n ()
    else if opt == closed then raise Closed
    else some (Obj.magic opt : 'a t)

  (* Each node gets its own atomic cell for [next]. *)
  let make ~next value = { value; next = Atomic.make next }
end

(* The queue itself: a singly-linked list of [Node.t] with a non-atomic [head]
   (see the header comment: there is a single consumer) and an atomic [tail]. *)
type 'a t = {
  tail : 'a Node.t Atomic.t;    (* A node at, or close to, the end of the list. *)
  mutable head : 'a Node.t;     (* The node just before the first real item. *)
}
(* [head] is the last node dequeued (or a dummy node, initially).
   [head.next] gives the real first node, unless it is [Node.none] (the queue is
   empty) or [Node.closed] (the queue is empty and closed).
   If [tail.next] is [none] then [tail] is the last node in the queue.
   Otherwise [tail] has fallen behind, and [tail.next] is a node that is closer
   to the real end of the queue. *)

(* [push t x] appends [x] to the end of [t].
   It links a new node after the current last node with a compare-and-set and
   retries if another producer got there first.
   Raises [Closed] (from [Node.fold]) if the last node is marked as closed. *)
let push t x =
  let fresh = Node.make ~next:Node.none x in
  (* [Node.some] is only a cast, so the link value can be computed once. *)
  let link = Node.some fresh in
  let rec attempt () =
    let last = Atomic.get t.tail in
    (* [last] really is the final node for as long as [last.next] is [none]. *)
    if Atomic.compare_and_set last.next Node.none link then
      (* [fresh] is in the queue now (and may even have been consumed already).
         Advance [tail] to it, unless another thread has moved [tail] meanwhile. *)
      ignore (Atomic.compare_and_set t.tail last fresh : bool)
    else
      (* [last.next] was not [none]: some other node was appended first.
         Help [tail] catch up (if it is still [last]) and start over. *)
      Node.fold (Atomic.get last.next)
        ~none:(fun () -> assert false)
        ~some:(fun successor ->
            ignore (Atomic.compare_and_set t.tail last successor : bool);
            attempt ())
  in
  attempt ()

(* [push_head t x] inserts [x] at the front of [t], so that it is the next item
   returned by [pop]. It reads the non-atomic [head] field.
   Raises [Closed] if [head.next] is [Node.closed]. *)
let rec push_head t x =
  let front = t.head in
  let old_next = Atomic.get front.next in
  if old_next == Node.closed then raise Closed;
  let was_empty = old_next == Node.none in
  (* The new node takes over whatever used to follow [front]. *)
  let fresh = Node.make ~next:old_next x in
  if Atomic.compare_and_set front.next old_next (Node.some fresh) then begin
    (* Don't let [tail] lag too far behind: if the queue was empty, [fresh] is
       also the last node, so try to move [tail] onto it.
       If the queue was not empty there is nothing to do: either [tail] is not at
       [head], or some [push] is busy updating it. That [push] will either set
       [tail] straight to the new last node, or set it to [fresh] and retry;
       in both cases [tail] ends up at the real end. *)
    if was_empty then ignore (Atomic.compare_and_set t.tail front fresh : bool)
  end else begin
    (* [front.next] changed under us, which can only happen if the queue was empty. *)
    assert was_empty;
    push_head t x
  end

(* [close t] marks the last node of [t] as final by setting its [next] field to
   [Node.closed].
   Raises [Closed] (from [Node.fold]) if the last node is already marked as closed. *)
let rec close (t:'a t) =
  let last = Atomic.get t.tail in
  let sealed = Atomic.compare_and_set last.next Node.none Node.closed in
  if not sealed then
    (* The CAS failed: [last] is no longer the final node, or it is already closed. *)
    Node.fold (Atomic.get last.next)
      ~none:(fun () -> assert false)    (* A [next] field never goes back to [none]. *)
      ~some:(fun successor ->
          (* Help [tail] catch up, if nobody else has moved it, then try again. *)
          ignore (Atomic.compare_and_set t.tail last successor : bool);
          close t)

(* [pop t] removes and returns the first item of [t], or returns [None] if
   [head.next] is [Node.none].
   Raises [Closed] (from [Node.fold]) if [head.next] is [Node.closed]. *)
let pop t =
  (* [t.head] is the previously-popped node (or the initial dummy);
     the first real item, if any, is the node after it. *)
  Node.fold (Atomic.get t.head.next)
    ~none:(fun () -> None)
    ~some:(fun first ->
        t.head <- first;
        let item = first.value in
        (* [first] stays in the list as the new [head]; drop its payload so that
           it can be GC'd. *)
        first.value <- Obj.magic ();
        Some item)

(* [is_empty t] is [true] if no node follows [head], and [false] otherwise.
   Raises [Closed] (from [Node.fold]) if [head.next] is [Node.closed]. *)
let is_empty t =
  let first = Atomic.get t.head.next in
  Node.fold first ~none:(fun () -> true) ~some:(fun _ -> false)

(* [create ()] is a new empty queue.
   [head] and [tail] both start at the same dummy node, whose value is a
   placeholder that is never returned: [pop] only reads the value of the node
   after [head]. *)
let create () =
  let dummy = Node.make ~next:Node.none (Obj.magic ()) in
  { head = dummy; tail = Atomic.make dummy }
OCaml

Innovation. Community. Security.