package cohttp-async

  1. Overview
  2. Docs

Source file client.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
open Base
open Async_kernel
open Async_unix

module Net = struct
  let lookup uri =
    let host = Uri.host_with_default ~default:"localhost" uri in
    match Uri_services.tcp_port_of_uri ~default:"http" uri with
    | None ->
        Deferred.Or_error.error_string
          "Net.lookup: failed to get TCP port form Uri"
    | Some port -> (
        let open Unix in
        Addr_info.get ~host
          [ Addr_info.AI_FAMILY PF_INET; Addr_info.AI_SOCKTYPE SOCK_STREAM ]
        >>| function
        | { Addr_info.ai_addr = ADDR_INET (addr, _); _ } :: _ ->
            Or_error.return (host, Ipaddr_unix.of_inet_addr addr, port)
        | _ -> Or_error.error "Failed to resolve Uri" uri Uri_sexp.sexp_of_t)

  let connect_uri ?interrupt ?ssl_config uri =
    (match Uri.scheme uri with
    | Some "httpunix" ->
        let host = Uri.host_with_default ~default:"localhost" uri in
        return @@ `Unix_domain_socket host
    | _ -> (
        lookup uri |> Deferred.Or_error.ok_exn >>= fun (host, addr, port) ->
        return
        @@
        match (Uri.scheme uri, ssl_config) with
        | Some "https", Some config -> `OpenSSL (addr, port, config)
        | Some "https", None ->
            let config = Conduit_async.V2.Ssl.Config.create ~hostname:host () in
            `OpenSSL (addr, port, config)
        | _ -> `TCP (addr, port)))
    >>= fun mode ->
    Conduit_async.V2.connect ?interrupt mode >>| fun (r, w) ->
    (Input_channel.create r, w)
end

let read_response ic =
  Io.Response.read ic >>| function
  | `Eof -> failwith "Connection closed by remote host"
  | `Invalid reason -> failwith reason
  | `Ok res -> (
      match Cohttp.Response.has_body res with
      | `Yes | `Unknown ->
          (* Build a response pipe for the body *)
          let reader = Io.Response.make_body_reader res ic in
          let pipe =
            Body.Private.pipe_of_body Io.Response.read_body_chunk reader
          in
          (res, pipe)
      | `No ->
          let pipe = Pipe.of_list [] in
          (res, pipe))

let request ?interrupt ?ssl_config ?uri ?(body = `Empty) req =
  (* Connect to the remote side *)
  let uri = match uri with Some t -> t | None -> Cohttp.Request.uri req in
  Net.connect_uri ?interrupt ?ssl_config uri >>= fun (ic, oc) ->
  try_with (fun () ->
      Io.Request.write ~flush:false
        (fun writer ->
          Body.Private.write_body Io.Request.write_body body writer)
        req oc
      >>= fun () ->
      read_response ic >>| fun (resp, body) ->
      don't_wait_for
        ( Pipe.closed body >>= fun () ->
          Deferred.all_unit [ Input_channel.close ic; Writer.close oc ] );
      (resp, `Pipe body))
  >>= function
  | Ok res -> return res
  | Error e ->
      don't_wait_for (Input_channel.close ic);
      don't_wait_for (Writer.close oc);
      raise e

module Connection = struct
  type t' = { ic : Input_channel.t; oc : Writer.t }

  (* we can't send concurrent requests over HTTP/1 *)
  type t = t' Sequencer.t

  let connect ?interrupt ?ssl_config uri =
    Net.connect_uri ?interrupt ?ssl_config uri >>| fun (ic, oc) ->
    let t = { ic; oc } |> Sequencer.create ~continue_on_error:false in
    Throttle.at_kill t (fun { ic; oc } ->
        Deferred.both (Writer.close oc) (Input_channel.close ic)
        >>| fun ((), ()) -> ());
    Deferred.any [ Writer.consumer_left oc; Input_channel.close_finished ic ]
    >>| (fun () -> Throttle.kill t)
    |> don't_wait_for;
    t

  let close t =
    Throttle.kill t;
    Throttle.cleaned t

  let close_finished t = Throttle.cleaned t
  let is_closed t = Throttle.is_dead t

  let request ?(body = Body.empty) t req =
    let res = Ivar.create () in
    Throttle.enqueue t (fun { ic; oc } ->
        Io.Request.write ~flush:false
          (fun writer ->
            Body.Private.write_body Io.Request.write_body body writer)
          req oc
        >>= fun () ->
        read_response ic >>= fun (resp, body) ->
        Ivar.fill_exn res (resp, `Pipe body);
        (* block starting any more requests until the consumer has finished reading this request *)
        Pipe.closed body)
    |> don't_wait_for;
    Ivar.read res
end

let callv ?interrupt ?ssl_config uri reqs =
  Connection.connect ?interrupt ?ssl_config uri >>| fun connection ->
  let responses =
    Pipe.map' ~max_queue_length:1 reqs ~f:(fun reqs ->
        Deferred.Queue.map ~how:`Sequential reqs ~f:(fun (req, body) ->
            Connection.request ~body connection req))
  in
  Pipe.closed responses
  >>= (fun () -> Connection.close connection)
  |> don't_wait_for;
  responses

let call ?interrupt ?ssl_config ?headers ?(chunked = false) ?(body = `Empty)
    meth uri =
  (* Create a request, then make the request. Figure out an appropriate
     transfer encoding *)
  (match chunked with
  | false ->
      Body.Private.disable_chunked_encoding body >>| fun (body, body_length) ->
      ( Cohttp.Request.make_for_client ?headers ~chunked ~body_length meth uri,
        body )
  | true ->
      Deferred.return
        (match Body.is_empty body with
        | `True ->
            (* Don't used chunked encoding with an empty body *)
            ( Cohttp.Request.make_for_client ?headers ~chunked:false
                ~body_length:0L meth uri,
              body )
        | `Unknown | `False ->
            (* Use chunked encoding if there is a body *)
            ( Cohttp.Request.make_for_client ?headers ~chunked:true meth uri,
              body )))
  >>= fun (req, body) -> request ?interrupt ?ssl_config ~body ~uri req

let get ?interrupt ?ssl_config ?headers uri =
  call ?interrupt ?ssl_config ?headers ~chunked:false `GET uri

let head ?interrupt ?ssl_config ?headers uri =
  call ?interrupt ?ssl_config ?headers ~chunked:false `HEAD uri
  >>| fun (res, body) ->
  (match body with `Pipe p -> Pipe.close_read p | _ -> ());
  res

let post ?interrupt ?ssl_config ?headers ?(chunked = false) ?body uri =
  call ?interrupt ?ssl_config ?headers ~chunked ?body `POST uri

let post_form ?interrupt ?ssl_config ?headers ~params uri =
  let headers =
    Cohttp.Header.add_opt_unless_exists headers "content-type"
      "application/x-www-form-urlencoded"
  in
  let body = Body.of_string (Uri.encoded_of_query params) in
  post ?interrupt ?ssl_config ~headers ~chunked:false ~body uri

let put ?interrupt ?ssl_config ?headers ?(chunked = false) ?body uri =
  call ?interrupt ?ssl_config ?headers ~chunked ?body `PUT uri

let patch ?interrupt ?ssl_config ?headers ?(chunked = false) ?body uri =
  call ?interrupt ?ssl_config ?headers ~chunked ?body `PATCH uri

let delete ?interrupt ?ssl_config ?headers ?(chunked = false) ?body uri =
  call ?interrupt ?ssl_config ?headers ~chunked ?body `DELETE uri
OCaml

Innovation. Community. Security.