package amqp-client-async

  1. Overview
  2. Docs
Amqp client library, async version

Install

Dune Dependency

Authors

Maintainers

Sources

2.3.0.tar.gz
md5=9db83accd0dfa9231c3f2ca0de9c8d9f
sha512=921c3f4d0d655dc5caa5c89fe8c4309a6e22d91167676062e0e73f3007b0b5de20e7b461aefdddca6dbdf716d57d90eaefb7e974ae218cce0f0a20fb461c965d

doc/src/amqp-client-async/exchange.ml.html

Source file exchange.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
open Thread
open Amqp_client_lib
open Spec.Exchange

(* type match_type = Any | All *)

type _ exchange_type =
  | Direct: [`Queue of string] exchange_type
  | Fanout: unit exchange_type
  | Topic:  [`Topic of string] exchange_type
  | Match:  [`Headers of Types.header list] exchange_type

let direct_t = Direct
let fanout_t = Fanout
let topic_t = Topic
let match_t = Match

type 'a t = { name : string;
              exchange_type: 'a exchange_type }

(** Predefined Default exchange *)
let default    = { name=""; exchange_type = Direct }

(** Predefined Direct exchange *)
let amq_direct = { name = "amq.direct"; exchange_type = Direct }

(** Predefined Fanout exchange *)
let amq_fanout = { name = "amq.fanout";  exchange_type = Fanout }

(** Predefined topic exchange *)
let amq_topic  = { name = "amq.topic"; exchange_type = Topic }

(** Predefined match (header) exchange *)
let amq_match = { name = "amq.match"; exchange_type = Match }

let string_of_exchange_type: type a. a exchange_type -> string  = function
  | Direct -> "direct"
  | Fanout -> "fanout"
  | Topic -> "topic"
  | Match -> "match"

module Internal = struct
  let bind_queue: type a. _ Channel.t -> a t -> string -> a -> unit Deferred.t =
    let open Spec.Queue in
    fun channel { name; exchange_type} queue ->
      let bind ?(routing_key="") ?(arguments=[]) () =
        let query = { Bind.queue;
                      exchange = name;
                      routing_key;
                      no_wait = false;
                      arguments;
                    }
        in
        Bind.request (Channel.channel channel) query
      in
      match exchange_type with
      | Direct -> fun (`Queue routing_key) -> bind ~routing_key ()
      | Fanout -> fun () -> bind ()
      | Topic -> fun (`Topic routing_key) -> bind ~routing_key ()
      | Match -> fun (`Headers arguments) -> bind ~arguments ()

  let unbind_queue: type a. _ Channel.t -> a t -> string -> a -> unit Deferred.t =
    let open Spec.Queue in
    fun channel { name; exchange_type} queue ->
      let unbind ?(routing_key="") ?(arguments=[]) () =
        let query = { Unbind.queue;
                      exchange = name;
                      routing_key;
                      arguments;
                    }
        in
        Unbind.request (Channel.channel channel) query
      in
      match exchange_type with
      | Direct -> fun (`Queue routing_key) -> unbind ~routing_key ()
      | Fanout -> fun () -> unbind ()
      | Topic -> fun (`Topic routing_key) -> unbind ~routing_key ()
      | Match -> fun (`Headers arguments) -> unbind ~arguments ()
end


let declare: type a. ?passive:bool -> ?durable:bool -> ?auto_delete:bool -> ?internal:bool ->
  _ Channel.t -> a exchange_type -> ?arguments:Types.table -> string -> a t Deferred.t =
  fun ?(passive=false) ?(durable=false) ?(auto_delete=false) ?(internal=false)
    channel exchange_type ?(arguments=[]) name ->
    Declare.request (Channel.channel channel)
      { Declare.exchange = name;
        amqp_type = (string_of_exchange_type exchange_type);
        passive;
        durable;
        auto_delete;
        internal;
        no_wait = false;
        arguments; } >>= fun () ->
    return { name; exchange_type }

let delete ?(if_unused=false) channel t =
  Delete.request (Channel.channel channel)
    { Delete.exchange = t.name;
      if_unused;
      no_wait = false;
    }

let bind: type a. _ Channel.t -> destination:_ t -> source:a t -> a -> unit Deferred.t=
  fun channel ~destination ~source ->
    let bind ?(routing_key="") ?(arguments=[]) () =
      let query = { Bind.destination = destination.name;
                    source = source.name;
                    routing_key;
                    no_wait = false;
                    arguments;
                  }
      in
      Bind.request (Channel.channel channel) query
    in
    match source.exchange_type with
    | Direct -> fun (`Queue routing_key) -> bind ~routing_key ()
    | Fanout -> fun () -> bind ()
    | Topic -> fun (`Topic routing_key) -> bind ~routing_key ()
    | Match -> fun (`Headers arguments) -> bind ~arguments ()

let unbind: type a. _ Channel.t -> destination:_ t -> source:a t -> a -> unit Deferred.t=
  fun channel ~destination ~source ->
    let unbind ?(routing_key="") ?(arguments=[]) () =
      let query = { Unbind.destination = destination.name;
                    source = source.name;
                    routing_key;
                    no_wait = false;
                    arguments;
                  }
      in
      Unbind.request (Channel.channel channel) query
    in
    match source.exchange_type with
    | Direct -> fun (`Queue routing_key) -> unbind ~routing_key ()
    | Fanout -> fun () -> unbind ()
    | Topic -> fun (`Topic routing_key) -> unbind ~routing_key ()
    | Match -> fun (`Headers arguments) -> unbind ~arguments ()

let publish channel t
    ?(mandatory=false)
    ~routing_key
    (header, body) =

  let open Spec.Basic in
  let header = match header.Content.app_id with
    | Some _ -> header
    | None -> { header with Content.app_id = Some (Channel.id channel) }
  in
  let wait_for_confirm = Channel.Internal.wait_for_confirm channel in
  Publish.request (Channel.channel channel)
    ({Publish.exchange = t.name;
      routing_key;
      mandatory;
      immediate=false},
     header, body) >>= fun () ->
  wait_for_confirm ~routing_key ~exchange_name:t.name

let name t = t.name
OCaml

Innovation. Community. Security.