package index
A platform-agnostic multi-level index for OCaml
Install
Dune Dependency
Authors
Maintainers
Sources
index-1.6.2.tbz
sha256=9388835098a4ed44eeced070ed86855c049df12a98311d4980b9b724ecab8860
sha512=2e3052aac2a3ee4190e5cbc914d37904d589997463b22023d31e6b75e21d779342088324a9b42d1854bf7131f32f3e75f6f9cc2cb214d79dd2baa0b4cc2eaad3
doc/src/index.unix/index_unix.ml.html
Source file index_unix.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
(* The MIT License Copyright (c) 2019 Craig Ferguson <craig@tarides.com> Thomas Gazagnaire <thomas@tarides.com> Ioana Cristescu <ioana@tarides.com> Clément Pascutto <clement@tarides.com> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. *) open! Import let src = Logs.Src.create "index_unix" ~doc:"Index_unix" module Log = (val Logs.src_log src : Logs.LOG) exception RO_not_allowed let current_version = "00000001" module Stats = Index.Stats module IO : Index.Platform.IO = struct let ( ++ ) = Int63.add let ( -- ) = Int63.sub type t = { mutable file : string; mutable header : int63; mutable raw : Raw.t; mutable offset : int63; mutable flushed : int63; mutable fan_size : int63; readonly : bool; buf : Buffer.t; flush_callback : unit -> unit; } let flush ?no_callback ?(with_fsync = false) t = if t.readonly then raise RO_not_allowed; if not (Buffer.is_empty t.buf) then ( let buf_len = Buffer.length t.buf in let offset = t.offset in (match no_callback with Some () -> () | None -> t.flush_callback ()); Log.debug (fun l -> l "[%s] flushing %d bytes" t.file buf_len); Buffer.write_with (Raw.unsafe_write t.raw ~off:t.flushed) t.buf; Buffer.clear t.buf; Raw.Offset.set t.raw offset; assert (t.flushed ++ Int63.of_int buf_len = t.header ++ offset); t.flushed <- offset ++ t.header); if with_fsync then Raw.fsync t.raw let rename ~src ~dst = flush ~with_fsync:true src; Raw.close dst.raw; Unix.rename src.file dst.file; Buffer.clear dst.buf; src.file <- dst.file; dst.header <- src.header; dst.fan_size <- src.fan_size; dst.offset <- src.offset; dst.flushed <- src.flushed; dst.raw <- src.raw let close t = if not t.readonly then Buffer.clear t.buf; Raw.close t.raw let auto_flush_limit = Int63.of_int 1_000_000 let append_substring t buf ~off ~len = if t.readonly then raise RO_not_allowed; Buffer.add_substring t.buf buf ~off ~len; let len = Int63.of_int len in t.offset <- t.offset ++ len; if t.offset -- t.flushed > auto_flush_limit then flush t let append t buf = append_substring t buf ~off:0 ~len:(String.length buf) let read t ~off ~len buf = let off = t.header ++ off in let end_of_value = off ++ Int63.of_int len in if not t.readonly then assert ( let total_length = t.flushed ++ Int63.of_int (Buffer.length t.buf) in (* NOTE: we don't require that [end_of_value <= total_length] in order to support short reads on read-write handles (see comment about this case below). *) off <= total_length); if t.readonly || end_of_value <= t.flushed then (* Value is entirely on disk *) Raw.unsafe_read t.raw ~off ~len buf else (* Must read some data not yet flushed to disk *) let requested_from_disk = max 0 (Int63.to_int (t.flushed -- off)) in let requested_from_buffer = len - requested_from_disk in let read_from_disk = if requested_from_disk > 0 then ( let read = Raw.unsafe_read t.raw ~off ~len:requested_from_disk buf in assert (read = requested_from_disk); read) else 0 in let read_from_buffer = let src_off = max 0 (Int63.to_int (off -- t.flushed)) in let len = (* The user may request more bytes than actually exist, in which case we read to the end of the write buffer and return a size less than [len]. *) let available_length = Buffer.length t.buf - src_off in min available_length requested_from_buffer in Buffer.blit ~src:t.buf ~src_off ~dst:buf ~dst_off:requested_from_disk ~len; len in read_from_disk + read_from_buffer let offset t = t.offset let get_generation t = let i = Raw.Generation.get t.raw in Log.debug (fun m -> m "get_generation: %a" Int63.pp i); i let get_fanout t = Raw.Fan.get t.raw let get_fanout_size t = Raw.Fan.get_size t.raw let set_fanout t buf = assert (Int63.(equal (of_int (String.length buf)) t.fan_size)); Raw.Fan.set t.raw buf module Header = struct type header = { offset : int63; generation : int63 } let pp ppf { offset; generation } = Format.fprintf ppf "{ offset = %a; generation = %a }" Int63.pp offset Int63.pp generation let get t = let Raw.Header.{ offset; generation; _ } = Raw.Header.get t.raw in t.offset <- offset; let headers = { offset; generation } in Log.debug (fun m -> m "[%s] get_headers: %a" t.file pp headers); headers let set t { offset; generation } = let version = current_version in Log.debug (fun m -> m "[%s] set_header %a" t.file pp { offset; generation }); Raw.Header.(set t.raw { offset; version; generation }) end let protect_unix_exn = function | Unix.Unix_error _ as e -> failwith (Printexc.to_string e) | e -> raise e let ignore_enoent = function | Unix.Unix_error (Unix.ENOENT, _, _) -> () | e -> raise e let protect f x = try f x with e -> protect_unix_exn e let safe f x = try f x with e -> ignore_enoent e let mkdir dirname = let rec aux dir k = if Sys.file_exists dir && Sys.is_directory dir then k () else ( if Sys.file_exists dir then safe Unix.unlink dir; (aux [@tailcall]) (Filename.dirname dir) (fun () -> protect (Unix.mkdir dir) 0o755; k ())) in (aux [@tailcall]) dirname (fun () -> ()) let raw_file ~flags ~version ~offset ~generation file = let x = Unix.openfile file flags 0o644 in let raw = Raw.v x in let header = { Raw.Header.offset; version; generation } in Log.debug (fun m -> m "[%s] raw set_header %a" file Header.pp { offset; generation }); Raw.Header.set raw header; Raw.Fan.set raw ""; Raw.fsync raw; raw let clear ~generation ?(hook = fun () -> ()) ~reopen t = t.offset <- Int63.zero; t.flushed <- t.header; Buffer.clear t.buf; let old = t.raw in if reopen then ( (* Open a fresh file and rename it to ensure atomicity: concurrent readers should never see the file disapearing. *) let tmp_file = t.file ^ "_tmp" in t.raw <- raw_file ~version:current_version ~generation ~offset:Int63.zero ~flags:Unix.[ O_CREAT; O_RDWR; O_CLOEXEC ] tmp_file; Unix.rename tmp_file t.file) else (* Remove the file current file. This allows a fresh file to be created, before writing the new generation in the old file. *) Unix.unlink t.file; hook (); (* Set new generation in the old file. *) Raw.Header.set old { Raw.Header.offset = Int63.zero; generation; version = current_version }; Raw.close old let () = assert (String.length current_version = 8) let v_instance ?(flush_callback = fun () -> ()) ~readonly ~fan_size ~offset file raw = let eight = Int63.of_int 8 in let header = eight ++ eight ++ eight ++ eight ++ fan_size in { header; file; offset; raw; readonly; fan_size; buf = Buffer.create (if readonly then 0 else 4 * 1024); flushed = header ++ offset; flush_callback; } let v ?flush_callback ~fresh ~generation ~fan_size file = let v = v_instance ?flush_callback ~readonly:false file in mkdir (Filename.dirname file); let header = { Raw.Header.offset = Int63.zero; version = current_version; generation } in match Sys.file_exists file with | false -> let x = Unix.openfile file Unix.[ O_CREAT; O_CLOEXEC; O_RDWR ] 0o644 in let raw = Raw.v x in Raw.Header.set raw header; Raw.Fan.set_size raw fan_size; v ~fan_size ~offset:Int63.zero raw | true -> let x = Unix.openfile file Unix.[ O_EXCL; O_CLOEXEC; O_RDWR ] 0o644 in let raw = Raw.v x in if fresh then ( Raw.Header.set raw header; Raw.Fan.set_size raw fan_size; Raw.fsync raw; v ~fan_size ~offset:Int63.zero raw) else let version = Raw.Version.get raw in if version <> current_version then Fmt.failwith "Io.v: unsupported version %s (current version is %s)" version current_version; let offset = Raw.Offset.get raw in let fan_size = Raw.Fan.get_size raw in v ~fan_size ~offset raw let v_readonly file = let v = v_instance ~readonly:true file in mkdir (Filename.dirname file); try let x = Unix.openfile file Unix.[ O_EXCL; O_CLOEXEC; O_RDONLY ] 0o644 in let raw = Raw.v x in try let version = Raw.Version.get raw in if version <> current_version then Fmt.failwith "Io.v: unsupported version %s (current version is %s)" version current_version; let offset = Raw.Offset.get raw in let fan_size = Raw.Fan.get_size raw in Ok (v ~fan_size ~offset raw) with Raw.Not_written -> (* The readonly instance cannot read a file that does not have a header.*) Raw.close raw; Error `No_file_on_disk with | Unix.Unix_error (Unix.ENOENT, _, _) -> (* The readonly instance cannot open a non existing file. *) Error `No_file_on_disk | e -> raise e let exists = Sys.file_exists let size { raw; _ } = (Raw.fstat raw).st_size let size_header t = t.header |> Int63.to_int module Lock = struct type t = { path : string; fd : Unix.file_descr } exception Locked of string let unsafe_lock op f = mkdir (Filename.dirname f); let fd = Unix.openfile f [ Unix.O_CREAT; Unix.O_RDWR ] 0o600 and pid = string_of_int (Unix.getpid ()) in let pid_len = String.length pid in try Unix.lockf fd op 0; if Unix.single_write_substring fd pid 0 pid_len <> pid_len then ( Unix.close fd; failwith "Unable to write PID to lock file") else Some fd with | Unix.Unix_error (Unix.EAGAIN, _, _) -> Unix.close fd; None | e -> Unix.close fd; raise e let with_ic path f = let ic = open_in path in let a = f ic in close_in ic; a let err_rw_lock path = let line = with_ic path input_line in let pid = int_of_string line in Log.err (fun l -> l "Cannot lock %s: index is already opened in write mode by PID %d. \ Current PID is %d." path pid (Unix.getpid ())); raise (Locked path) let lock path = Log.debug (fun l -> l "Locking %s" path); match unsafe_lock Unix.F_TLOCK path with | Some fd -> { path; fd } | None -> err_rw_lock path let unlock { path; fd } = Log.debug (fun l -> l "Unlocking %s" path); Unix.close fd let pp_dump path = match Sys.file_exists path with | false -> None | true -> let contents = with_ic path (fun ic -> really_input_string ic (in_channel_length ic)) in Some (fun ppf -> Fmt.string ppf contents) end end module Semaphore = struct module S = Semaphore_compat.Semaphore.Binary let is_held t = let acquired = S.try_acquire t in if acquired then S.release t; not acquired include S let acquire n t = let x = Mtime_clock.counter () in S.acquire t; let y = Mtime_clock.count x in if Mtime.span_to_s y > 1. then Log.warn (fun l -> l "Semaphore %s was blocked for %a" n Mtime.Span.pp y) let with_acquire n t f = acquire n t; Fun.protect ~finally:(fun () -> S.release t) f end module Thread = struct type 'a t = | Async of { thread : Thread.t; result : ('a, exn) result option ref } | Value of 'a let async f = let result = ref None in let protected_f x = try result := Some (Ok (f x)) with exn -> result := Some (Error exn); raise exn in let thread = Thread.create protected_f () in Async { thread; result } let yield = Thread.yield let return a = Value a let await t = match t with | Value v -> Ok v | Async { thread; result } -> ( let () = Thread.join thread in match !result with | Some (Ok _ as o) -> o | Some (Error exn) -> Error (`Async_exn exn) | None -> assert false) end module Platform = struct module IO = IO module Semaphore = Semaphore module Thread = Thread module Clock = Mtime_clock end module Make (K : Index.Key.S) (V : Index.Value.S) = Index.Make (K) (V) (Platform) module Syscalls = Syscalls module Private = struct module IO = IO module Raw = Raw module Make (K : Index.Key.S) (V : Index.Value.S) = Index.Private.Make (K) (V) (Platform) end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>