package caqti

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file caqti_blocking.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
(* Copyright (C) 2018--2025  Petter A. Urkedal <paurkedal@gmail.com>
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version, with the LGPL-3.0 Linking Exception.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * and the LGPL-3.0 Linking Exception along with this library.  If not, see
 * <http://www.gnu.org/licenses/> and <https://spdx.org>, respectively.
 *)

open Caqti_platform

(* Error message describing a failed Unix call.  The payload mirrors the
   arguments of [Unix.Unix_error]: the error code followed by two strings,
   which the printer below renders as a function name and its argument. *)
type Caqti_error.msg += Msg_unix of Unix.error * string * string

(* Register the pretty-printer for [Msg_unix] with [Caqti_error]. *)
let () =
  let pp ppf msg =
    match msg with
    | Msg_unix (code, fn_name, fn_arg) ->
       let reason = Unix.error_message code in
       Format.fprintf ppf "%s in %s(%S)" reason fn_name fn_arg
    | _ ->
       (* Presumably only ever invoked for the constructor registered
          below; any other message is treated as a programming error. *)
       assert false
  in
  Caqti_error.define_msg ~pp [%extension_constructor Msg_unix]

(* Direct-style fiber implementation for blocking code: a computation of type
   ['a t] is simply its result, so every operation below runs synchronously
   in the calling thread. *)
module Fiber = struct
  type 'a t = 'a

  module Infix = struct
    (* Bind and map coincide because ['a t] is ['a]: both apply [f] to the
       already available value. *)
    let (>>=) x f = f x
    let (>|=) x f = f x
  end

  (* [return x] is [x] itself. *)
  let return x = x

  (* [catch f g] runs [f ()] and hands any exception it raises to [g]. *)
  let catch f g = try f () with exn -> g exn

  (* [finally f g] runs [f ()] and then [g ()], whether [f] returned or
     raised.  When [f] raised, the exception is re-raised after [g ()]
     with the backtrace captured before [g] ran, so that exceptions raised
     and caught inside [g] cannot overwrite it.  An exception escaping [g]
     itself propagates instead. *)
  let finally f g =
    match f () with
    | y -> g (); y
    | exception exn ->
       let bt = Printexc.get_raw_backtrace () in
       g ();
       Printexc.raise_with_backtrace exn bt

  (* [cleanup f g] runs [f ()]; [g ()] is called only if [f] raised, after
     which the original exception is re-raised with its original
     backtrace. *)
  let cleanup f g =
    try f () with exn ->
      let bt = Printexc.get_raw_backtrace () in
      g ();
      Printexc.raise_with_backtrace exn bt
end

(* Stream implementation obtained by applying the platform stream functor to
   the direct-style [Fiber] module defined above. *)
module Stream = Caqti_platform.Stream.Make (Fiber)

(* Core system services for the blocking implementation: fibers, switches,
   streams, locking, logging and sequencing.  Nothing here introduces
   concurrency of its own; callbacks are invoked directly. *)
module System_core = struct
  module Fiber = Fiber

  module Switch = Caqti_platform.Switch.Make (Fiber)

  (* The switch is ignored and the task runs to completion before [async]
     returns. *)
  let async ~sw:_ task = task ()

  module Stream = Stream

  module Mutex = Mutex

  module Condition = Condition

  (* Logging helpers forwarding to [Logs], using [Logging.default_log_src]
     when the caller gives no source. *)
  module Log = struct
    type 'a log = 'a Logs.log

    let err ?(src = Logging.default_log_src) msgf = Logs.err ~src msgf
    let warn ?(src = Logging.default_log_src) msgf = Logs.warn ~src msgf
    let info ?(src = Logging.default_log_src) msgf = Logs.info ~src msgf
    let debug ?(src = Logging.default_log_src) msgf = Logs.debug ~src msgf
  end

  (* No environment value is needed by this implementation. *)
  type stdenv = unit

  (* A sequencer is the wrapped value itself; enqueued jobs are applied to it
     immediately. *)
  module Sequencer = struct
    type 'a t = 'a
    let create state = state
    let enqueue state job = job state
  end
end

(* Pool implementation built from [System_core] with the platform functor
   [Make_without_alarm]; presumably this variant does without timer-based
   alarms -- confirm against Caqti_platform.Pool. *)
module Pool = Caqti_platform.Pool.Make_without_alarm (System_core)

module System = struct
  include System_core

  module Net = struct

    (* Socket addresses are represented directly as [Unix.sockaddr]. *)
    module Sockaddr = struct
      type t = Unix.sockaddr

      (* Address of the Unix domain socket at [path]. *)
      let unix path = Unix.ADDR_UNIX path

      (* Internet address for the pair [(addr, port)]; the [Ipaddr] value is
         converted through its textual form. *)
      let tcp (addr, port) =
        let inet_addr =
          addr |> Ipaddr.to_string |> Unix.inet_addr_of_string in
        Unix.ADDR_INET (inet_addr, port)
    end

    (* Resolve [host] and [port] to the list of stream-socket addresses
       reported by [Unix.getaddrinfo].  [Not_found] yields an empty list and
       a [Unix.Unix_error] is turned into an [Error] carrying a message. *)
    let getaddrinfo ~stdenv:() host port =
      let resolve () =
        let hints = [Unix.AI_SOCKTYPE Unix.SOCK_STREAM] in
        Unix.getaddrinfo
          (Domain_name.to_string host) (string_of_int port) hints
      in
      match resolve () with
      | infos -> Ok (List.map (fun info -> info.Unix.ai_addr) infos)
      | exception Not_found -> Ok []
      | exception Unix.Unix_error (code, _, _) ->
         Error (`Msg ("Cannot resolve host name: " ^ Unix.error_message code))

    (* Map a [Unix.Unix_error] exception to the corresponding [Msg_unix]
       message; any other exception gives [None]. *)
    let convert_io_exception exn =
      match exn with
      | Unix.Unix_error (err, fn, arg) -> Some (Msg_unix (err, fn, arg))
      | _ -> None

    (* Blocking socket represented by a pair of buffered standard-library
       channels.  Each operation forwards to the standard channel function
       of the same name on the relevant side of the pair. *)
    module Socket = struct
      type t = Tcp of in_channel * out_channel

      (* Writing side. *)
      let output_char (Tcp (_, oc)) = output_char oc
      let output_string (Tcp (_, oc)) = output_string oc
      let flush (Tcp (_, oc)) = flush oc

      (* Reading side. *)
      let input_char (Tcp (ic, _)) = input_char ic
      let really_input (Tcp (ic, _)) = really_input ic

      (* Close the socket through its output channel only; the input channel
         is left alone (the channels returned by [Unix.open_connection] are
         documented to share one descriptor).

         [close_out] flushes before closing, so a failing flush would raise
         before the descriptor is released.  In that case the channel is
         force-closed with [close_out_noerr] and the original exception is
         re-raised with its backtrace, so the error is still reported but
         the descriptor does not leak. *)
      let close (Tcp (_, oc)) =
        match close_out oc with
        | () -> ()
        | exception exn ->
           let bt = Printexc.get_raw_backtrace () in
           close_out_noerr oc;
           Printexc.raise_with_backtrace exn bt
    end

    (* Plain TCP flows and TLS flows both use the [Socket.t]
       representation. *)
    type tcp_flow = Socket.t
    type tls_flow = Socket.t

    (* Open a connection to [sockaddr] with [Unix.open_connection] and wrap
       the resulting channel pair in a [Socket.Tcp].  The switch is ignored.
       A [Unix.Unix_error] is returned as [Error (Msg_unix _)]; any other
       exception propagates. *)
    let connect_tcp ~sw:_ ~stdenv:() sockaddr =
      match Unix.open_connection sockaddr with
      | (ic, oc) -> Ok (Socket.Tcp (ic, oc))
      | exception Unix.Unix_error (err, func, arg) ->
         Error (Msg_unix (err, func, arg))

    (* Always answers [None], whatever socket is given. *)
    let tcp_flow_of_socket = fun _socket -> None

    (* Identity on the flow; the switch is ignored. *)
    let socket_of_tls_flow ~sw:_ flow = flow

    (* Signature of a TLS provider, specialised to this implementation:
       fibers are plain values and both TCP and TLS flows are [Socket.t]. *)
    module type TLS_PROVIDER = System_sig.TLS_PROVIDER
      with type 'a fiber := 'a
       and type tcp_flow := Socket.t
       and type tls_flow := Socket.t

    (* Registry of TLS providers, most recently registered first. *)
    let tls_providers_r : (module TLS_PROVIDER) list ref = ref []

    (* Add [provider] at the front of the registry. *)
    let register_tls_provider provider =
      let registered = !tls_providers_r in
      tls_providers_r := provider :: registered

    (* Return every provider registered so far; the argument is ignored. *)
    let tls_providers _ =
      (* Try to load caqti-tls.unix here if/when implemented. *)
      tls_providers_r.contents
  end

end

(* Unix-specific services for the blocking implementation: descriptor
   polling through [Unix.select] and trivial preemption helpers that call
   their argument directly. *)
module System_unix = struct

  module Unix = struct
    type file_descr = Unix.file_descr

    (* Descriptors need no wrapping here: [f] is applied to [fd] as is. *)
    let wrap_fd f fd = f fd

    (* Wait on [fd] with [Unix.select].  [read] and [write] choose which
       conditions to watch (both default to [false]); [timeout] is passed
       to [Unix.select] unchanged and defaults to [-1.0].  The result is
       [(readable, writable, timed_out)], where [timed_out] holds exactly
       when neither condition was reported. *)
    let poll ~stdenv:()
             ?(read = false) ?(write = false) ?(timeout = -1.0) fd =
      let watch wanted = if wanted then [fd] else [] in
      let readable, writable, _ =
        Unix.select (watch read) (watch write) [] timeout in
      let got_read = readable <> [] in
      let got_write = writable <> [] in
      (got_read, got_write, not (got_read || got_write))
  end

  module Preemptive = struct
    (* Both helpers simply call their argument in the current thread. *)
    let detach work arg = work arg
    let run_in_main thunk = thunk ()
  end

end

(* Driver loader instantiated for the blocking [System] and
   [System_unix]. *)
module Loader = Caqti_platform_unix.Driver_loader.Make (System) (System_unix)

(* Pull in the connector API built from the modules above.  Presumably this
   provides the [connect], [with_connection] and [connect_pool] functions
   that are specialised further down -- confirm against Connector.Make. *)
include Connector.Make (System) (Pool) (Loader)

open System

(* Connection signature specialised to this implementation: fibers are plain
   values and streams are those of [Stream]. *)
module type CONNECTION = Caqti_connection_sig.S
  with type 'a fiber := 'a
   and type ('a, 'e) stream := ('a, 'e) Stream.t

(* A connection packed as a first-class module. *)
type connection = (module CONNECTION)

(* Blocking variant of the inherited [connect]: all optional arguments are
   forwarded unchanged, a switch created for this call is supplied as [~sw]
   and [()] as [~stdenv].  No release of that switch is visible in this
   function. *)
let connect ?subst ?env ?config ?tweaks_version uri =
  connect
    ?subst ?env ?config ?tweaks_version
    ~sw:(Switch.create ()) ~stdenv:() uri

(* The inherited [with_connection] with its [~stdenv] argument fixed to
   [()]; all remaining arguments are left for the caller. *)
let with_connection = with_connection ~stdenv:()

(* Blocking variant of the inherited [connect_pool]: all optional arguments
   are forwarded unchanged, a switch created for this call is supplied as
   [~sw] and [()] as [~stdenv].  No release of that switch is visible in
   this function. *)
let connect_pool
      ?pool_config ?post_connect ?subst ?env ?config ?tweaks_version uri =
  connect_pool
    ?pool_config ?post_connect ?subst ?env ?config ?tweaks_version
    ~sw:(Switch.create ()) ~stdenv:() uri

(* Unwrap a result: return the payload of [Ok], or raise [Caqti_error.Exn]
   carrying the error of [Error]. *)
let or_fail result =
  match result with
  | Error (#Caqti_error.t as err) -> raise (Caqti_error.Exn err)
  | Ok value -> value
OCaml

Innovation. Community. Security.