package eio
Effect-based direct-style IO API for OCaml
Install
Dune Dependency
Authors
Maintainers
Sources
eio-1.0.tbz
sha256=da260d9da38b3dde9f316652a20b13a261cf90b85a2498ac669b7d564e61942d
sha512=5886e1159f48ede237769baa1d8b5daafa0310e4192d7fe0e8c32aef70f2b6378cef72d0fbae308457e25d87a69802b9ee83a5e8f23e0591d83086a92d701c46
doc/src/eio.unix/thread_pool.ml.html
Source file thread_pool.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
module Zzz = Eio_utils.Zzz type job = | New | Exit | Job : { fn : unit -> 'a; enqueue : ('a, Eio.Exn.with_bt) result -> unit; } -> job (* Mailbox with blocking semaphore *) module Mailbox = struct type t = { available : Semaphore.Binary.t; mutable cell : job; } let create () = { available = Semaphore.Binary.make false; cell = New } let put mbox x = (* The Semaphore contains an atomic frontier, therefore [cell] does not need to be an atomic *) mbox.cell <- x; Semaphore.Binary.release mbox.available let take mbox = Semaphore.Binary.acquire mbox.available; mbox.cell end module Free_pool = struct type list = | Empty | Closed | Free of Mailbox.t * list type t = list Atomic.t let rec close_list = function | Free (x, xs) -> Mailbox.put x Exit; close_list xs | Empty | Closed -> () let close t = let items = Atomic.exchange t Closed in close_list items let rec drop t = match Atomic.get t with | Closed | Empty -> () | Free _ as items -> if Atomic.compare_and_set t items Empty then close_list items else drop t let rec put t mbox = match Atomic.get t with | Closed -> assert false | (Empty | Free _) as current -> let next = Free (mbox, current) in if not (Atomic.compare_and_set t current next) then put t mbox (* concurrent update, try again *) let make_thread t = let mbox = Mailbox.create () in let _thread : Thread.t = Thread.create (fun () -> while true do match Mailbox.take mbox with | New -> assert false | Exit -> raise Thread.Exit | Job { fn; enqueue } -> let result = try Ok (fn ()) with exn -> let bt = Printexc.get_raw_backtrace () in Error (exn, bt) in put t mbox; (* Ensure thread is in free-pool before enqueuing. *) enqueue result done ) () in mbox let rec get_thread t = match Atomic.get t with | Closed -> invalid_arg "Thread pool closed!" | Empty -> make_thread t | Free (mbox, next) as current -> if Atomic.compare_and_set t current next then mbox else get_thread t (* concurrent update, try again *) end type t = { free : Free_pool.t; sleep_q : Zzz.t; mutable timeout : Zzz.Key.t option; } type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, Eio.Exn.with_bt) result * t) Effect.t let terminate t = Free_pool.close t.free; Option.iter (fun key -> Zzz.remove t.sleep_q key; t.timeout <- None) t.timeout let create ~sleep_q = { free = Atomic.make Free_pool.Empty; sleep_q; timeout = None } let run t fn = match fn () with | x -> terminate t; x | exception ex -> let bt = Printexc.get_raw_backtrace () in terminate t; Printexc.raise_with_backtrace ex bt let submit t ~ctx ~enqueue fn = match Eio.Private.Fiber_context.get_error ctx with | Some e -> enqueue (Error (e, Eio.Exn.empty_backtrace)) | None -> let mbox = Free_pool.get_thread t.free in Mailbox.put mbox (Job { fn; enqueue }) let run_in_systhread ?(label="systhread") fn = Eio.Private.Trace.suspend_fiber label; let r, t = Effect.perform (Run_in_systhread fn) in if t.timeout = None then ( let time = Mtime.add_span (Mtime_clock.now ()) Mtime.Span.(20 * ms) |> Option.value ~default:Mtime.max_stamp in t.timeout <- Some (Zzz.add t.sleep_q time (Fn (fun () -> Free_pool.drop t.free; t.timeout <- None))) ); match r with | Ok x -> x | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>