(* Package caqti -- source file caqti_pool.ml
   (navigation links and line-number gutter of the HTML listing removed) *)

(* Copyright (C) 2014--2019  Petter A. Urkedal <paurkedal@gmail.com>
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version, with the OCaml static compilation exception.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library.  If not, see <http://www.gnu.org/licenses/>.
 *)

open Caqti_compat [@@warning "-33"]

(* Pool size limit used when [create] is called without [~max_size]: the value
   of the CAQTI_POOL_MAX_SIZE environment variable if it is set, otherwise 8.
   The variable is read once, when this module is initialised.

   Raises [Failure] if the variable is set to something [int_of_string]
   rejects; the message names the variable and the offending value instead of
   the bare int_of_string failure. *)
let default_max_size =
  match Sys.getenv "CAQTI_POOL_MAX_SIZE" with
  | exception Not_found -> 8
  | raw ->
    (match int_of_string raw with
     | n -> n
     | exception Failure _ ->
       failwith
         (Printf.sprintf "CAQTI_POOL_MAX_SIZE must be an integer, got %S." raw))

(* Resource pool parametrised over the concurrency primitives of [System]
   (futures with [>>=], [>|=], [return], and the [Mvar] signalling cell). *)
module Make (System : Caqti_driver_sig.System_common) = struct
  open System

  (* [m >>=? mf] sequences two result-returning futures: [mf] runs on the
     payload of [Ok], while an [Error] is passed through without calling
     [mf]. *)
  let (>>=?) m mf = m >>= (function Ok x -> mf x | Error e -> return (Error e))

  (* A task blocked in [wait], together with the priority it asked for. *)
  module Task = struct
    type t = {priority : float; mvar : unit Mvar.t}

    (* Unblocks the task by storing into the mvar it fetches from. *)
    let wake {mvar; _} = Mvar.store () mvar

    (* Note the swapped arguments: the order is reversed with respect to the
       priority value.  Presumably this makes [Taskq.pop_e] yield the highest
       priority first; that depends on [Caqti_heap], which is not visible
       here -- TODO confirm. *)
    let compare {priority = pA; _} {priority = pB; _} = Float.compare pB pA
  end

  module Taskq = Caqti_heap.Make (Task)

  type ('a, +'e) t = {
    (* Makes a new resource; called by [acquire]. *)
    p_create : unit -> ('a, 'e) result future;
    (* Releases a resource for good; called by [dispose]. *)
    p_free : 'a -> unit future;
    (* Called by [release]; reports through the callback whether the
       resource may go back into the pool. *)
    p_check : 'a -> (bool -> unit) -> unit;
    (* Called by [acquire] on a pooled resource before it is handed out. *)
    p_validate : 'a -> bool future;
    (* Upper bound on [p_cur_size], enforced by [acquire]. *)
    p_max_size : int;
    (* Number of slots in use: resources that are pooled, handed out, or
       currently being created. *)
    mutable p_cur_size : int;
    (* Idle resources ready to be handed out. *)
    p_pool : 'a Queue.t;
    (* Tasks currently blocked in [wait]. *)
    mutable p_waiting : Taskq.t;
  }

  (* [create ?max_size ?check ?validate create free] builds an empty pool.
     No resource is created until the first [acquire].  By default [check]
     and [validate] accept every resource, and [max_size] is
     [default_max_size]. *)
  let create
        ?(max_size = default_max_size)
        ?(check = fun _ f -> f true)
        ?(validate = fun _ -> return true)
        create free =
    { p_create = create; p_free = free;
      p_check = check; p_validate = validate;
      p_max_size = max_size;
      p_cur_size = 0; p_pool = Queue.create (); p_waiting = Taskq.empty }

  (* Current number of slots in use, see [p_cur_size]. *)
  let size {p_cur_size; _} = p_cur_size

  (* Registers the caller in the waiting queue under [priority] and returns a
     future that is determined once the task has been woken. *)
  let wait ~priority p =
    let mvar = Mvar.create () in
    p.p_waiting <- Taskq.push Task.({priority; mvar}) p.p_waiting;
    Mvar.fetch mvar

  (* Wakes the first task of the waiting queue, if there is one.  Must be
     called whenever a pooled resource or a free slot becomes available.
     Woken tasks re-check the pool state themselves, so a wake-up that turns
     out to be unnecessary is harmless. *)
  let schedule p =
    if not (Taskq.is_empty p.p_waiting) then begin
      let task, taskq = Taskq.pop_e p.p_waiting in
      p.p_waiting <- taskq;
      Task.wake task
    end

  (* Creates a resource for a slot the caller has already accounted for in
     [p_cur_size].  If creation fails the slot is given back and one waiter
     is woken; otherwise a task that went to sleep because the pool was full
     could stay blocked although a slot is free again. *)
  let realloc p =
    p.p_create () >|=
    (function
     | Ok _ as created -> created
     | Error _ as failed ->
        p.p_cur_size <- p.p_cur_size - 1;
        schedule p;
        failed)

  (* Obtains a resource: a pooled one if it passes [p_validate], a newly
     created one if there is room, and otherwise waits under [priority]
     until woken and tries again.  Returns [Error] only when [p_create]
     fails. *)
  let rec acquire ~priority p =
    if Queue.is_empty p.p_pool then begin
      if p.p_cur_size < p.p_max_size
      then begin
        (* Reserve the slot before the asynchronous creation starts, so that
           concurrent callers cannot exceed [p_max_size]. *)
        p.p_cur_size <- p.p_cur_size + 1;
        realloc p
      end else
        (wait ~priority p >>= fun () -> acquire ~priority p)
    end else begin
      let e = Queue.take p.p_pool in
      p.p_validate e >>= fun ok ->
      if ok then
        return (Ok e)
      else
        (* The slot of the rejected resource is reused for its replacement.
           NOTE(review): the rejected resource is dropped without calling
           [p_free]; confirm that this is intended. *)
        realloc p
    end

  (* Hands [e] back.  If [p_check] approves, the resource is pooled again;
     otherwise its slot is given up.  In both cases one waiter is woken.
     NOTE(review): a resource rejected by [p_check] is dropped without
     calling [p_free]; confirm that this is intended. *)
  let release p e =
    p.p_check e @@ fun ok ->
    if ok then Queue.add e p.p_pool
          else p.p_cur_size <- p.p_cur_size - 1;
    schedule p

  (* [use ?priority f p] acquires a resource, applies [f] to it and releases
     it again, whether [f] yields [Ok] or [Error].  An [Error] from
     [acquire] is returned without calling [f].

     Only the call [f e] is guarded: if it raises before returning a future,
     the resource is released and the exception re-raised.  The release in
     the continuation is kept outside the guard so that the resource is
     released exactly once on every path.  A failure delivered through the
     future itself is not intercepted here, since the visible part of
     [System] offers no way to do so. *)
  let use ?(priority = 0.0) f p =
    acquire ~priority p >>=? fun e ->
    match f e with
    | exception exn -> release p e; raise exn
    | pending ->
       pending >>= fun outcome ->
       release p e;
       return outcome

  (* Frees [e] with [p_free] and then gives up its slot. *)
  let dispose p e = p.p_free e >|= fun () -> p.p_cur_size <- p.p_cur_size - 1

  (* Disposes of every resource until no slot is in use.  While resources
     are still handed out, waits (at priority 0.0) to be woken by a
     [release] and then re-examines the pool. *)
  let rec drain p =
    if p.p_cur_size = 0 then return () else
    (if Queue.is_empty p.p_pool
     then wait ~priority:0.0 p
     else dispose p (Queue.take p.p_pool)) >>= fun () ->
    drain p

end
(* OCaml -- Innovation. Community. Security. *)