package amqp-client-async

  1. Overview
  2. Docs

Source file channel.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
open Thread
open Spec
open Amqp_client_lib

type no_confirm = [ `Ok ]
type with_confirm = [ `Ok | `Failed ]


type _ confirms =
  | No_confirm: no_confirm confirms
  | With_confirm: with_confirm confirms

let no_confirm = No_confirm
let with_confirm = With_confirm

type on_cancel = unit -> unit
type consumer = Basic.Deliver.t * Basic.Content.t * string -> unit
type consumers = (string, consumer * on_cancel) Hashtbl.t

type result = Delivered | Rejected | Undeliverable
type message_info = { delivery_tag: int;
                      routing_key: string;
                      exchange_name: string;
                      result_handler: result -> unit;
                    }

type publish_confirm = { mutable message_count: int;
                         unacked: message_info Mlist.t }

type _ pcp =
  | Pcp_no_confirm: no_confirm pcp
  | Pcp_with_confirm: publish_confirm -> with_confirm pcp

type return_handler = (Basic.Return.t * (Basic.Content.t * string)) option -> unit

type 'a t = { framing: Framing.t;
              channel_no: int;
              consumers: consumers;
              id: string;
              mutable counter: int;
              publish_confirm: 'a pcp;
              mutable return_handlers: return_handler list;
              mutable closed: unit Ivar.t option;
            }

let channel { framing; channel_no; _ } = (framing, channel_no)

module Internal = struct
  let next_counter t =
    t.counter <- t.counter + 1;
    t.counter

  let unique_id t =
    Printf.sprintf "%s.%d" t.id (next_counter t)

  let register_deliver_handler t =
    let open Basic in
    let handler (deliver, (content, data)) =
      match Hashtbl.find t.consumers deliver.Deliver.consumer_tag with
      | handler, _ ->
        handler (deliver, content, data);
        (* Keep the current handler *)
      | exception Not_found ->
        failwith ("No consumers for: " ^ deliver.Deliver.consumer_tag)
    in
    let read = snd Deliver.Internal.read in
    read ~once:false handler (channel t)

  let register_consumer_handler t consumer_tag handler on_cancel =
    if Hashtbl.mem t.consumers consumer_tag then raise Types.Busy;
    Hashtbl.add t.consumers consumer_tag (handler, on_cancel)

  let deregister_consumer_handler t consumer_tag =
    Hashtbl.remove t.consumers consumer_tag

  let set_result ivar = function
    | Delivered ->
      Ivar.fill ivar `Ok
    | Rejected ->
      Ivar.fill ivar `Failed
    | Undeliverable ->
      Ivar.fill ivar `Failed

  (* Need to add if we should expect returns also. *)
  let wait_for_confirm: type a. a t -> routing_key:string -> exchange_name:string -> a Deferred.t = fun t ~routing_key ~exchange_name ->
    match t.publish_confirm with
    | Pcp_with_confirm t ->
      let var = Ivar.create () in
      let result_handler = set_result var in
      t.message_count <- t.message_count + 1;
      let delivery_tag = t.message_count in
      Mlist.append t.unacked {delivery_tag; routing_key; exchange_name; result_handler};
      (Ivar.read var : [`Ok | `Failed] Deferred.t)
    | Pcp_no_confirm -> return `Ok
end

let close_handler t channel_no close =
  Log.info "Channel closed: %d" channel_no;
  Log.info "Reply code: %d\n" close.Channel.Close.reply_code;
  Log.info "Reply text: %s\n" close.Channel.Close.reply_text;
  Log.info "Message: (%d, %d)\n" close.Channel.Close.class_id close.Channel.Close.method_id;
  match t.closed with
  | None -> raise (Types.Channel_closed channel_no)
  | Some ivar ->
    Ivar.fill ivar ();
    return ()

let consumer_cancel_handler t (cancel : Basic.Cancel.t) =
  let consumer_tag = cancel.Basic.Cancel.consumer_tag in
  match Hashtbl.find t.consumers consumer_tag with
  | _, on_cancel ->
    Hashtbl.remove t.consumers consumer_tag;
    on_cancel ()
  | exception Not_found ->
    failwith
      ("Cannot cancel consumer, as no handler was found for consumer: " ^ consumer_tag)

let register_flow_handler t =
  let (_, read) = Channel.Flow.Internal.read in
  let handler { Channel.Flow.active } =
    Framing.set_flow t.framing t.channel_no active;
    spawn (Channel.Flow_ok.Internal.write (channel t) { Channel.Flow_ok.active })
  in
  read ~once:false handler (channel t)

let handle_confirms channel t =
  let confirm multiple result tag =
    let results = match multiple with
      | true -> Mlist.take_while ~pred:(fun m -> m.delivery_tag <= tag) t.unacked
      | false -> Mlist.take ~pred:(fun m -> m.delivery_tag = tag) t.unacked
                 |> Option.map_default ~f:(fun v -> [v]) ~default:[]
    in
    List.iter (fun m -> m.result_handler result) results
  in

  let return_handler = function
    | Some (r, _) -> begin
        let pred message =
          message.routing_key = r.Basic.Return.routing_key && message.exchange_name = r.Basic.Return.exchange
        in

        match Mlist.take ~pred t.unacked with
        | None -> Log.error "No messages found to mark as undeliverable. This would indicate a library error"
        | Some m -> m.result_handler Undeliverable
      end
    | None -> ()
  in

  let open Basic in
  let read_ack = snd Ack.Internal.read in
  let read_reject = snd Reject.Internal.read in
  read_ack ~once:false (fun m -> confirm m.Ack.multiple Delivered m.Ack.delivery_tag) channel;
  read_reject ~once:false (fun m -> confirm false Rejected m.Reject.delivery_tag) channel;

  Confirm.Select.request channel { Confirm.Select.nowait = false } >>= fun () ->
  return return_handler

let register_return_handler t =
  let (_, read) = Basic.Return.Internal.read in
  let handler m = List.iter (fun h -> h (Some m)) t.return_handlers in
  read ~once:false handler (channel t)


let create: type a. id:string -> a confirms -> Framing.t -> Framing.channel_no -> a t Deferred.t = fun ~id confirm_type framing channel_no ->
  let consumers = Hashtbl.create 0 in
  let id = Printf.sprintf "%s.%s.%d" (Framing.id framing) id channel_no in
  Framing.open_channel framing channel_no >>= fun () ->
  let publish_confirm : a pcp = match confirm_type with
    | With_confirm ->
      Pcp_with_confirm { message_count = 0; unacked = Mlist.create () }
    | No_confirm ->
      Pcp_no_confirm
  in
  let t =
    { framing; channel_no; consumers; id; counter = 0;
      publish_confirm; return_handlers = []; closed = None; }
  in

  spawn (Channel.Close.reply (framing, channel_no) (close_handler t channel_no));
  let (_, read) = Basic.Cancel.Internal.read in
  read ~once:false (consumer_cancel_handler t) (framing, channel_no);

  Channel.Open.request (framing, channel_no) () >>= fun () ->

  begin match publish_confirm with
  | Pcp_with_confirm t ->
    handle_confirms (framing, channel_no) t >>= fun return_handler ->
    return [return_handler]
  | Pcp_no_confirm ->
    return []
  end >>= fun return_handlers ->
  t.return_handlers <- return_handlers;
  Internal.register_deliver_handler t;

  register_flow_handler t;
  register_return_handler t;
  return t

let close { framing; channel_no; return_handlers; _ } =
  let open Channel.Close in
  request (framing, channel_no)
    { reply_code=200;
      reply_text="Closed on user request";
      class_id=0;
      method_id=0; } >>= fun () ->
  Framing.close_channel framing channel_no >>= fun () ->
  List.iter (fun h -> h None) return_handlers;
  return ()

let on_return t =
  let reader, writer = Pipe.create () in
  let handler = function
    | Some m -> Pipe.write_without_pushback writer m
    | None -> Pipe.close_without_pushback writer
  in
  t.return_handlers <- handler :: t.return_handlers;
  reader

let on_closed t =
  let ivar =
    match t.closed with
    | Some ivar ->
      ivar
    | None ->
      let ivar = Ivar.create () in
      t.closed <- Some ivar;
      ivar
  in
  Ivar.read ivar

let flush t =
  Framing.flush_channel t.framing t.channel_no

let id t = t.id

let channel_no t = t.channel_no

let set_prefetch ?(count=0) ?(size=0) t =
  Basic.Qos.request (channel t) { Basic.Qos.prefetch_count=count;
                                  prefetch_size=size;
                                  global=false }

let set_global_prefetch ?(count=0) ?(size=0) t =
  Basic.Qos.request (channel t) { Basic.Qos.prefetch_count=count;
                                  prefetch_size=size;
                                  global=true }

module Transaction = struct
  type tx = EChannel: _ t -> tx

  open Spec.Tx
  let start t =
    Select.request (channel t) () >>= fun () ->
    return (EChannel t)

  let commit (EChannel t) =
    Commit.request (channel t) ()

  let rollback (EChannel t) =
    Rollback.request (channel t) ()
end
OCaml

Innovation. Community. Security.