Source file caqti_blocking.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
open Caqti_platform
type Caqti_error.msg += Msg_unix of Unix.error * string * string
let () =
let pp ppf = function
| Msg_unix (err, func, arg) ->
Format.fprintf ppf "%s in %s(%S)" (Unix.error_message err) func arg
| _ -> assert false
in
Caqti_error.define_msg ~pp [%extension_constructor Msg_unix]
module Fiber = struct
type 'a t = 'a
module Infix = struct
let (>>=) x f = f x
let (>|=) x f = f x
end
let return x = x
let catch f g = try f () with exn -> g exn
let finally f g =
(match f () with
| y -> g (); y
| exception exn -> g (); raise exn)
let cleanup f g = try f () with exn -> g (); raise exn
end
module Stream = Caqti_platform.Stream.Make (Fiber)
module System_core = struct
module Fiber = Fiber
module Switch = Caqti_platform.Switch.Make (Fiber)
let async ~sw:_ f = f ()
module Stream = Stream
module Mutex = Mutex
module Condition = Condition
module Log = struct
type 'a log = 'a Logs.log
let err ?(src = Logging.default_log_src) = Logs.err ~src
let warn ?(src = Logging.default_log_src) = Logs.warn ~src
let info ?(src = Logging.default_log_src) = Logs.info ~src
let debug ?(src = Logging.default_log_src) = Logs.debug ~src
end
type stdenv = unit
module Sequencer = struct
type 'a t = 'a
let create m = m
let enqueue m f = f m
end
end
module Pool = Caqti_platform.Pool.Make_without_alarm (System_core)
module System = struct
include System_core
module Net = struct
module Sockaddr = struct
type t = Unix.sockaddr
let unix s = Unix.ADDR_UNIX s
let tcp (addr, port) =
Unix.ADDR_INET (Unix.inet_addr_of_string (Ipaddr.to_string addr), port)
end
let getaddrinfo ~stdenv:() host port =
try
let opts = Unix.[AI_SOCKTYPE SOCK_STREAM] in
Unix.getaddrinfo (Domain_name.to_string host) (string_of_int port) opts
|> List.map (fun ai -> ai.Unix.ai_addr) |> Result.ok
with
| Not_found -> Ok []
| Unix.Unix_error (code, _, _) ->
Error (`Msg ("Cannot resolve host name: " ^ Unix.error_message code))
let convert_io_exception = function
| Unix.Unix_error (err, fn, arg) -> Some (Msg_unix (err, fn, arg))
| _ -> None
module Socket = struct
type t = Tcp of in_channel * out_channel
let output_char (Tcp (_, oc)) = output_char oc
let output_string (Tcp (_, oc)) = output_string oc
let flush (Tcp (_, oc)) = flush oc
let input_char (Tcp (ic, _)) = input_char ic
let really_input (Tcp (ic, _)) = really_input ic
let close (Tcp (_, oc)) = close_out oc
end
type tcp_flow = Socket.t
type tls_flow = Socket.t
let connect_tcp ~sw:_ ~stdenv:() sockaddr =
try
let ic, oc = Unix.open_connection sockaddr in
Ok (Socket.Tcp (ic, oc))
with
| Unix.Unix_error (err, func, arg) -> Error (Msg_unix (err, func, arg))
let tcp_flow_of_socket _ = None
let socket_of_tls_flow ~sw:_ = Fun.id
module type TLS_PROVIDER = System_sig.TLS_PROVIDER
with type 'a fiber := 'a
and type tcp_flow := Socket.t
and type tls_flow := Socket.t
let tls_providers_r : (module TLS_PROVIDER) list ref = ref []
let register_tls_provider p = tls_providers_r := p :: !tls_providers_r
let tls_providers _ =
!tls_providers_r
end
end
module System_unix = struct
module Unix = struct
type file_descr = Unix.file_descr
let wrap_fd f fd = f fd
let poll ~stdenv:()
?(read = false) ?(write = false) ?(timeout = -1.0) fd =
let read_fds = if read then [fd] else [] in
let write_fds = if write then [fd] else [] in
let read_fds, write_fds, _ = Unix.select read_fds write_fds [] timeout in
(read_fds <> [], write_fds <> [], read_fds = [] && write_fds = [])
end
module Preemptive = struct
let detach f x = f x
let run_in_main f = f ()
end
end
module Loader = Caqti_platform_unix.Driver_loader.Make (System) (System_unix)
include Connector.Make (System) (Pool) (Loader)
open System
module type CONNECTION = Caqti_connection_sig.S
with type 'a fiber := 'a
and type ('a, 'e) stream := ('a, 'e) Stream.t
type connection = (module CONNECTION)
let connect ?subst ?env ?config ?tweaks_version uri =
let sw = Switch.create () in
connect ?subst ?env ?config ?tweaks_version ~sw ~stdenv:() uri
let with_connection = with_connection ~stdenv:()
let connect_pool
?pool_config ?post_connect ?subst ?env ?config ?tweaks_version uri =
let sw = Switch.create () in
connect_pool
?pool_config ?post_connect ?subst ?env ?config ?tweaks_version
~sw ~stdenv:() uri
let or_fail = function
| Ok x -> x
| Error (#Caqti_error.t as err) -> raise (Caqti_error.Exn err)