package cohttp-async

  1. Overview
  2. Docs

Source file server.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
open Base
open Async_kernel
open Async_unix

type ('address, 'listening_on) t = {
  server : ('address, 'listening_on) Tcp.Server.t; [@sexp.opaque]
}
[@@deriving sexp_of]

let num_connections t = Tcp.Server.num_connections t.server

type response = Cohttp.Response.t * Body.t [@@deriving sexp_of]

type response_action =
  [ `Expert of Http.Response.t * (Reader.t -> Writer.t -> unit Deferred.t)
  | `Response of response ]

type 'r respond_t =
  ?headers:Http.Header.t -> ?body:Body.t -> Http.Status.t -> 'r Deferred.t

let close t = Tcp.Server.close t.server
let close_finished t = Tcp.Server.close_finished t.server
let is_closed t = Tcp.Server.is_closed t.server
let listening_on t = Tcp.Server.listening_on t.server

let read_body req rd =
  match Http.Request.has_body req with
  (* TODO maybe attempt to read body *)
  | `No | `Unknown -> `Empty
  | `Yes ->
      (* Create a Pipe for the body *)
      let reader = Io.Request.make_body_reader req rd in
      let pipe = Body.Private.pipe_of_body Io.Request.read_body_chunk reader in
      `Pipe pipe

let collect_errors writer ~f =
  let monitor = Writer.monitor writer in
  (* don't propagate errors up, we handle them here *)
  Monitor.detach_and_get_error_stream monitor |> (ignore : exn Stream.t -> unit);
  choose
    [
      choice (Monitor.get_next_error monitor) (fun e ->
          Error (Exn.Reraised ("Cohttp_async.Server.collect_errors", e)));
      choice (try_with ~name:"Cohttp_async.Server.collect_errors" f) Fn.id;
    ]

let reader_info = Info.of_string "Cohttp_async.Server.Expert: Create reader"

let handle_client handle_request sock rd wr =
  collect_errors wr ~f:(fun () ->
      let rd = Input_channel.create rd in
      let rec loop rd wr sock handle_request =
        if Input_channel.is_closed rd then Deferred.unit
        else
          Io.Request.read rd >>= function
          | `Eof | `Invalid _ -> Deferred.unit
          | `Ok req -> (
              let req_body = read_body req rd in
              handle_request ~body:req_body sock req >>= function
              | `Expert (res, handler) ->
                  Io.Response.write_header res wr >>= fun () ->
                  Input_channel.to_reader reader_info rd >>= fun reader ->
                  handler reader wr
              | `Response (res, res_body) ->
                  (* There are scenarios if a client leaves before consuming the full response,
                     we might have a reference to an async Pipe that doesn't get drained.

                     Not draining or closing a pipe can lead to issues if its holding a resource like
                     a file handle as those resources will never be closed, leading to a leak.

                     Async writers have a promise that's fulfilled whenever they are closed,
                     so we can use it to schedule a close operation on the stream to ensure that we
                     don't leave a stream open if the underlying channels are closed. *)
                  (match res_body with
                  | `Empty | `String _ | `Strings _ -> ()
                  | `Pipe stream ->
                      Deferred.any_unit
                        [ Writer.close_finished wr; Writer.consumer_left wr ]
                      >>> fun () -> Pipe.close_read stream);
                  let keep_alive =
                    Http.Request.is_keep_alive req
                    && Http.Response.is_keep_alive res
                  in
                  let res =
                    let headers =
                      Http.Header.add_unless_exists
                        (Http.Response.headers res)
                        "connection"
                        (if keep_alive then "keep-alive" else "close")
                    in
                    { res with Http.Response.headers }
                  in
                  Io.Response.write ~flush:false
                    (Body.Private.write_body Io.Response.write_body res_body)
                    res wr
                  >>= fun () ->
                  Body.Private.drain req_body >>= fun () ->
                  if keep_alive then loop rd wr sock handle_request
                  else Deferred.unit)
      in
      loop rd wr sock handle_request)
  >>| Result.ok_exn

let respond ?(headers = Http.Header.init ()) ?(body = `Empty) status :
    response Deferred.t =
  let encoding = Body.transfer_encoding body in
  let resp = Cohttp.Response.make ~status ~encoding ~headers () in
  return (resp, body)

let respond_with_pipe ?headers ?(code = `OK) body =
  respond ?headers ~body:(`Pipe body) code

let respond_string ?headers ?(status = `OK) body =
  respond ?headers ~body:(`String body) status

let respond_with_redirect ?headers uri =
  let headers =
    Http.Header.add_opt_unless_exists headers "location" (Uri.to_string uri)
  in
  respond ~headers `Found

let resolve_local_file ~docroot ~uri =
  Cohttp.Path.resolve_local_file ~docroot ~uri

let error_body_default = "<html><body><h1>404 Not Found</h1></body></html>"

let respond_with_file ?headers ?(error_body = error_body_default) filename =
  Monitor.try_with ~run:`Now (fun () ->
      Reader.open_file filename >>= fun rd ->
      let body = `Pipe (Reader.pipe rd) in
      let mime_type = Magic_mime.lookup filename in
      let headers =
        Http.Header.add_opt_unless_exists headers "content-type" mime_type
      in
      respond ~headers ~body `OK)
  >>= function
  | Ok res -> return res
  | Error _exn -> respond_string ~status:`Not_found error_body

type mode = Conduit_async.server

let create_raw ?max_connections ?backlog ?buffer_age_limit ?(mode = `TCP)
    ~on_handler_error where_to_listen handle_request =
  Conduit_async.serve ?max_connections ?backlog ?buffer_age_limit
    ~on_handler_error mode where_to_listen
    (handle_client handle_request)
  >>| fun server -> { server }

let create_expert ?max_connections ?backlog ?buffer_age_limit ?(mode = `TCP)
    ~on_handler_error where_to_listen handle_request =
  create_raw ?max_connections ?backlog ?buffer_age_limit ~on_handler_error ~mode
    where_to_listen handle_request

let create ?max_connections ?backlog ?buffer_age_limit ?(mode = `TCP)
    ~on_handler_error where_to_listen handle_request =
  let handle_request ~body address request =
    handle_request ~body address request >>| fun r -> `Response r
  in
  create_raw ?max_connections ?backlog ?buffer_age_limit ~on_handler_error ~mode
    where_to_listen handle_request

module Expert = struct
  let create handle_request addr reader writer =
    let handle_request ~body addr request =
      handle_request ~body addr request >>| fun r -> `Response r
    in
    handle_client handle_request addr reader writer

  let create_with_response_action handle_request addr reader writer =
    handle_client handle_request addr reader writer
end
OCaml

Innovation. Community. Security.