Source file paf_cohttp.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
(* Right-to-left function composition: [(f <.> g) x] applies [g] first and
   then feeds its result to [f]. *)
let ( <.> ) f g = fun x -> f (g x)
(* Log source of this module, created under the name "paf-cohttp". *)
let src = Logs.Src.create "paf-cohttp"
(* [Log] packs the logging functions attached to [src]; the rest of the file
   only uses [Log.debug]. *)
module Log = (val Logs.src_log src : Logs.LOG)
(* Mimic keys that [with_uri] fills from the requested URI: the scheme
   ([`HTTP] or [`HTTPS]), the port, and the host as either a domain name or
   an IP address. *)
let scheme = Mimic.make ~name:"paf-scheme"
let port = Mimic.make ~name:"paf-port"
let domain_name = Mimic.make ~name:"paf-domain-name"
let ipaddr = Mimic.make ~name:"paf-ipaddr"
(* The client context is a plain Mimic context; the default one is empty. *)
type ctx = Mimic.ctx
let default_ctx = Mimic.empty
(* Optional HTTP/AF configuration carried by the context. [call] falls back
   to [Httpaf.Config.default] when this key is not bound. *)
let httpaf_config = Mimic.make ~name:"httpaf-config"
(* [error_handler mvar err] publishes [err] into [mvar] from a detached Lwt
   task, so the (synchronous) caller never waits for the mailbox to be
   free. *)
let error_handler mvar err =
  let publish () = Lwt_mvar.put mvar err in
  Lwt.async publish
(* [response_handler mvar pusher resp body] drains [body] into [pusher] and
   hands the response head [resp] over through [mvar].

   Every chunk read from [body] is copied into a fresh string and pushed as
   [Some chunk]; a new read is then scheduled. End of input is signalled by
   pushing [None]. The response itself is put into [mvar] from a detached
   Lwt task. *)
let response_handler mvar pusher resp body =
  let on_eof () = pusher None in
  let rec on_read buf ~off ~len =
    pusher (Some (Bigstringaf.substring buf ~off ~len)) ;
    schedule ()
  and schedule () = Httpaf.Body.schedule_read ~on_eof ~on_read body in
  schedule () ;
  Lwt.async (fun () -> Lwt_mvar.put mvar resp)
(* [unroll body stream] forwards every string produced by [stream] to the
   HTTP/AF writer [body], then closes the writer once the stream is
   exhausted. The returned promise resolves after the writer is closed. *)
let rec unroll body stream =
  let forward = function
    | None ->
      Log.debug (fun m -> m "Close the HTTP/AF writer.") ;
      Httpaf.Body.close_writer body ;
      Lwt.return_unit
    | Some chunk ->
      Log.debug (fun m -> m "Transmit to HTTP/AF: %S." chunk) ;
      Httpaf.Body.write_string body chunk ;
      unroll body stream in
  Lwt.bind (Lwt_stream.get stream) forward
(* [transmit cohttp_body httpaf_body] writes a Cohttp body into the HTTP/AF
   writer [httpaf_body].

   Non-stream bodies are written immediately and the writer is closed before
   returning. A [`Stream] body is forwarded by a detached Lwt task running
   [unroll], which closes the writer itself when the stream ends. *)
let transmit cohttp_body httpaf_body =
  let finish () = Httpaf.Body.close_writer httpaf_body in
  let send str = Httpaf.Body.write_string httpaf_body str in
  match cohttp_body with
  | `Stream stream -> Lwt.async (fun () -> unroll httpaf_body stream)
  | `Strings parts ->
    List.iter send parts ;
    finish ()
  | `String str ->
    send str ;
    finish ()
  | `Empty -> finish ()
(* Raised by [call] when HTTP/AF reports an error while computing the body
   length of a received response. *)
exception Internal_server_error
(* [call] fails with this exception when the HTTP/AF error handler reports
   [`Invalid_response_body_length]; it carries the offending response. *)
exception Invalid_response_body_length of Httpaf.Response.t
(* [call] fails with this exception when the HTTP/AF error handler reports
   [`Malformed_response]; it carries the attached message. *)
exception Malformed_response of string
(* [with_uri uri ctx] extends [ctx] with what can be extracted from [uri]:

   - [scheme]: [`HTTP] or [`HTTPS] when the URI scheme is "http" or "https";
   - [port]: the explicit port if any, otherwise 80 for [`HTTP] and 443 for
     [`HTTPS];
   - [ipaddr]: the host when it parses as an IP address;
   - [domain_name]: the host when it does not parse as an IP address but is a
     valid host name.

   Components that cannot be determined are simply not added. *)
let with_uri uri ctx =
  (* Bind [key] in the context only when a value is available. *)
  let add_opt key value ctx =
    match value with
    | None -> ctx
    | Some v -> Mimic.add key v ctx in
  let scheme_v =
    match Uri.scheme uri with
    | Some "http" -> Some `HTTP
    | Some "https" -> Some `HTTPS
    | _ -> None in
  let port_v =
    match Uri.port uri with
    | Some _ as explicit -> explicit
    | None -> (
      match scheme_v with
      | Some `HTTP -> Some 80
      | Some `HTTPS -> Some 443
      | _ -> None) in
  let domain_name_v, ipaddr_v =
    match Uri.host uri with
    | None -> (None, None)
    | Some host -> (
      (* An IP literal takes precedence over a domain-name reading. *)
      match Ipaddr.of_string host with
      | Ok ip -> (None, Some ip)
      | Error _ -> (
        match Result.bind (Domain_name.of_string host) Domain_name.host with
        | Ok name -> (Some name, None)
        | Error _ -> (None, None))) in
  ctx
  |> add_opt scheme scheme_v
  |> add_opt port port_v
  |> add_opt ipaddr ipaddr_v
  |> add_opt domain_name domain_name_v
(* [with_host headers uri] returns [headers] with a "host" field derived from
   [uri], unless such a field is already present.

   The value is the URI host (or "localhost" when the URI has none), followed
   by ":<port>" when the URI carries an explicit port.

   The [headers] parameter is used by the body and supplied by the caller in
   [call]; it must appear in the parameter list for the definition to be
   well-formed. *)
let with_host headers uri =
  let hostname = Uri.host_with_default ~default:"localhost" uri in
  let hostname =
    match Uri.port uri with
    | Some port -> Fmt.str "%s:%d" hostname port
    | None -> hostname in
  Httpaf.Headers.add_unless_exists headers "host" hostname
(* [with_transfer_encoding ~chunked meth body headers] completes [headers]
   with the framing information of the request body:

   - [`GET] requests are left untouched;
   - if a "content-length" is already present and chunking was not requested,
     [headers] is left untouched;
   - if [chunked] is [Some true], or if the body is a stream and [chunked]
     was not specified, "transfer-encoding: chunked" is added;
   - otherwise a "content-length" computed from the body is added.

   The [headers] parameter is read by the body and passed last by the caller
   in [call]; it must appear in the parameter list for the definition to be
   well-formed.

   @raise Invalid_argument if [chunked] is [Some false], the body is a
   stream, and no "content-length" is available. *)
let with_transfer_encoding ~chunked (meth : Cohttp.Code.meth) body headers =
  match (meth, chunked, body, Httpaf.Headers.get headers "content-length") with
  | `GET, _, _, _ -> headers
  | _, (None | Some false), _, Some _ -> headers
  | _, Some true, _, (Some _ | None) | _, None, `Stream _, None ->
    Httpaf.Headers.add_unless_exists headers "transfer-encoding" "chunked"
  | _, (None | Some false), `Empty, None ->
    Httpaf.Headers.add_unless_exists headers "content-length" "0"
  | _, (None | Some false), `String str, None ->
    Httpaf.Headers.add_unless_exists headers "content-length"
      (string_of_int (String.length str))
  | _, (None | Some false), `Strings sstr, None ->
    (* Total length of all the parts. *)
    let len = List.fold_right (( + ) <.> String.length) sstr 0 in
    Httpaf.Headers.add_unless_exists headers "content-length"
      (string_of_int len)
  | _, Some false, `Stream _, None ->
    invalid_arg "Impossible to transfer a stream with a content-length value"
(* [Httpaf.Client_connection] adapted to the module shape that [call] passes
   to [Paf.run]:

   - [yield_reader] is added but fails with an assertion if it is ever
     called;
   - [next_read_operation] has its result widened so that its type also
     admits [`Yield]. *)
module Httpaf_Client_connection = struct
  include Httpaf.Client_connection

  let yield_reader _conn = assert false

  let next_read_operation conn =
    let op = Httpaf.Client_connection.next_read_operation conn in
    (op :> [ `Close | `Read | `Yield ])
end
(* [call ?ctx ?headers ?body ?chunked meth uri] sends a request to [uri]
   through HTTP/AF and returns the response head together with its body as a
   stream.

   - [ctx] (default [default_ctx]) is completed with the scheme, port and
     host found in [uri], then given to [Mimic.resolve] to obtain a flow;
   - [headers] are converted to HTTP/AF headers and completed with a "host"
     field and with body-framing information;
   - [body] defaults to the empty body;
   - [chunked] is forwarded to [with_transfer_encoding].

   The returned promise is rejected with:
   - [Failure] when Mimic cannot resolve a flow;
   - [Invalid_response_body_length] or [Malformed_response] when HTTP/AF
     reports the corresponding error, or with the exception carried by
     [`Exn]; the flow is closed first in these three cases;
   - [Internal_server_error] when the body length of the response cannot be
     determined.

   @raise Invalid_argument (synchronously) if [chunked] is [Some false] while
   the body is a stream without a "content-length" header. *)
let call ?(ctx = default_ctx) ?headers
    ?body:(cohttp_body = Cohttp_lwt.Body.empty) ?chunked meth uri =
  Log.debug (fun m -> m "Fill the context with %a." Uri.pp uri) ;
  let ctx = with_uri uri ctx in
  let config =
    match Mimic.get httpaf_config ctx with
    | Some config -> config
    | None -> Httpaf.Config.default in
  let headers =
    match headers with
    | Some headers -> Httpaf.Headers.of_list (Cohttp.Header.to_list headers)
    | None -> Httpaf.Headers.empty in
  let headers = with_host headers uri in
  let headers = with_transfer_encoding ~chunked meth cohttp_body headers in
  (* Methods unknown to HTTP/AF are carried verbatim as [`Other]. *)
  let meth =
    match meth with
    | #Httpaf.Method.t as meth -> meth
    | #Cohttp.Code.meth as meth -> `Other (Cohttp.Code.string_of_method meth)
  in
  let req = Httpaf.Request.create ~headers meth (Uri.path_and_query uri) in
  (* [stream] is the response body handed back to the caller; it is fed by
     [response_handler] through [pusher]. *)
  let stream, pusher = Lwt_stream.create () in
  let mvar_res = Lwt_mvar.create_empty () in
  let mvar_err = Lwt_mvar.create_empty () in
  let open Lwt.Infix in
  Mimic.resolve ctx >>= function
  | Error (#Mimic.error as err) ->
    Lwt.fail (Failure (Fmt.str "%a" Mimic.pp_error err))
  | Ok flow -> (
    let error_handler = error_handler mvar_err in
    let response_handler = response_handler mvar_res pusher in
    let httpaf_body, conn =
      Httpaf.Client_connection.request ~config ~error_handler
        ~response_handler req in
    (* The connection is driven by a detached task. *)
    Lwt.async (fun () -> Paf.run (module Httpaf_Client_connection) conn flow) ;
    transmit cohttp_body httpaf_body ;
    Log.debug (fun m -> m "Body transmitted.") ;
    (* Whichever of the response or the error arrives first wins. *)
    Lwt.pick
      [
        (Lwt_mvar.take mvar_res >|= fun res -> `Response res);
        (Lwt_mvar.take mvar_err >|= fun err -> `Error err);
      ]
    >>= function
    | `Error (`Exn exn) -> Mimic.close flow >>= fun () -> Lwt.fail exn
    | `Error (`Invalid_response_body_length resp) ->
      Mimic.close flow >>= fun () ->
      Lwt.fail (Invalid_response_body_length resp)
    | `Error (`Malformed_response err) ->
      Mimic.close flow >>= fun () -> Lwt.fail (Malformed_response err)
    | `Response resp ->
      Log.debug (fun m -> m "Response received.") ;
      let version =
        match resp.Httpaf.Response.version with
        | { Httpaf.Version.major = 1; minor = 0 } -> `HTTP_1_0
        | { major = 1; minor = 1 } -> `HTTP_1_1
        | { major; minor } -> `Other (Fmt.str "%d.%d" major minor) in
      (* Statuses unknown to Cohttp are carried as a raw [`Code]. *)
      let status =
        match
          (resp.Httpaf.Response.status
            :> [ Cohttp.Code.status | Httpaf.Status.t ])
        with
        | #Cohttp.Code.status as status -> status
        | #Httpaf.Status.t as status -> `Code (Httpaf.Status.to_code status)
      in
      let encoding =
        match meth with
        | #Httpaf.Method.standard as meth -> (
          match Httpaf.Response.body_length ~request_method:meth resp with
          | `Chunked | `Close_delimited -> Cohttp.Transfer.Chunked
          | `Error _err -> raise Internal_server_error
          | `Fixed length -> Cohttp.Transfer.Fixed length)
        | _ -> Cohttp.Transfer.Chunked in
      let headers =
        Cohttp.Header.of_list
          (Httpaf.Headers.to_list resp.Httpaf.Response.headers) in
      let resp =
        Cohttp.Response.make ~version ~status ~encoding ~headers () in
      Lwt.return (resp, `Stream stream))
open Lwt.Infix
(* [head ?ctx ?headers uri] performs a [`HEAD] request through [call] and
   keeps only the response head, dropping the body. *)
let head ?ctx ?headers uri = call ?ctx ?headers `HEAD uri >|= fst
(* [get ?ctx ?headers uri] performs a [`GET] request through [call]. *)
let get ?ctx ?headers uri = call ?ctx ?headers `GET uri
(* [delete ?ctx ?body ?chunked ?headers uri] performs a [`DELETE] request
   through [call], forwarding every optional argument unchanged. *)
let delete ?ctx ?body ?chunked ?headers uri =
  call ?ctx ?body ?chunked ?headers `DELETE uri
(* [post ?ctx ?body ?chunked ?headers uri] performs a [`POST] request through
   [call], forwarding every optional argument unchanged. *)
let post ?ctx ?body ?chunked ?headers uri =
  call ?ctx ?body ?chunked ?headers `POST uri
(* [put ?ctx ?body ?chunked ?headers uri] performs a [`PUT] request through
   [call], forwarding every optional argument unchanged. *)
let put ?ctx ?body ?chunked ?headers uri =
  call ?ctx ?body ?chunked ?headers `PUT uri
(* [patch ?ctx ?body ?chunked ?headers uri] performs a [`PATCH] request
   through [call], forwarding every optional argument unchanged. *)
let patch ?ctx ?body ?chunked ?headers uri =
  call ?ctx ?body ?chunked ?headers `PATCH uri
(* Not implemented: every argument is ignored and any call fails with an
   assertion. NOTE(review): presumably present only to match a client
   signature declared elsewhere — confirm against the interface. *)
let post_form ?ctx:_ ?headers:_ ~params:_ _uri = assert false
(* Not implemented: every argument is ignored and any call fails with an
   assertion. NOTE(review): presumably present only to match a client
   signature declared elsewhere — confirm against the interface. *)
let callv ?ctx:_ _uri _stream = assert false
(* Not implemented: any call fails with an assertion. Warning 32 (unused
   value) is switched off around the definition and restored right after,
   so the binding may stay even when nothing refers to it. *)
[@@@warning "-32"]
let sexp_of_ctx _ctx = assert false
[@@@warning "+32"]