package tcpip
OCaml TCP/IP networking stack, used in MirageOS
Install
Dune Dependency
Authors
Maintainers
Sources
tcpip-9.0.1.tbz
sha256=fac07ce986811cf5e3d71373d92b631cc30fbef548d6da21b0917212dcf90b03
sha512=01de13f560d58b1524c39619e4e4cb6ebbf069155eb43d0f264aa12b00e0cc8c39792719e3ca46585dd596b692b8e1e3f8c132f005ed9e2d77747c0c158bf4d9
doc/src/tcpip.tcp/user_buffer.ml.html
Source file user_buffer.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
(*
 * Copyright (c) 2010 http://github.com/barko 00336ea19fcb53de187740c490f764f4
 * Copyright (c) 2011 Anil Madhavapeddy <anil@recoil.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *)

open Lwt.Infix

(* [lwt_sequence_add_l s seq] pushes [s] at the left (head) end of [seq] and
   discards the node handle returned by [Lwt_dllist.add_l]. *)
let lwt_sequence_add_l s seq =
  let (_:'a Lwt_dllist.node) = Lwt_dllist.add_l s seq in
  ()

(* A bounded queue to receive data segments and let readers block on
   receiving them. Also supports a monitor that is informed when the queue
   size changes *)
module Rx = struct

  (* TODO: check that flow control works on the rx side - ie if the application
     stops taking data the window closes so the other side stops sending *)

  (* [q]        queued segments, oldest at the left end.
     [wnd]      window record updated through [Window.set_rx_wnd].
     [writers]  wakeners of producers blocked in [add_r].
     [readers]  wakeners of consumers blocked in [take_l].
     [watcher]  optional mvar that receives [cur_size] after each change.
     [max_size] size bound used to decide when producers block.
     [cur_size] bytes currently accounted for, including segments whose
                producer is still blocked in [add_r]. *)
  type t = {
    q: Cstruct.t option Lwt_dllist.t;
    wnd: Window.t;
    writers: unit Lwt.u Lwt_dllist.t;
    readers: Cstruct.t option Lwt.u Lwt_dllist.t;
    mutable watcher: int32 Lwt_mvar.t option;
    mutable max_size: int32;
    mutable cur_size: int32;
  }

  (* [create ~max_size ~wnd] returns an empty queue with no watcher. *)
  let create ~max_size ~wnd =
    let q = Lwt_dllist.create () in
    let writers = Lwt_dllist.create () in
    let readers = Lwt_dllist.create () in
    let watcher = None in
    let cur_size = 0l in
    { q; wnd; writers; readers; max_size; cur_size; watcher }

  (* Publish the current occupancy: pass [max_size - cur_size], clamped at
     zero, to [Window.set_rx_wnd], then put [cur_size] into the watcher mvar
     if one is registered. [Lwt_mvar.put] may block until the previous value
     has been taken. *)
  let notify_size_watcher t =
    let rx_wnd = max 0l (Int32.sub t.max_size t.cur_size) in
    Window.set_rx_wnd t.wnd rx_wnd;
    match t.watcher with
    |None -> Lwt.return_unit
    |Some w -> Lwt_mvar.put w t.cur_size

  (* Length in bytes of a queue element; [None] counts as zero bytes. *)
  let seglen s =
    match s with
    | None -> 0
    | Some b -> Cstruct.length b

  (* Drop every element currently in [q]. Note that [cur_size] is left
     unchanged and neither the window nor the watcher is notified. *)
  let remove_all t =
    let rec rm = function
      | 0 -> ()
      | n ->
        ignore (Lwt_dllist.take_l t.q);
        rm (pred n)
    in
    rm (Lwt_dllist.length t.q)

  (* [add_r t s] appends [s] to the queue.

     - If [cur_size] already exceeds [max_size], the caller is parked on
       [writers]; the size is accounted for immediately and [s] is queued
       once [take_l] wakes the caller.
     - Otherwise, if a reader is blocked, [s] is handed to it directly and
       the size accounting is left untouched.
     - Otherwise [s] is queued and the size/watchers are updated.

     NOTE(review): a parked writer is only woken by [take_l] when [cur_size]
     drops below [max_size], and [cur_size] already includes the parked
     segment. Confirm that callers never park a segment that by itself is
     at least [max_size] bytes, otherwise that writer would not be woken. *)
  let add_r t s =
    if t.cur_size > t.max_size then
      let th,u = Lwt.wait () in
      let node = Lwt_dllist.add_r u t.writers in
      (* If the waiting promise is cancelled, forget the wakener. *)
      Lwt.on_cancel th (fun _ -> Lwt_dllist.remove node);
      (* Update size before blocking, which may push cur_size above max_size *)
      t.cur_size <- Int32.(add t.cur_size (of_int (seglen s)));
      notify_size_watcher t >>= fun () ->
      th >>= fun () ->
      ignore(Lwt_dllist.add_r s t.q);
      Lwt.return_unit
    else match Lwt_dllist.take_opt_l t.readers with
      | None ->
        t.cur_size <- Int32.(add t.cur_size (of_int (seglen s)));
        ignore(Lwt_dllist.add_r s t.q);
        notify_size_watcher t
      | Some u ->
        Lwt.return (Lwt.wakeup u s)

  (* [take_l t] returns the oldest queued element. When the queue is empty
     the caller is parked on [readers] until [add_r] hands it an element.
     After a successful take the size is reduced, watchers are notified and,
     if there is room again, one parked writer is woken. *)
  let take_l t =
    if Lwt_dllist.is_empty t.q then begin
      let th,u = Lwt.wait () in
      let node = Lwt_dllist.add_r u t.readers in
      Lwt.on_cancel th (fun _ -> Lwt_dllist.remove node);
      th
    end else begin
      let s = Lwt_dllist.take_l t.q in
      t.cur_size <- Int32.(sub t.cur_size (of_int (seglen s)));
      notify_size_watcher t >>= fun () ->
      if t.cur_size < t.max_size then begin
        match Lwt_dllist.take_opt_l t.writers with
        |None -> ()
        |Some w -> Lwt.wakeup w ()
      end;
      Lwt.return s
    end

  (* Accessors for the byte counters. *)
  let cur_size t = t.cur_size
  let max_size t = t.max_size

  (* Register [mvar] as the size watcher, replacing any previous one. *)
  let monitor t mvar =
    t.watcher <- Some mvar

end

(* The transmit queue simply advertises how much data is allowed to be
   written, and a wakener for when it is full. It is up to the application
   to decide how to throttle or breakup its data production with this
   information. *)
module Tx = struct

  module TXS = Segment.Tx

  (* [wnd]      window record queried for MSS / available window.
     [writers]  wakeners of callers blocked in [wait_for] or
                [wait_for_flushed].
     [txq]      segment transmit queue that [TXS.output] writes to.
     [buffer]   user data not yet handed to [txq], oldest at the left end.
     [max_size] capacity used by [available].
     [bufbytes] total number of bytes currently held in [buffer]. *)
  type t = {
    wnd: Window.t;
    writers: unit Lwt.u Lwt_dllist.t;
    txq: TXS.t;
    buffer: Cstruct.t Lwt_dllist.t;
    max_size: int32;
    mutable bufbytes: int32;
  }

  (* [create ~max_size ~wnd ~txq] returns an empty transmit buffer. *)
  let create ~max_size ~wnd ~txq =
    let buffer = Lwt_dllist.create () in
    let writers = Lwt_dllist.create () in
    let bufbytes = 0l in
    { wnd; writers; txq; buffer; max_size; bufbytes }

  (* Length of a single buffer, as an int32. *)
  let len data =
    Int32.of_int (Cstruct.length data)

  (* Total length of a list of buffers, as an int32. *)
  let lenv datav =
    match datav with
    |[] -> 0l
    |[d] -> Int32.of_int (Cstruct.length d)
    |ds -> Int32.of_int (List.fold_left (fun a b -> Cstruct.length b + a) 0 ds)

  (* Append [datav] (total length [l]) to the right end of [buffer] and
     account for the bytes in [bufbytes]. Private helper shared by [write]
     and [write_nodelay]. *)
  let enqueue t l datav =
    t.bufbytes <- Int32.add t.bufbytes l;
    List.iter (fun data -> ignore(Lwt_dllist.add_r data t.buffer)) datav

  (* Check how many bytes are available to write to output buffer.
     Reports zero when the remaining room is smaller than
     [Window.tx_mss t.wnd]. *)
  let available t =
    let a = Int32.sub t.max_size t.bufbytes in
    match a < (Int32.of_int (Window.tx_mss t.wnd)) with
    | true -> 0l
    | false -> a

  (* Check how many bytes are available to write to wire *)
  let available_cwnd t =
    Window.tx_available t.wnd

  (* Wait until at least sz bytes are available in the window.
     The caller is parked on [writers] and re-checks after each wake-up. *)
  let rec wait_for t sz =
    if (available t) >= sz then begin
      Lwt.return_unit
    end
    else begin
      let th,u = Lwt.wait () in
      let node = Lwt_dllist.add_r u t.writers in
      Lwt.on_cancel th (fun _ -> Lwt_dllist.remove node);
      th >>= fun () ->
      wait_for t sz
    end

  (* Concatenate a list of buffers into a single one. *)
  let compactbufs bl = Cstruct.concat bl

  (* Wait until the user buffer is flushed.
     Shares the [writers] wake-up list with [wait_for]. *)
  let rec wait_for_flushed t =
    if Lwt_dllist.is_empty t.buffer then begin
      Lwt.return_unit
    end
    else begin
      let th,u = Lwt.wait () in
      let node = Lwt_dllist.add_r u t.writers in
      Lwt.on_cancel th (fun _ -> Lwt_dllist.remove node);
      th >>= fun () ->
      wait_for_flushed t
    end

  (* [clear_buffer t] repeatedly builds a packet of at most
     min (available window, MSS) bytes from the head of [buffer] and passes
     it to [TXS.output] with the [Psh] flag, until the buffer is empty or
     the available length is zero. [bufbytes] is decremented for every byte
     taken out of [buffer]. *)
  let rec clear_buffer t =
    (* Keep taking whole buffers from the head while they fit in the
       remaining [l] bytes; the first one that does not fit is pushed back.
       [curr_data] is accumulated in reverse order. *)
    let rec addon_more curr_data l =
      match Lwt_dllist.take_opt_l t.buffer with
      | None -> List.rev curr_data
      | Some s ->
        let s_len = len s in
        match s_len > l with
        | true ->
          lwt_sequence_add_l s t.buffer;
          List.rev curr_data
        | false ->
          t.bufbytes <- Int32.sub t.bufbytes s_len;
          addon_more (s::curr_data) (Int32.sub l s_len)
    in
    (* Must only be called when [buffer] is non-empty ([Lwt_dllist.take_l]
       is used unconditionally). Returns [None] when nothing may be sent. *)
    let get_pkt_to_send () =
      let avail_len =
        min (available_cwnd t) (Int32.of_int (Window.tx_mss t.wnd))
      in
      let s = Lwt_dllist.take_l t.buffer in
      let s_len = len s in
      match s_len > avail_len with
      | true -> begin
          match avail_len with
          |0l -> (* return pkt to buffer *)
            lwt_sequence_add_l s t.buffer;
            None
          |_ -> (* split buffer into a partial write *)
            let to_send,remaining = Cstruct.split s (Int32.to_int avail_len) in
            (* queue remaining view *)
            lwt_sequence_add_l remaining t.buffer;
            t.bufbytes <- Int32.sub t.bufbytes avail_len;
            Some [to_send]
        end
      | false ->
        match s_len < avail_len with
        | true ->
          (* Room left over: try to top the packet up with more buffers. *)
          t.bufbytes <- Int32.sub t.bufbytes s_len;
          Some (addon_more (s::[]) (Int32.sub avail_len s_len))
        | false ->
          t.bufbytes <- Int32.sub t.bufbytes s_len;
          Some [s]
    in
    match Lwt_dllist.is_empty t.buffer with
    | true -> Lwt.return_unit
    | false ->
      match get_pkt_to_send () with
      | None -> Lwt.return_unit
      | Some pkt ->
        let b = compactbufs pkt in
        TXS.output ~flags:Segment.Psh t.txq b >>= fun () ->
        clear_buffer t

  (* Chunk up the segments into MSS max for transmission.
     [acc] holds the buffers of the chunk being built, in reverse order. *)
  let transmit_segments ~mss ~txq datav =
    let transmit acc =
      let b = compactbufs (List.rev acc) in
      TXS.output ~flags:Segment.Psh txq b
    in
    let rec chunk datav acc =
      match datav with
      |[] -> begin
          match acc with
          |[] -> Lwt.return_unit
          |_ -> transmit acc
        end
      |hd::tl ->
        let curlen = Cstruct.lenv acc in
        let tlen = Cstruct.length hd + curlen in
        if tlen > mss then begin
          (* [hd] overflows the chunk: send the part that fits and carry
             the remainder over to a fresh chunk. *)
          let a,b = Cstruct.split hd (mss - curlen) in
          transmit (a::acc) >>= fun () ->
          chunk (b::tl) []
        end else
          chunk tl (hd::acc)
    in
    chunk datav []

  (* [write t datav] sends or buffers [datav].

     The data is transmitted immediately only when the buffer is empty, the
     payload is exactly one MSS long or [Window.tx_inflight] is false, and
     the available window covers the whole payload. In all other cases the
     data is appended to [buffer]; when the fast-path condition failed and
     at least one MSS worth of bytes is buffered, [clear_buffer] is run.
     NOTE(review): this looks like Nagle-style coalescing, with
     [write_nodelay] as the variant without it - confirm with the caller. *)
  let write t datav =
    let l = lenv datav in
    let mss = Int32.of_int (Window.tx_mss t.wnd) in
    match Lwt_dllist.is_empty t.buffer &&
          (l = mss || not (Window.tx_inflight t.wnd)) with
    | false ->
      enqueue t l datav;
      if t.bufbytes < mss then
        Lwt.return_unit
      else
        clear_buffer t
    | true ->
      let avail_len = available_cwnd t in
      match avail_len < l with
      | true ->
        enqueue t l datav;
        Lwt.return_unit
      | false ->
        let max_size = Window.tx_mss t.wnd in
        transmit_segments ~mss:max_size ~txq:t.txq datav

  (* [write_nodelay t datav] transmits [datav] immediately when the buffer
     is empty and the available window covers the whole payload; otherwise
     the data is appended to [buffer] without attempting a flush. *)
  let write_nodelay t datav =
    let l = lenv datav in
    match Lwt_dllist.is_empty t.buffer with
    | false ->
      enqueue t l datav;
      Lwt.return_unit
    | true ->
      let avail_len = available_cwnd t in
      match avail_len < l with
      | true ->
        enqueue t l datav;
        Lwt.return_unit
      | false ->
        let max_size = Window.tx_mss t.wnd in
        transmit_segments ~mss:max_size ~txq:t.txq datav

  (* Wake the longest-waiting entry of [writers], if any. *)
  let inform_app t =
    match Lwt_dllist.take_opt_l t.writers with
    | None -> Lwt.return_unit
    | Some w ->
      Lwt.wakeup w ();
      (* TODO: check if this should wake all writers not just one *)
      Lwt.return_unit

  (* Indicate that more bytes are available for waiting writers.
     Note that sz does not take window scaling into account, and so
     should be passed as unscaled (i.e. from the wire) here.
     Window will internally scale it up.
     The size argument is currently ignored: the buffer is flushed as far
     as possible and then one waiter is woken. *)
  let free t _sz =
    clear_buffer t >>= fun () ->
    inform_app t

  (* [reset t] discards all buffered user data and wakes one waiter.
     [bufbytes] is reset together with the buffer so that it keeps matching
     the (now empty) buffer contents; otherwise [available] would keep
     reporting the stale occupancy and a writer woken here could block
     again in [wait_for] indefinitely. *)
  let reset t =
    (* FIXME: duplicated code with Segment.reset_seq *)
    let rec reset_seq segs =
      match Lwt_dllist.take_opt_l segs with
      | None -> ()
      | Some _ -> reset_seq segs
    in
    reset_seq t.buffer;
    t.bufbytes <- 0l;
    inform_app t

end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>