package irmin-pack
Irmin backend which stores values in a pack file
Install
Dune Dependency
Authors
Maintainers
Sources
irmin-2.6.0.tbz
sha256=1db134221e82c424260a0e206b640fcb82902be35eea4137af2bcd9c98d3ac0f
sha512=b334e5b909563787e58790e4665f78a9f21e0f9f976eb7344cb76cbe7db870506bab193cec206e338ba74457896b2176000c936397cf3d44326507300a8193d6
doc/src/irmin-pack/store.ml.html
Source file store.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
include Store_intf let src = Logs.Src.create "irmin.pack" ~doc:"irmin-pack backend" module Log = (val Logs.src_log src : Logs.LOG) exception RO_Not_Allowed = IO.Unix.RO_Not_Allowed exception Unsupported_version of IO.version let ( ++ ) = Int64.add module Cache = IO.Cache open! Import module Pack = Pack module Dict = Pack_dict module Index = Pack_index module Table (K : Irmin.Type.S) = Hashtbl.Make (struct type t = K.t let hash = Irmin.Type.(unstage (short_hash K.t)) ?seed:None let equal = Irmin.Type.(unstage (equal K.t)) end) let pp_version = IO.pp_version module Atomic_write (K : Irmin.Type.S) (V : Irmin.Hash.S) (IO_version : IO.VERSION) = struct let current_version = IO_version.io_version module Tbl = Table (K) module W = Irmin.Private.Watch.Make (K) (V) module IO = IO.Unix type key = K.t type value = V.t type watch = W.watch type t = { index : int64 Tbl.t; cache : V.t Tbl.t; mutable block : IO.t; w : W.t; mutable open_instances : int; } let decode_bin = Irmin.Type.(unstage (decode_bin int32)) let read_length32 ~off block = let buf = Bytes.create 4 in let n = IO.read block ~off buf in assert (n = 4); let n, v = decode_bin (Bytes.unsafe_to_string buf) 0 in assert (n = 4); Int32.to_int v let entry = Irmin.Type.(pair (string_of `Int32) V.t) let key_to_bin_string = Irmin.Type.(unstage (to_bin_string K.t)) let key_of_bin_string = Irmin.Type.(unstage (of_bin_string K.t)) let entry_to_bin_string = Irmin.Type.(unstage (to_bin_string entry)) let value_of_bin_string = Irmin.Type.(unstage (of_bin_string V.t)) let value_decode_bin = Irmin.Type.(unstage (decode_bin V.t)) let set_entry t ?off k v = let k = key_to_bin_string k in let buf = entry_to_bin_string (k, v) in match off with | None -> IO.append t.block buf | Some off -> IO.set t.block buf ~off let pp_branch = Irmin.Type.pp K.t let zero = match value_of_bin_string (String.make V.hash_size '\000') with | Ok x -> x | Error _ -> assert false let equal_val = Irmin.Type.(unstage (equal V.t)) let refill t ~to_ ~from = let rec aux offset = if offset >= to_ then () else let len = read_length32 ~off:offset t.block in let buf = Bytes.create (len + V.hash_size) in let off = offset ++ 4L in let n = IO.read t.block ~off buf in assert (n = Bytes.length buf); let buf = Bytes.unsafe_to_string buf in let h = let h = String.sub buf 0 len in match key_of_bin_string h with | Ok k -> k | Error (`Msg e) -> failwith e in let n, v = value_decode_bin buf len in assert (n = String.length buf); if not (equal_val v zero) then Tbl.add t.cache h v; Tbl.add t.index h offset; (aux [@tailcall]) (off ++ Int64.(of_int @@ (len + V.hash_size))) in aux from let sync_offset t = let former_offset = IO.offset t.block in let former_generation = IO.generation t.block in let h = IO.force_headers t.block in if former_generation <> h.generation then ( Log.debug (fun l -> l "[branches] generation changed, refill buffers"); IO.close t.block; let io = IO.v ~fresh:false ~readonly:true ~version:(Some current_version) (IO.name t.block) in t.block <- io; Tbl.clear t.cache; Tbl.clear t.index; refill t ~to_:h.offset ~from:0L) else if h.offset > former_offset then refill t ~to_:h.offset ~from:former_offset let unsafe_find t k = Log.debug (fun l -> l "[branches] find %a" pp_branch k); if IO.readonly t.block then sync_offset t; try Some (Tbl.find t.cache k) with Not_found -> None let find t k = Lwt.return (unsafe_find t k) let unsafe_mem t k = Log.debug (fun l -> l "[branches] mem %a" pp_branch k); try Tbl.mem t.cache k with Not_found -> false let mem t v = Lwt.return (unsafe_mem t v) let unsafe_remove t k = Tbl.remove t.cache k; try let off = Tbl.find t.index k in set_entry t ~off k zero with Not_found -> () let remove t k = Log.debug (fun l -> l "[branches] remove %a" pp_branch k); unsafe_remove t k; W.notify t.w k None let unsafe_clear ?keep_generation t = Lwt.async (fun () -> W.clear t.w); match current_version with | `V1 -> IO.truncate t.block | `V2 -> IO.clear ?keep_generation t.block; Tbl.clear t.cache; Tbl.clear t.index let clear t = Log.debug (fun l -> l "[branches] clear"); unsafe_clear t; Lwt.return_unit let clear_keep_generation t = Log.debug (fun l -> l "[branches] clear"); unsafe_clear ~keep_generation:() t; Lwt.return_unit let watches = W.v () let valid t = if t.open_instances <> 0 then ( t.open_instances <- t.open_instances + 1; true) else false let unsafe_v ~fresh ~readonly file = let block = IO.v ~fresh ~version:(Some current_version) ~readonly file in let cache = Tbl.create 997 in let index = Tbl.create 997 in let t = { cache; index; block; w = watches; open_instances = 1 } in let h = IO.force_headers block in refill t ~to_:h.offset ~from:0L; t let Cache.{ v = unsafe_v } = Cache.memoize ~clear:unsafe_clear ~valid ~v:(fun () -> unsafe_v) Layout.branch let v ?fresh ?readonly file = Lwt.return (unsafe_v () ?fresh ?readonly file) let unsafe_set t k v = try let off = Tbl.find t.index k in Tbl.replace t.cache k v; set_entry t ~off k v with Not_found -> let offset = IO.offset t.block in set_entry t k v; Tbl.add t.cache k v; Tbl.add t.index k offset let set t k v = Log.debug (fun l -> l "[branches %s] set %a" (IO.name t.block) pp_branch k); unsafe_set t k v; W.notify t.w k (Some v) let equal_v_opt = Irmin.Type.(unstage (equal (option V.t))) let unsafe_test_and_set t k ~test ~set = let v = try Some (Tbl.find t.cache k) with Not_found -> None in if not (equal_v_opt v test) then Lwt.return_false else let return () = Lwt.return_true in match set with | None -> unsafe_remove t k |> return | Some v -> unsafe_set t k v |> return let test_and_set t k ~test ~set = Log.debug (fun l -> l "[branches] test-and-set %a" pp_branch k); unsafe_test_and_set t k ~test ~set >>= function | true -> W.notify t.w k set >|= fun () -> true | false -> Lwt.return_false let list t = Log.debug (fun l -> l "[branches] list"); let keys = Tbl.fold (fun k _ acc -> k :: acc) t.cache [] in Lwt.return keys let watch_key t = W.watch_key t.w let watch t = W.watch t.w let unwatch t = W.unwatch t.w let unsafe_close t = t.open_instances <- t.open_instances - 1; if t.open_instances = 0 then ( Tbl.reset t.index; Tbl.reset t.cache; if not (IO.readonly t.block) then IO.flush t.block; IO.close t.block; W.clear t.w) else Lwt.return_unit let close t = unsafe_close t let flush t = IO.flush t.block end module IO = IO.Unix let latest_version = `V2 (** Migrate data from the IO [src] (with [name] in path [root_old]) into the temporary dir [root_tmp], then swap in the replaced version. *) let migrate_io_to_v2 ~progress src = IO.migrate ~progress src `V2 |> function | Ok () -> IO.close src | Error (`Msg s) -> invalid_arg s let migrate config = if Config.readonly config then raise RO_Not_Allowed; Log.debug (fun l -> l "[%s] migrate" (Config.root config)); Layout.stores ~root:(Config.root config) |> List.map (fun store -> let io = IO.v ~version:None ~fresh:false ~readonly:true store in let version = IO.version io in (store, io, version)) |> List.partition (fun (_, _, v) -> v = latest_version) |> function | migrated, [] -> Log.info (fun l -> l "Store at %s is already in current version (%a)" (Config.root config) pp_version latest_version); List.iter (fun (_, io, _) -> IO.close io) migrated | migrated, to_migrate -> List.iter (fun (_, io, _) -> IO.close io) migrated; (match migrated with | [] -> () | _ :: _ -> let pp_ios = Fmt.(Dump.list (using (fun (n, _, _) -> n) string)) in Log.warn (fun l -> l "Store is in an inconsistent state: files %a have already been \ upgraded, but %a have not. Upgrading the remaining files now." pp_ios migrated pp_ios to_migrate)); let total = to_migrate |> List.map (fun (_, io, _) -> IO.offset io) |> List.fold_left Int64.add 0L in let bar, progress = Utils.Progress.counter ~total ~sampling_interval:100 ~message:"Migrating store" ~pp_count:Utils.pp_bytes () in List.iter (fun (_, io, _) -> migrate_io_to_v2 ~progress io) to_migrate; Utils.Progress.finalise bar module Checks (Index : Pack_index.S) = struct let null = match Sys.os_type with | "Unix" | "Cygwin" -> "/dev/null" | "Win32" -> "NUL" | _ -> invalid_arg "invalid os type" let integrity_check ?ppf ~auto_repair ~check index = let ppf = match ppf with | Some p -> p | None -> open_out null |> Format.formatter_of_out_channel in Fmt.pf ppf "Running the integrity_check.\n%!"; let nb_absent = ref 0 in let nb_corrupted = ref 0 in let exception Cannot_fix in let bar, (progress_contents, progress_nodes, progress_commits) = Utils.Progress.increment () in let f (k, (offset, length, m)) = match m with | 'B' -> progress_contents (); check ~kind:`Contents ~offset ~length k | 'N' | 'I' -> progress_nodes (); check ~kind:`Node ~offset ~length k | 'C' -> progress_commits (); check ~kind:`Commit ~offset ~length k | _ -> invalid_arg "unknown content type" in let result = if auto_repair then try Index.filter index (fun binding -> match f binding with | Ok () -> true | Error `Wrong_hash -> raise Cannot_fix | Error `Absent_value -> incr nb_absent; false); if !nb_absent = 0 then Ok `No_error else Ok (`Fixed !nb_absent) with Cannot_fix -> Error (`Cannot_fix "Not implemented") else ( Index.iter (fun k v -> match f (k, v) with | Ok () -> () | Error `Wrong_hash -> incr nb_corrupted | Error `Absent_value -> incr nb_absent) index; if !nb_absent = 0 && !nb_corrupted = 0 then Ok `No_error else Error (`Corrupted (!nb_corrupted + !nb_absent))) in Utils.Progress.finalise bar; result end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>