package irmin-pack
Irmin backend which stores values in a pack file
Install
Dune Dependency
Authors
Maintainers
Sources
irmin-3.4.2.tbz
sha256=964512f77697947a73dd1875c634d6804e98f14167c5aadd3c4f85b5f30ffb53
sha512=d56d3285daa2dd36f1eaff867af5c8f44c61bba4de1a5c7bb5347652dbc460c343483ab61de1a9302f664af357eacc964dc5c8b4477f6d01372dc028b162e98b
doc/src/irmin-pack.unix/gc.ml.html
Source file gc.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647
(*
 * Copyright (c) 2022-2022 Tarides <contact@tarides.com>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *)

open! Import
include Gc_intf

(** Code run by the asynchronous GC worker. Starting from a commit, it writes
    the mapping, prefix and suffix files of a new generation and reports the
    end offset of the new suffix. *)
module Worker = struct
  module Payload = Control_file.Latest_payload

  (** Size, in bytes, of the buffers used to copy data between files. *)
  let buffer_size = 8192

  exception Pack_error = Errors.Pack_error

  module type S = sig
    module Args : Args

    (** [run_and_output_result ~generation root commit_key] runs the GC of the
        store at [root] for [commit_key], writes its outcome to the gc result
        file of [generation] and returns the end offset of the new suffix.
        Raises if the GC itself or the writing of the result file fails. *)
    val run_and_output_result : generation:int -> string -> Args.key -> int63

    (** [transfer_append_exn ~dispatcher ~append_exn ~off ~len buffer] reads
        the [len] bytes found at offset [off] through [dispatcher] and passes
        them to [append_exn], at most [Bytes.length buffer] bytes at a time. *)
    val transfer_append_exn :
      dispatcher:Args.Dispatcher.t ->
      append_exn:(string -> unit) ->
      off:int63 ->
      len:int63 ->
      bytes ->
      unit

    (** Outcome of a worker run, as serialised in the gc result file: the end
        offset of the new suffix, or the error that stopped the worker. *)
    type gc_output = (int63, Args.Errs.t) result [@@deriving irmin]
  end

  module Make (Args : Args) : S with module Args := Args = struct
    open Args
    module Io = Fm.Io
    module Mapping_file = Dispatcher.Mapping_file

    module Ao = struct
      include Append_only_file.Make (Fm.Io) (Errs)

      (** Create an append-only file at [path] in read-write mode, overwriting
          any existing file. Raises on error. *)
      let create_rw_exn ~path =
        create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000
          ~auto_flush_procedure:`Internal
        |> Errs.raise_if_error
    end

    (** [int63] offsets as hash table keys. *)
    module X = struct
      type t = int63 [@@deriving irmin]

      let equal = Irmin.Type.(unstage (equal t))
      let hash = Irmin.Type.(unstage (short_hash t))
      let hash (t : t) : int = hash t
    end

    (** Hash tables keyed by offsets; used by [iter] to remember visited
        nodes. *)
    module Table = Hashtbl.Make (X)

    let string_of_key = Irmin.Type.to_string key_t

    (** [transfer_append_exn ~dispatcher ~append_exn ~off ~len buffer] copies
        the [len] bytes found at offset [off] (read with
        [Dispatcher.read_in_prefix_and_suffix_exn]) to [append_exn]. [buffer]
        is used as scratch space, so data is copied [Bytes.length buffer]
        bytes at a time. At least one read and one call to [append_exn] are
        performed, even when [len] is zero. *)
    let transfer_append_exn ~dispatcher ~append_exn ~(off : int63)
        ~(len : int63) buffer =
      let read_exn = Dispatcher.read_in_prefix_and_suffix_exn dispatcher in
      let buffer_size = Bytes.length buffer |> Int63.of_int in
      let rec aux off len_remaining =
        let open Int63.Syntax in
        let min a b = if a < b then a else b in
        (* Size of this chunk: a full buffer, or whatever is left. *)
        let len = min buffer_size len_remaining in
        let len' = Int63.to_int len in
        read_exn ~off ~len:len' buffer;
        let () =
          (* Only the first [len'] bytes of [buffer] are valid when the chunk
             is shorter than the buffer. *)
          if len = buffer_size then append_exn (Bytes.to_string buffer)
          else append_exn (Bytes.sub_string buffer 0 len')
        in
        let len_remaining = len_remaining - len in
        if len_remaining > Int63.zero then aux (off + len) len_remaining
      in
      aux off len

    (** [iter node_key node_store ~f k] traverses, depth first, the children
        of the node stored at [node_key] in [node_store], then calls [k ()].
        The traversal is written in continuation-passing style.

        [f] is called on the key of every child reached (contents, nodes and
        inodes) and must return the offset of that object. The offsets
        returned for node and inode children are remembered, so the children
        of an already-visited (in)node are not traversed twice; contents
        offsets are not remembered. [f] is not called on [node_key] itself.

        Raises [Pack_error (`Dangling_key _)] if a traversed node cannot be
        found in [node_store]. *)
    let iter node_key node_store ~f k =
      let marks = Table.create 1024 in
      let mark offset = Table.add marks offset () in
      let has_mark offset = Table.mem marks offset in
      let rec iter_from_node_key_exn node_key node_store ~f k =
        match
          Node_store.unsafe_find ~check_integrity:false node_store node_key
        with
        | None -> raise (Pack_error (`Dangling_key (string_of_key node_key)))
        | Some node ->
            iter_from_node_children_exn node_store ~f (Node_value.pred node) k
      and iter_from_node_children_exn node_store ~f children k =
        match children with
        | [] -> k ()
        | (_step, kinded_key) :: tl -> (
            (* Continuation that visits the remaining siblings. *)
            let k () = iter_from_node_children_exn node_store ~f tl k in
            match kinded_key with
            | `Contents key ->
                let (_ : int63) = f key in
                k ()
            | `Inode key | `Node key ->
                (* [f] is called on every occurrence, but the children of a
                   given offset are only traversed the first time. *)
                let offset = f key in
                if has_mark offset then k ()
                else (
                  mark offset;
                  iter_from_node_key_exn key node_store ~f k))
      in
      iter_from_node_key_exn node_key node_store ~f k

    (** Kind magic written over the parent commits of the GCed commit in the
        new prefix. Dangling_parent_commit objects are the parents of the
        GCed commit; they are kept on disk so that the GCed commit can still
        be deserialised. *)
    let magic_parent =
      Pack_value.Kind.to_magic Pack_value.Kind.Dangling_parent_commit

    (** [transfer_parent_commit_exn ~read_exn ~write_exn ~mapping key] copies
        the commit at [key] again into the new prefix, with the byte at index
        [Hash.hash_size] of the entry replaced by [magic_parent]. The entry is
        read at its original offset with [read_exn], then written with
        [write_exn] at the physical prefix offset that [mapping] gives for it.

        Note that this overwrites data already written to the new prefix. *)
    let transfer_parent_commit_exn ~read_exn ~write_exn ~mapping key =
      let off, len =
        match Pack_key.inspect key with
        | Indexed _ ->
            (* As this is the second time we are reading this key, this case
               is unreachable. *)
            assert false
        | Direct { offset; length; _ } -> (offset, length)
      in
      let buffer = Bytes.create len in
      read_exn ~off ~len buffer;
      let accessor = Dispatcher.create_accessor_to_prefix_exn mapping ~off ~len in
      Bytes.set buffer Hash.hash_size magic_parent;
      (* Bytes.unsafe_to_string usage: We assume read_exn returns unique
         ownership of buffer to this function. Then at the call to
         Bytes.unsafe_to_string we give up unique ownership to buffer (we do
         not modify it thereafter) in return for ownership of the resulting
         string, which we pass to write_exn. This usage is safe. *)
      write_exn ~off:accessor.poff ~len (Bytes.unsafe_to_string buffer)

    (** Create the suffix file of [generation] under [root], overwriting any
        existing one, and open it in append-only read-write mode. *)
    let create_new_suffix ~root ~generation =
      let path = Irmin_pack.Layout.V3.suffix ~root ~generation in
      Ao.create_rw_exn ~path

    (** [run ~generation root commit_key] does the GC work for [commit_key] on
        the store at [root], which it opens read-only:

        - it writes the mapping file of [generation], registering the parents
          of the commit and every node and contents reachable from the root
          node of the commit;
        - it writes the prefix file of [generation] with those objects, then
          rewrites the parent commits in it with the [magic_parent] kind;
        - it writes the suffix file of [generation] with the data found from
          the commit offset to the end of the current suffix.

        Returns the offset up to which the suffix was copied. Errors are
        raised as exceptions. *)
    let run ~generation root commit_key =
      let open Result_syntax in
      let config =
        Irmin_pack.Conf.init ~fresh:false ~readonly:true ~lru_size:0 root
      in

      (* Step 1. Open the files *)
      [%log.debug "GC: opening files in RO mode"];
      let fm = Fm.open_ro config |> Errs.raise_if_error in
      Errors.finalise_exn (fun _outcome ->
          Fm.close fm |> Errs.log_if_error "GC: Close File_manager")
      @@ fun () ->
      let dict = Dict.v fm |> Errs.raise_if_error in
      let dispatcher = Dispatcher.v fm |> Errs.raise_if_error in
      let node_store = Node_store.v ~config ~fm ~dict ~dispatcher in
      let commit_store = Commit_store.v ~config ~fm ~dict ~dispatcher in

      (* Step 2. Load the commit, which makes [commit_key] [Direct] if it is
         not already the case. *)
      let commit =
        match
          Commit_store.unsafe_find ~check_integrity:false commit_store
            commit_key
        with
        | None ->
            Errs.raise_error
              (`Commit_key_is_dangling (string_of_key commit_key))
        | Some commit -> commit
      in
      (* The commit offset is where the new suffix starts copying from. *)
      let commit_offset, _ =
        let state : _ Pack_key.state = Pack_key.inspect commit_key in
        match state with
        | Indexed _ -> assert false
        | Direct x -> (x.offset, x.length)
      in

      (* Step 3. Create the new mapping. *)
      let mapping =
        (* Step 3.1 Start the [Mapping_file] routine, which will create the
           reachable file. *)
        (fun f ->
          Mapping_file.create ~root ~generation ~register_entries:f
          |> Errs.raise_if_error)
        @@ fun ~register_entry ->
        (* Step 3.2 Put the commit parents in the reachable file. The
           parent(s) of [commit_key] must be included in the iteration
           because, when decoding the [Commit_value.t] at [commit_key], the
           parents will have to be read in order to produce a key for them. *)
        let register_object_exn key =
          match Pack_key.inspect key with
          | Indexed _ ->
              raise
                (Pack_error
                   (`Commit_parent_key_is_indexed (string_of_key key)))
          | Direct { offset; length; _ } ->
              register_entry ~off:offset ~len:length
        in
        List.iter register_object_exn (Commit_value.parents commit);

        (* Step 3.3 Put the nodes and contents in the reachable file. This
           version returns the offset, as [iter] expects from [~f]. *)
        let register_object_exn key =
          match Pack_key.inspect key with
          | Indexed _ ->
              raise
                (Pack_error
                   (`Node_or_contents_key_is_indexed (string_of_key key)))
          | Direct { offset; length; _ } ->
              register_entry ~off:offset ~len:length;
              offset
        in
        let node_key = Commit_value.node commit in
        (* [iter] does not call [~f] on its starting key, so the root node is
           registered explicitly. *)
        let (_ : int63) = register_object_exn node_key in
        iter node_key node_store ~f:register_object_exn (fun () -> ());

        (* Step 3.4 Return and let the [Mapping_file] routine create the
           mapping file. *)
        ()
      in

      let () =
        (* Step 4. Create the new prefix. *)
        let prefix =
          let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
          Ao.create_rw_exn ~path
        in
        let () =
          Errors.finalise_exn (fun _outcome ->
              Ao.close prefix |> Errs.log_if_error "GC: Close prefix")
          @@ fun () ->
          ();

          (* Step 5. Transfer to the new prefix, flush and close. *)
          [%log.debug "GC: transfering to the new prefix"];
          let buffer = Bytes.create buffer_size in
          (* Step 5.1. Transfer all the entries of the mapping, in the order
             given by [Mapping_file.iter_exn]. *)
          let append_exn = Ao.append_exn prefix in
          let f ~off ~len =
            let len = Int63.of_int len in
            transfer_append_exn ~dispatcher ~append_exn ~off ~len buffer
          in
          let () = Mapping_file.iter_exn mapping f in
          Ao.flush prefix |> Errs.raise_if_error
        in
        (* Step 5.2. Transfer again the parent commits but with a modified
           magic. Reopen the new prefix, this time _not_ in append-only as we
           have to modify data inside the file. *)
        let read_exn ~off ~len buf =
          let accessor = Dispatcher.create_accessor_exn dispatcher ~off ~len in
          Dispatcher.read_exn dispatcher accessor buf
        in
        let prefix =
          let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
          Io.open_ ~path ~readonly:false |> Errs.raise_if_error
        in
        Errors.finalise_exn (fun _outcome ->
            Io.fsync prefix
            >>= (fun _ -> Io.close prefix)
            |> Errs.log_if_error "GC: Close prefix after parent rewrite")
        @@ fun () ->
        let write_exn = Io.write_exn prefix in
        List.iter
          (fun key ->
            transfer_parent_commit_exn ~read_exn ~write_exn ~mapping key)
          (Commit_value.parents commit)
      in

      (* Step 6. Create the new suffix and prepare the functions used to
         append to it. *)
      let buffer = Bytes.create buffer_size in
      [%log.debug "GC: creating new suffix"];
      let suffix = create_new_suffix ~root ~generation in
      Errors.finalise_exn (fun _outcome ->
          Ao.fsync suffix
          >>= (fun _ -> Ao.close suffix)
          |> Errs.log_if_error "GC: Close suffix")
      @@ fun () ->
      let append_exn = Ao.append_exn suffix in
      let transfer_exn = transfer_append_exn ~dispatcher ~append_exn buffer in

      (* Step 7. Transfer to the next suffix. *)
      [%log.debug "GC: transfering to the new suffix"];
      let num_iterations = 7 in
      (* [transfer_loop] is needed because, while the GC runs, new objects may
         be appended at the end of the suffix file, and they need to be copied
         over too. Each round reloads the control file to learn the current
         end of the suffix and copies up to it. The loop stops once a round
         copied little data, or after [num_iterations] rounds. It returns the
         offset reached; whatever lies past it is left to the main process. *)
      let rec transfer_loop i ~off =
        if i = 0 then off
        else
          let () = Fm.reload fm |> Errs.raise_if_error in
          let pl : Payload.t = Fm.Control.payload (Fm.control fm) in
          let end_offset =
            Dispatcher.offset_of_suffix_poff dispatcher pl.suffix_end_poff
          in
          let len = Int63.Syntax.(end_offset - off) in
          [%log.debug
            "GC: transfer_loop iteration %d, offset %a, length %a"
              (num_iterations - i + 1)
              Int63.pp off Int63.pp len];
          let () = transfer_exn ~off ~len in
          (* Check how many bytes are left, [4096*5] is selected because it is
             roughly the number of bytes that requires a read from the block
             device on ext4 *)
          if Int63.to_int len < 4096 * 5 then end_offset
          else
            let off = Int63.Syntax.(off + len) in
            transfer_loop ~off (i - 1)
      in
      let new_suffix_end_offset =
        transfer_loop ~off:commit_offset num_iterations
      in
      Ao.flush suffix |> Errs.raise_if_error;

      (* Step 8. Inform the caller of the end_offset copied. *)
      new_suffix_end_offset

    type gc_output = (int63, Args.Errs.t) result [@@deriving irmin]

    (** [write_gc_output ~root ~generation output] serialises [output] as JSON
        into the gc result file of [generation], overwriting it, and fsyncs
        it. NOTE(review): [io] is not closed when the write or the fsync
        fails. *)
    let write_gc_output ~root ~generation output =
      let open Result_syntax in
      let path = Irmin_pack.Layout.V3.gc_result ~root ~generation in
      let* io = Io.create ~path ~overwrite:true in
      let out = Irmin.Type.to_json_string gc_output_t output in
      let* () = Io.write_string io ~off:Int63.zero out in
      let* () = Io.fsync io in
      Io.close io

    (* No one catches errors when this function terminates. Write the result
       in a file and terminate the process with an exception, if needed. *)
    let run_and_output_result ~generation root commit_key =
      let result = Errs.catch (fun () -> run ~generation root commit_key) in
      let write_result = write_gc_output ~root ~generation result in
      (* A failure to write the result file takes precedence over the GC
         outcome itself. *)
      write_result |> Errs.raise_if_error;
      result |> Errs.raise_if_error
  end
end

(** Maker for a module that can manage GC processes.
*)
module Make (Args : Args) = struct
  module Args = Args
  open Args
  module Io = Fm.Io
  module Ao = Append_only_file.Make (Io) (Errs)
  module Worker = Worker.Make (Args)

  (* Handle on one GC run.
     - [root], [generation]: the store root and the generation the worker
       builds;
     - [task]: the worker task started by [v];
     - [unlink]: whether [finalise] calls [unlink_all];
     - [offset]: start offset of the new suffix, passed to [Fm.swap] through
       [swap_and_purge];
     - [elapsed]: seconds elapsed since [v] was called;
     - [resolver]/[promise]: resolved by [finalise] with its outcome;
       [on_finalise] attaches callbacks to [promise];
     - [stats]: set once the GC has been finalised successfully. *)
  type t = {
    root : string;
    generation : int;
    task : Async.t;
    unlink : bool;
    offset : int63;
    elapsed : unit -> float;
    resolver : (stats, Errs.t) result Lwt.u;
    promise : (stats, Errs.t) result Lwt.t;
    dispatcher : Dispatcher.t;
    fm : Fm.t;
    contents : read Contents_store.t;
    node : read Node_store.t;
    commit : read Commit_store.t;
    mutable stats : stats option;
  }

  (** [v ~root ~generation ... commit_key] starts a worker task that runs the
      GC of [commit_key] into [generation], and returns the handle used to
      finalise or cancel it. A gc result file left for [generation] by a
      previous run is removed first. *)
  let v ~root ~generation ~unlink ~offset ~dispatcher ~fm ~contents ~node
      ~commit commit_key =
    let unlink_result_file () =
      let result_file = Irmin_pack.Layout.V3.gc_result ~root ~generation in
      match Io.unlink result_file with
      | Ok () -> ()
      | Error (`Sys_error msg as err) ->
          (* A missing file is the expected case and is not reported. *)
          if msg <> Fmt.str "%s: No such file or directory" result_file then
            [%log.warn
              "Unlinking temporary files from previous failed gc. Failed with \ error %a"
              (Irmin.Type.pp Errs.t) err]
    in
    (* Unlink next gc's result file, in case it is on disk, for instance
       after a failed gc. *)
    unlink_result_file ();
    (* Function to track durations. *)
    let elapsed =
      let c0 = Mtime_clock.counter () in
      fun () -> Mtime_clock.count c0 |> Mtime.Span.to_s
    in
    (* Internal promise for gc, resolved by [finalise]. *)
    let promise, resolver = Lwt.wait () in
    (* Start the worker task. *)
    let task =
      Async.async (fun () ->
          let (_ : int63) =
            Worker.run_and_output_result root commit_key ~generation
          in
          ())
    in
    {
      root;
      generation;
      unlink;
      offset;
      task;
      promise;
      resolver;
      elapsed;
      dispatcher;
      fm;
      contents;
      node;
      commit;
      stats = None;
    }

  (** Open the suffix file written by the worker for [t.generation] in
      append-only read-write mode, with its end at physical offset
      [end_poff]. *)
  let open_new_suffix ~end_poff { root; generation; _ } =
    let path = Irmin_pack.Layout.V3.suffix ~root ~generation in
    (* As the new suffix is necessarily in V3, the dead_header_size is 0. *)
    let dead_header_size = 0 in
    let auto_flush_threshold = 1_000_000 in
    Ao.open_rw ~path ~end_poff ~dead_header_size
      ~auto_flush_procedure:`Internal ~auto_flush_threshold

  (** [transfer_latest_newies ~new_suffix_start_offset ~new_suffix_end_offset t]
      appends to the new suffix the data found between
      [new_suffix_end_offset] (where the worker stopped copying) and the
      current end offset of the store, and returns that end offset. *)
  let transfer_latest_newies ~new_suffix_start_offset ~new_suffix_end_offset t =
    [%log.debug "Gc in main: transfer latest newies"];
    let open Result_syntax in
    let open Int63.Syntax in
    let old_suffix_end_offset = Dispatcher.end_offset t.dispatcher in
    let remaining = old_suffix_end_offset - new_suffix_end_offset in
    (* When opening the suffix we need to provide a physical offset. We
       compute it from the global ones. *)
    let suffix_end_poff = new_suffix_end_offset - new_suffix_start_offset in
    let* new_suffix = open_new_suffix ~end_poff:suffix_end_poff t in
    Errors.finalise (fun _ ->
        Ao.close new_suffix
        |> Errs.log_if_error "GC: Close suffix after copy latest newies")
    @@ fun () ->
    (* Same size as the copy buffer used by the worker. *)
    let buffer = Bytes.create 8192 in
    let append_exn = Ao.append_exn new_suffix in
    let flush_and_raise () = Ao.flush new_suffix |> Errs.raise_if_error in
    let* () =
      Errs.catch (fun () ->
          Worker.transfer_append_exn ~dispatcher:t.dispatcher ~append_exn
            ~off:new_suffix_end_offset ~len:remaining buffer;
          flush_and_raise ())
    in
    Ok old_suffix_end_offset

  (** Switch the file manager to the new generation with [Fm.swap], then
      purge the LRUs of the contents, node and commit stores. *)
  let swap_and_purge ~new_suffix_start_offset ~new_suffix_end_offset t =
    let open Result_syntax in
    let c0 = Mtime_clock.counter () in
    let* () =
      Fm.swap t.fm ~generation:t.generation ~new_suffix_start_offset
        ~new_suffix_end_offset
    in
    let span1 = Mtime_clock.count c0 |> Mtime.Span.to_s in
    (* No need to purge dict here, as it is global to the store. *)
    (* No need to purge index here. It is global too, but some hashes may
       not point to valid offsets anymore. Pack_store will just say that such
       keys are not member of the store. *)
    Contents_store.purge_lru t.contents;
    Node_store.purge_lru t.node;
    Commit_store.purge_lru t.commit;
    let span2 = Mtime_clock.count c0 |> Mtime.Span.to_s in
    [%log.debug "Gc swap and purge: %.6fs, %.6fs" span1 (span2 -. span1)];
    [%log.info "GC: end"];
    Ok ()

  (** Unlink the suffix of the previous generation, its prefix and mapping
      (when [generation >= 2]), and the gc result file of [generation]. The
      first failure stops the sequence and is logged as a warning. *)
  let unlink_all { root; generation; _ } =
    let result =
      let open Result_syntax in
      (* Unlink previous suffix. *)
      let suffix =
        Irmin_pack.Layout.V3.suffix ~root ~generation:(generation - 1)
      in
      let* () = Io.unlink suffix in
      let* () =
        if generation >= 2 then
          (* Unlink previous prefix. *)
          let prefix =
            Irmin_pack.Layout.V3.prefix ~root ~generation:(generation - 1)
          in
          let* () = Io.unlink prefix in
          (* Unlink previous mapping. *)
          let mapping =
            Irmin_pack.Layout.V3.mapping ~root ~generation:(generation - 1)
          in
          let* () = Io.unlink mapping in
          Ok ()
        else Ok ()
      in
      (* Unlink current gc's result. *)
      let result = Irmin_pack.Layout.V3.gc_result ~root ~generation in
      Io.unlink result
    in
    match result with
    | Error e ->
        [%log.warn
          "Unlinking temporary files after gc, failed with error %a"
          (Irmin.Type.pp Errs.t) e]
    | Ok () -> ()

  (** Build the error reported when a GC run did not succeed. [status] is the
      outcome of the worker task and [gc_output] what could be read from its
      result file. Must not be called with [`Success] and [Ok _]. *)
  let gc_errors status gc_output =
    let extend_error s = function
      | `Gc_process_error str -> `Gc_process_error (Fmt.str "%s %s" s str)
      | `Corrupted_gc_result_file str ->
          `Gc_process_died_without_result_file (Fmt.str "%s %s" s str)
    in
    match (status, gc_output) with
    | `Failure s, Error e -> Error (extend_error s e)
    | `Cancelled, Error e -> Error (extend_error "cancelled" e)
    | `Success, Error e -> Error (extend_error "success" e)
    | `Cancelled, Ok _ -> Error (`Gc_process_error "cancelled")
    | `Failure s, Ok _ -> Error (`Gc_process_error s)
    | `Success, Ok _ -> assert false

  (** Read and decode the gc result file of [generation]. I/O errors and
      undecodable contents are reported as [`Corrupted_gc_result_file]; an
      error recorded by the worker is reported as [`Gc_process_error]. *)
  let read_gc_output ~root ~generation =
    let open Result_syntax in
    let read_file () =
      let path = Irmin_pack.Layout.V3.gc_result ~root ~generation in
      let* io = Io.open_ ~path ~readonly:true in
      let read_contents () =
        let* len = Io.read_size io in
        let len = Int63.to_int len in
        Io.read_to_string io ~off:Int63.zero ~len
      in
      match read_contents () with
      | Ok contents ->
          let* () = Io.close io in
          Ok contents
      | Error _ as err ->
          (* Close the file before reporting the read error, so that the
             descriptor is not leaked by the main process. The read error is
             the one reported; a close failure is only logged. *)
          Io.close io |> Errs.log_if_error "GC: Close gc result file";
          err
    in
    let read_error err =
      `Corrupted_gc_result_file (Irmin.Type.to_string Errs.t err)
    in
    let gc_error err = `Gc_process_error (Irmin.Type.to_string Errs.t err) in
    let* s = read_file () |> Result.map_error read_error in
    match Irmin.Type.of_json_string Worker.gc_output_t s with
    | Error (`Msg error) -> Error (`Corrupted_gc_result_file error)
    | Ok ok -> ok |> Result.map_error gc_error

  (** Clean up after a failed or cancelled GC by calling [Fm.cleanup] for
      generation [t.generation - 1]. NOTE(review): presumably this removes the
      partial files of [t.generation]; confirm against [Fm.cleanup]. *)
  let clean_after_abort t =
    Fm.cleanup ~root:t.root ~generation:(t.generation - 1)

  (** [finalise ~wait t] completes the GC run [t] once its worker is done.

      If [wait] is false and the worker is still running, returns
      [Ok `Running]. Otherwise, when the worker succeeded, it copies the data
      appended since the worker stopped, swaps the store to the new
      generation, unlinks the old files if [t.unlink], and returns
      [Ok (`Finalised stats)]. When the worker failed or was cancelled, it
      calls [clean_after_abort] and returns the error. Whenever an outcome is
      computed, success or error, [t.promise] is resolved with it. Once a
      finalisation has succeeded, later calls return the recorded stats. *)
  let finalise ~wait t =
    match t.stats with
    | Some stats -> Lwt.return_ok (`Finalised stats)
    | None -> (
        let go status =
          let start = t.elapsed () in
          let s =
            ref
              {
                duration = 0.;
                finalisation_duration = 0.;
                read_gc_output_duration = 0.;
                transfer_latest_newies_duration = 0.;
                swap_duration = 0.;
                unlink_duration = 0.;
              }
          in
          (* [time on_end f] runs [f ()] and passes its duration, in seconds,
             to [on_end]. *)
          let time on_end f =
            let counter = Mtime_clock.counter () in
            let res = f () in
            on_end (Mtime_clock.count counter |> Mtime.Span.to_s);
            res
          in
          let gc_output =
            time (fun d -> s := { !s with read_gc_output_duration = d })
            @@ fun () -> read_gc_output ~root:t.root ~generation:t.generation
          in
          let result =
            let open Result_syntax in
            match (status, gc_output) with
            | `Success, Ok new_suffix_end_offset_before -> (
                (* Copy the data appended since the worker stopped, then
                   switch the store to the new generation. *)
                let transfer_and_swap () =
                  let* new_suffix_end_offset =
                    time (fun d ->
                        s := { !s with transfer_latest_newies_duration = d })
                    @@ fun () ->
                    transfer_latest_newies ~new_suffix_start_offset:t.offset
                      ~new_suffix_end_offset:new_suffix_end_offset_before t
                  in
                  let* () =
                    time (fun d -> s := { !s with swap_duration = d })
                    @@ fun () ->
                    swap_and_purge ~new_suffix_start_offset:t.offset
                      ~new_suffix_end_offset t
                  in
                  Ok new_suffix_end_offset
                in
                match transfer_and_swap () with
                | Error e ->
                    (* Resolve [t.promise] on this failure too; otherwise the
                       callbacks registered with [on_finalise] never run. *)
                    let () = Lwt.wakeup_later t.resolver (Error e) in
                    Error e
                | Ok new_suffix_end_offset ->
                    (if t.unlink then
                       time (fun d -> s := { !s with unlink_duration = d })
                       @@ fun () -> unlink_all t);
                    let duration = t.elapsed () in
                    s :=
                      {
                        !s with
                        duration;
                        finalisation_duration = duration -. start;
                      };
                    t.stats <- Some !s;
                    [%log.debug
                      "Gc ended. %a, newies bytes:%a" pp_stats !s Int63.pp
                        (Int63.sub new_suffix_end_offset
                           new_suffix_end_offset_before)];
                    let () = Lwt.wakeup_later t.resolver (Ok !s) in
                    Ok (`Finalised !s))
            | _ ->
                clean_after_abort t;
                let err = gc_errors status gc_output in
                let () = Lwt.wakeup_later t.resolver err in
                err
          in
          Lwt.return result
        in
        if wait then
          let* status = Async.await t.task in
          go status
        else
          match Async.status t.task with
          | `Running -> Lwt.return_ok `Running
          | #Async.outcome as status -> go status)

  (** [on_finalise t f] runs [f] on the outcome of [t] once [finalise] has
      resolved [t.promise]. *)
  let on_finalise t f =
    (* Ignore returned promise since the purpose of this function is to add
       asynchronous callbacks to the GC process -- this promise binding is an
       internal implementation detail. This is safe since the callback [f] is
       attached to [t.promise], which is referenced for the lifetime of a GC
       process. *)
    let _ = Lwt.bind t.promise f in
    ()

  (** [cancel t] tries to cancel the worker task of [t] and, if it was
      cancelled, calls [clean_after_abort]. Returns whether the task was
      cancelled. *)
  let cancel t =
    (* NOTE(review): [t.resolver] is not woken up here, so callbacks
       registered with [on_finalise] only run if [finalise] is called
       afterwards -- confirm this is intended. *)
    let cancelled = Async.cancel t.task in
    if cancelled then clean_after_abort t;
    cancelled
end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>