package caqti

  1. Overview
  2. Docs

Source file caqti1_pool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
(* Copyright (C) 2017  Petter A. Urkedal <paurkedal@gmail.com>
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version, with the OCaml static compilation exception.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library.  If not, see <http://www.gnu.org/licenses/>.
 *)

(** Default upper bound on the number of pooled elements.

    Read once, at module initialisation, from the environment variable
    [CAQTI_POOL_MAX_SIZE].  Falls back to [8] when the variable is unset,
    when it does not parse as an integer, or when it is not strictly
    positive (a non-positive bound would leave [acquire] unable to ever
    create an element). *)
let default_max_size =
  let fallback = 8 in
  try
    let n = int_of_string (Sys.getenv "CAQTI_POOL_MAX_SIZE") in
    if n > 0 then n else fallback
  with
  (* [Sys.getenv] raises [Not_found]; [int_of_string] raises [Failure]. *)
  | Not_found | Failure _ -> fallback

(** Resource pool parametrised over the concurrency primitives in [System]
    (its [io] type, [>>=], [return], [fail], [catch] and [Mvar] are used
    below).  The pool hands out elements created by a user-supplied function,
    keeps released elements in a FIFO queue for reuse, and parks callers in a
    priority queue once [p_max_size] elements are accounted for. *)
module Make (System : Caqti1_system_sig.S) = struct
  open System

  (* A parked caller waiting for the pool to change state. *)
  module Task = struct
    type t = {priority : float; mvar : unit Mvar.t}

    (* Resume the parked caller by storing a unit value into its mvar. *)
    let wake {mvar; _} = Mvar.store () mvar

    (* The arguments are deliberately swapped, so a larger priority compares
       as smaller.  NOTE(review): presumably Caqti_heap pops the least element
       first, which would wake higher priorities first -- confirm. *)
    let compare {priority = pA; _} {priority = pB; _} = Pervasives.compare pB pA
  end

  module Taskq = Caqti_heap.Make (Task)

  type 'a t = {
    (* Creates a new element. *)
    p_create : unit -> 'a io;
    (* Frees an element; only called from [dispose]. *)
    p_free : 'a -> unit io;
    (* Called on release; reports through the callback whether the element
       may go back into the pool. *)
    p_check : 'a -> (bool -> unit) -> unit;
    (* Called on a queued element before it is handed out again. *)
    p_validate : 'a -> bool io;
    (* Upper bound on [p_cur_size]. *)
    p_max_size : int;
    (* Number of slots accounted for: incremented before creation in
       [acquire], decremented whenever an element is dropped. *)
    mutable p_cur_size : int;
    (* Idle elements ready for reuse. *)
    p_pool : 'a Queue.t;
    (* Parked callers, ordered by [Task.compare]. *)
    mutable p_waiting : Taskq.t;
  }

  (** [create ?max_size ?check ?validate create free] makes an empty pool.
      [max_size] defaults to [default_max_size]; the default [check] and
      [validate] accept every element. *)
  let create
        ?(max_size = default_max_size)
        ?(check = fun _ f -> f true)
        ?(validate = fun _ -> return true)
        create free =
    { p_create = create; p_free = free;
      p_check = check; p_validate = validate;
      p_max_size = max_size;
      p_cur_size = 0; p_pool = Queue.create (); p_waiting = Taskq.empty }

  (** Number of slots currently accounted for by the pool. *)
  let size {p_cur_size; _} = p_cur_size

  (** [wait ~priority p] parks the caller in [p]'s waiting queue and returns
      the [io] that fetches from the caller's fresh mvar. *)
  let wait ~priority p =
    let mvar = Mvar.create () in
    p.p_waiting <- Taskq.push Task.({priority; mvar}) p.p_waiting;
    Mvar.fetch mvar

  (* Wake the first parked caller, if any.  Every caller of [wait] in this
     module re-checks the pool state afterwards, so a wake-up that finds
     nothing to take only makes the caller park again. *)
  let wake_next p =
    if not (Taskq.is_empty p.p_waiting) then begin
      let task, taskq = Taskq.pop_e p.p_waiting in
      p.p_waiting <- taskq;
      Task.wake task
    end

  (* Give back a slot that no longer holds an element and let one parked
     caller try to use the freed capacity. *)
  let abandon_slot p =
    p.p_cur_size <- p.p_cur_size - 1;
    wake_next p

  (* Create an element for a slot that is already counted in [p_cur_size].
     On failure the slot is given back before the error is propagated, so a
     failing [p_create] cannot permanently shrink the pool. *)
  let create_counted p =
    catch p.p_create (fun xc -> abandon_slot p; fail xc)

  (** [acquire ~priority p] obtains an element: a validated idle one if the
      queue is non-empty, a newly created one if there is spare capacity,
      and otherwise it parks until woken and retries. *)
  let rec acquire ~priority p =
    if Queue.is_empty p.p_pool then begin
      if p.p_cur_size < p.p_max_size
      then (p.p_cur_size <- p.p_cur_size + 1; create_counted p)
      else (wait ~priority p >>= fun () -> acquire ~priority p)
    end else begin
      let e = Queue.take p.p_pool in
      catch (fun () -> p.p_validate e)
            (fun xc ->
              (* Validation itself failed: keep the element and let a parked
                 caller, which may have queued meanwhile, have a go at it. *)
              Queue.add e p.p_pool;
              wake_next p;
              fail xc) >>= fun ok ->
      (* An invalid element is dropped and its slot is reused for a
         replacement. *)
      if ok then return e else create_counted p
    end

  (** [release p e] runs [p_check] on [e]; the element is queued for reuse
      when the check reports [true] and its slot is dropped otherwise.  In
      both cases one parked caller, if any, is woken. *)
  let release p e =
    p.p_check e @@ fun ok ->
    if ok then Queue.add e p.p_pool
          else p.p_cur_size <- p.p_cur_size - 1;
    wake_next p

  (** [use ?priority f p] acquires an element, applies [f] to it and releases
      the element again, whether [f] succeeds or fails.  [priority] defaults
      to [0.0]. *)
  let use ?(priority = 0.0) f p =
    acquire ~priority p >>= fun e ->
    catch (fun () -> f e >>= fun r -> release p e; return r)
          (fun xc -> release p e; fail xc)

  (** [dispose p e] frees [e] with [p_free] and drops its slot.  The slot is
      dropped even when [p_free] fails, since the element is no longer
      tracked by the pool at that point; the failure is still propagated. *)
  let dispose p e =
    let drop_slot () = p.p_cur_size <- p.p_cur_size - 1 in
    catch (fun () -> p.p_free e >>= fun () -> drop_slot (); return ())
          (fun xc -> drop_slot (); fail xc)

  (** [drain p] disposes of idle elements until [size p = 0], parking (at
      priority [0.0]) whenever elements are still accounted for but none is
      idle. *)
  let rec drain p =
    if p.p_cur_size = 0 then return () else
    ( if Queue.is_empty p.p_pool
      then wait ~priority:0.0 p
      else dispose p (Queue.take p.p_pool) ) >>= fun () ->
    drain p

end
OCaml

Innovation. Community. Security.