(* Source file: pack_store.ml (recovered from an odoc-generated source listing). *)
(*
* Copyright (c) 2018-2022 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open Import
include Pack_store_intf

exception Invalid_read of string
exception Corrupted_store of string
exception Dangling_hash

let invalid_read fmt = Fmt.kstr (fun s -> raise (Invalid_read s)) fmt
let corrupted_store fmt = Fmt.kstr (fun s -> raise (Corrupted_store s)) fmt

(* Hashtbl keyed by [K.t], using the hash function and structural equality
   provided by the Irmin hash implementation. *)
module Table (K : Irmin.Hash.S) = Hashtbl.Make (struct
  type t = K.t

  let hash = K.short_hash
  let equal = Irmin.Type.(unstage (equal K.t))
end)

module Make_without_close_checks
    (Fm : File_manager.S)
    (Dict : Dict.S)
    (Dispatcher : Dispatcher.S with module Fm = Fm)
    (Hash : Irmin.Hash.S with type t = Fm.Index.key)
    (Val : Pack_value.Persistent
             with type hash := Hash.t
              and type key := Hash.t Pack_key.t)
    (Errs : Io_errors.S with module Io = Fm.Io) =
struct
  module Tbl = Table (Hash)
  module Control = Fm.Control
  module Suffix = Fm.Suffix
  module Index = Fm.Index
  module Key = Pack_key.Make (Hash)

  (* Specialise the generic LRU to store kinded pack values, weighted by
     [Val.weight]. *)
  module Lru = struct
    include Lru

    let add t k v = Val.to_kinded v |> add t k (Val.weight v)
    let find t k = find t k |> Val.of_kinded
  end

  type file_manager = Fm.t
  type dict = Dict.t
  type dispatcher = Dispatcher.t

  type 'a t = {
    lru : Lru.t;
    staging : Val.t Tbl.t;
    indexing_strategy : Irmin_pack.Indexing_strategy.t;
    fm : Fm.t;
    dict : Dict.t;
    dispatcher : Dispatcher.t;
  }

  type hash = Hash.t [@@deriving irmin ~pp ~equal ~decode_bin]
  type key = Key.t [@@deriving irmin ~pp]
  type value = Val.t [@@deriving irmin ~pp]

  (* Resolve the (offset, length, volume) of a key, consulting the index for
     [Indexed] keys (and promoting them to direct keys on success).
     @raise Dangling_hash if an indexed key is absent from the index. *)
  let get_location t k =
    match Pack_key.inspect k with
    | Indexed hash -> (
        match Index.find (Fm.index t.fm) hash with
        | None -> raise Dangling_hash
        | Some (off, len, _kind) ->
            Pack_key.promote_exn k ~offset:off ~length:len;
            (off, len, None))
    | Direct { offset; length; volume_identifier; _ } ->
        (offset, length, volume_identifier)

  let get_offset t k =
    match Pack_key.to_offset k with
    | Some off -> off
    | None ->
        let off, _, _ = get_location t k in
        off

  let get_length t k =
    match Pack_key.to_length k with
    | Some len -> len
    | None ->
        let _, len, _ = get_location t k in
        len

  let len_of_direct_key k =
    match Pack_key.inspect k with
    | Indexed _ -> assert false
    | Direct { length; _ } -> length

  let off_of_direct_key k =
    match Pack_key.to_offset k with
    | None -> assert false
    | Some offset -> offset

  let index_direct_with_kind t hash =
    [%log.debug "index %a" pp_hash hash];
    match Index.find (Fm.index t.fm) hash with
    | None -> None
    | Some (offset, length, kind) ->
        let key = Pack_key.v_direct ~offset ~length hash in
        Some (key, kind)

  let index_direct t hash =
    index_direct_with_kind t hash |> Option.map (fun (key, _) -> key)

  let index t hash = Lwt.return (index_direct t hash)

  let v ~config ~fm ~dict ~dispatcher ~lru =
    let indexing_strategy = Conf.indexing_strategy config in
    let staging = Tbl.create 127 in
    Fm.register_suffix_consumer fm ~after_flush:(fun () -> Tbl.clear staging);
    Fm.register_prefix_consumer fm ~after_reload:(fun () -> Ok (Lru.clear lru));
    { lru; staging; indexing_strategy; fm; dict; dispatcher }

  module Entry_prefix = struct
    type t = {
      hash : hash;
      kind : Pack_value.Kind.t;
      size_of_value_and_length_header : int option;
          (** Remaining bytes in the entry after reading the hash and the kind
              (i.e. the length of the length header + the value of the length
              header), if the entry has a length header (otherwise [None]).

              NOTE: the length stored in the index and in direct pack keys is
              the {!total_entry_length} (including the hash and the kind). See
              [pack_value.mli] for a description. *)
    }
    [@@deriving irmin ~pp_dump]

    let min_length = Hash.hash_size + 1
    let max_length = Hash.hash_size + 1 + Varint.max_encoded_size

    let total_entry_length t =
      Option.map (fun len -> min_length + len) t.size_of_value_and_length_header
  end

  let read_and_decode_entry_prefix ~off ?volume_identifier dispatcher =
    let buf = Bytes.create Entry_prefix.max_length in
    let _len, _volume =
      try
        (* We may read fewer than [Entry_prefix.max_length] bytes when reading
           the final entry in the pack file (if the data section of the entry
           is shorter than [Varint.max_encoded_size]. In this case, an invalid
           read may be discovered below when attempting to decode the length
           header. *)
        Dispatcher.read_range_exn dispatcher ?volume_identifier ~off
          ~min_len:Entry_prefix.min_length ~max_len:Entry_prefix.max_length buf
      with Errors.Pack_error `Read_out_of_bounds ->
        invalid_read
          "Attempted to read an entry at offset %a in the pack file, but got \
           less than %d bytes"
          Int63.pp off Entry_prefix.max_length
    in
    let hash =
      (* Bytes.unsafe_to_string usage: buf is created locally, so we have
         unique ownership; we assume Dispatcher.read_at_most_exn returns
         unique ownership; use of Bytes.unsafe_to_string converts buffer to
         shared ownership; the rest of the code seems to require only shared
         ownership (buffer is read, but not mutated). This is safe. *)
      decode_bin_hash (Bytes.unsafe_to_string buf) (ref 0)
    in
    let kind = Pack_value.Kind.of_magic_exn (Bytes.get buf Hash.hash_size) in
    let size_of_value_and_length_header =
      match Val.length_header kind with
      | None -> None
      | Some `Varint ->
          let length_header_start = Entry_prefix.min_length in
          (* The bytes starting at [length_header_start] are a variable-length
             length field (if they exist / were read correctly): *)
          let pos_ref = ref length_header_start in
          (* Bytes.unsafe_to_string usage: buf is shared at this point; we
             assume Varint.decode_bin requires only shared ownership. This
             usage is safe. *)
          let length_header =
            Varint.decode_bin (Bytes.unsafe_to_string buf) pos_ref
          in
          let length_header_length = !pos_ref - length_header_start in
          Some (length_header_length + length_header)
    in
    { Entry_prefix.hash; kind; size_of_value_and_length_header }

  (* This function assumes magic is written at hash_size + 1 for every
     object. *)
  let gced t buf =
    let kind = Pack_value.Kind.of_magic_exn (Bytes.get buf Hash.hash_size) in
    match (kind, Fm.gc_behaviour t.fm) with
    | kind, `Delete -> kind = Pack_value.Kind.Dangling_parent_commit
    | _, `Archive -> false

  let pack_file_contains_key t k =
    let off, _, volume_identifier = get_location t k in
    let len = Hash.hash_size + 1 in
    let buf = Bytes.create len in
    let (_volume : Lower.volume_identifier option) =
      Dispatcher.read_exn t.dispatcher ~off ~len ?volume_identifier buf
    in
    if gced t buf then false
    else
      (* Bytes.unsafe_to_string usage: [buf] is local and never reused after
         the call to [decode_bin_hash]. *)
      let hash = decode_bin_hash (Bytes.unsafe_to_string buf) (ref 0) in
      if not (equal_hash hash (Key.to_hash k)) then
        invalid_read
          "invalid key %a checked for membership (read hash %a at this offset \
           instead)"
          pp_key k pp_hash hash;
      (* At this point we consider the key to be contained in the pack file.
         However, we could also be in the presence of a forged (or unlucky)
         key that points to an offset that mimics a real pack entry (e.g. in
         the middle of a blob). *)
      true

  let pack_file_contains_key t k =
    try pack_file_contains_key t k with
    | Dangling_hash -> false
    | Errors.Pack_error `Read_out_of_bounds ->
        (* Can't fit an entry into this suffix of the store, so this key isn't
           (yet) valid. If we're a read-only instance, the key may become
           valid on [reload]; otherwise we know that this key wasn't
           constructed for this store. *)
        (if not (Control.readonly (Fm.control t.fm)) then
           let io_offset = Dispatcher.end_offset t.dispatcher in
           invalid_read "invalid key %a checked for membership (IO offset = %a)"
             pp_key k Int63.pp io_offset);
        false
    | Errors.Pack_error (`Invalid_sparse_read _) -> false
    | Errors.Pack_error (`Invalid_prefix_read _) -> false

  let unsafe_mem t k =
    [%log.debug "[pack] mem %a" pp_key k];
    match Pack_key.inspect k with
    | Indexed hash ->
        (* The key doesn't contain an offset, let's skip the lookup in [lru]
           and go straight to disk read. *)
        Tbl.mem t.staging hash || pack_file_contains_key t k
    | Direct { offset; hash; _ } ->
        Tbl.mem t.staging hash
        || Lru.mem t.lru offset
        || pack_file_contains_key t k

  let mem t k =
    let b = unsafe_mem t k in
    Lwt.return b

  let check_hash h v =
    let h' = Val.hash v in
    if equal_hash h h' then Ok () else Error (h, h')

  let check_key k v = check_hash (Key.to_hash k) v

  (** Produce a key from an offset in the context of decoding inode and commit
      children. *)
  let key_of_offset ?volume_identifier t offset =
    [%log.debug "key_of_offset: %a" Int63.pp offset];
    (* Attempt to eagerly read the length at the same time as reading the hash
       in order to save an extra IO read when dereferencing the key: *)
    let entry_prefix =
      read_and_decode_entry_prefix ?volume_identifier ~off:offset t.dispatcher
    in
    (* This function is called on the parents of a commit when deserialising
       it. Dangling_parent_commit are usually treated as removed objects,
       except here, where in order to correctly deserialise the gced commit,
       they are treated as kept commits.

       Volume identifier is explicitly set to [None] for dangling parent
       commits so that its data will not be read from the volume (or prefix).
       When it is read, the routing will try to find its location in a
       previous volume. *)
    let kind, volume_identifier =
      match entry_prefix.kind with
      | Pack_value.Kind.Dangling_parent_commit -> (Pack_value.Kind.Commit_v2, None)
      | kind -> (kind, volume_identifier)
    in
    let key =
      let entry_prefix = { entry_prefix with kind } in
      match Entry_prefix.total_entry_length entry_prefix with
      | Some length ->
          Pack_key.v_direct ~offset ~length ?volume_identifier entry_prefix.hash
      | None ->
          (* NOTE: we could store [offset] in this key, but since we know the
             entry doesn't have a length header we'll need to check the index
             when dereferencing this key anyway. {i Not} storing the offset
             avoids doing another failed check in the pack file for the length
             header during [find]. *)
          Pack_key.v_indexed entry_prefix.hash
    in
    key

  let find_in_pack_file ~key_of_offset t key =
    let off, len, volume_identifier = get_location t key in
    let buf = Bytes.create len in
    let volume_identifier =
      Dispatcher.read_exn t.dispatcher ~off ~len ?volume_identifier buf
    in
    if gced t buf then None
    else
      let () = Pack_key.set_volume_identifier_exn ~volume_identifier key in
      let key_of_offset = key_of_offset ?volume_identifier t in
      let key_of_hash = Pack_key.v_indexed in
      let dict = Dict.find t.dict in
      let v =
        (* Bytes.unsafe_to_string usage: buf created, uniquely owned; after
           creation, we assume Dispatcher.read_if_not_gced returns unique
           ownership; we give up unique ownership in call to
           [Bytes.unsafe_to_string]. This is safe. *)
        Val.decode_bin ~key_of_offset ~key_of_hash ~dict
          (Bytes.unsafe_to_string buf)
          (ref 0)
      in
      Some v

  let find_in_pack_file ~key_of_offset t key =
    try find_in_pack_file ~key_of_offset t key with
    | Dangling_hash -> None
    | Errors.Pack_error `Read_out_of_bounds -> (
        (* Can't fit an entry into this suffix of the store, so this key
         * isn't (yet) valid. If we're a read-only instance, the key may
         * become valid on [reload]; otherwise we know that this key wasn't
         * constructed for this store. *)
        let io_offset = Dispatcher.end_offset t.dispatcher in
        match Control.readonly (Fm.control t.fm) with
        | false ->
            invalid_read "attempt to dereference invalid key %a (IO offset = %a)"
              pp_key key Int63.pp io_offset
        | true ->
            [%log.debug
              "Direct store key references an unknown starting offset %a \
               (length = %d, IO offset = %a)"
              Int63.pp (off_of_direct_key key) (len_of_direct_key key) Int63.pp
                io_offset];
            None)
    | Errors.Pack_error (`Invalid_sparse_read _) -> None
    | Errors.Pack_error (`Invalid_prefix_read _) as e -> raise e

  let unsafe_find ~check_integrity t k =
    [%log.debug "[pack] find %a" pp_key k];
    let find_location = ref Stats.Pack_store.Not_found in
    let find_in_pack_file_guarded ~is_indexed =
      let res = find_in_pack_file ~key_of_offset t k in
      Option.iter
        (fun v ->
          if is_indexed then find_location := Stats.Pack_store.Pack_indexed
          else find_location := Stats.Pack_store.Pack_direct;
          Lru.add t.lru (off_of_direct_key k) v;
          if check_integrity then
            check_key k v |> function
            | Ok () -> ()
            | Error (expected, got) ->
                corrupted_store "Got hash %a, expecting %a (for val: %a)."
                  pp_hash got pp_hash expected pp_value v)
        res;
      res
    in
    let value_opt =
      match Pack_key.inspect k with
      | Indexed hash -> (
          match Tbl.find t.staging hash with
          | v ->
              (* Hit in staging, but we don't have offset to put in LRU *)
              find_location := Stats.Pack_store.Staging;
              Some v
          | exception Not_found -> find_in_pack_file_guarded ~is_indexed:true)
      | Direct { offset; hash; _ } -> (
          match Tbl.find t.staging hash with
          | v ->
              Lru.add t.lru offset v;
              find_location := Stats.Pack_store.Staging;
              Some v
          | exception Not_found -> (
              match Lru.find t.lru offset with
              | v ->
                  find_location := Stats.Pack_store.Lru;
                  Some v
              | exception Not_found ->
                  find_in_pack_file_guarded ~is_indexed:false))
    in
    Stats.report_pack_store ~field:!find_location;
    value_opt

  let unsafe_find_no_prefetch t key =
    let key_of_offset ?volume_identifier:_ _ = Pack_key.v_offset in
    find_in_pack_file ~key_of_offset t key

  let find t k =
    let v = unsafe_find ~check_integrity:true t k in
    Lwt.return v

  let integrity_check ~offset ~length hash t =
    let k = Pack_key.v_direct ~offset ~length hash in
    (* TODO: new error for reading gced objects. *)
    match find_in_pack_file ~key_of_offset t k with
    | exception Errors.Pack_error (`Invalid_prefix_read _) -> Error `Absent_value
    | exception Invalid_read _ -> Error `Absent_value
    | None -> Error `Wrong_hash
    | Some value -> (
        match check_hash hash value with
        | Ok () -> Ok ()
        | Error _ -> Error `Wrong_hash)

  let cast t = (t :> read_write t)

  (** [batch] is required by the [Backend] signature of irmin core, but
      irmin-pack is really meant to be used using the [batch] of the repo (in
      [ext.ml]). The following batch exists only for compatibility, but it is
      very tempting to replace the implementation by an [assert false]. *)
  let batch t f =
    [%log.warn
      "[pack] calling batch directory on a store is not recommended. Use \
       repo.batch instead."];
    let on_success res =
      Fm.flush t.fm |> Errs.raise_if_error;
      Lwt.return res
    in
    let on_fail exn =
      [%log.info
        "[pack] batch failed. calling flush. (%s)" (Printexc.to_string exn)];
      let () =
        match Fm.flush t.fm with
        | Ok () -> ()
        | Error err ->
            [%log.err
              "[pack] batch failed and flush failed. Silencing flush fail. (%a)"
              (Irmin.Type.pp Errs.t) err]
      in
      raise exn
    in
    Lwt.try_bind (fun () -> f (cast t)) on_success on_fail

  let unsafe_append ~ensure_unique ~overcommit t hash v =
    let kind = Val.kind v in
    let use_index =
      (* the index is required for non-minimal indexing strategies and for
         commits. *)
      (not (Irmin_pack.Indexing_strategy.is_minimal t.indexing_strategy))
      || kind = Commit_v1
      || kind = Commit_v2
    in
    let unguarded_append () =
      let offset_of_key k =
        match Pack_key.inspect k with
        | Direct { offset; _ } ->
            Stats.incr_appended_offsets ();
            Some offset
        | Indexed hash -> (
            (* TODO: Why don't we promote the key here? *)
            match Index.find (Fm.index t.fm) hash with
            | None ->
                Stats.incr_appended_hashes ();
                None
            | Some (offset, _, _) ->
                Stats.incr_appended_offsets ();
                Some offset)
      in
      let dict = Dict.index t.dict in
      let off = Dispatcher.end_offset t.dispatcher in
      (* [encode_bin] will most likely call [append] several time. One of
         these call may trigger an auto flush. *)
      let append = Suffix.append_exn (Fm.suffix t.fm) in
      Val.encode_bin ~offset_of_key ~dict hash v append;
      let open Int63.Syntax in
      let len = Int63.to_int (Dispatcher.end_offset t.dispatcher - off) in
      let key = Pack_key.v_direct ~offset:off ~length:len hash in
      let () =
        let should_index = t.indexing_strategy ~value_length:len kind in
        if should_index then
          Index.add ~overcommit (Fm.index t.fm) hash (off, len, kind)
      in
      Tbl.add t.staging hash v;
      Lru.add t.lru off v;
      [%log.debug "[pack] append %a" pp_key key];
      key
    in
    match ensure_unique && use_index with
    | false -> unguarded_append ()
    | true ->
        let key = Pack_key.v_indexed hash in
        if unsafe_mem t key then key else unguarded_append ()

  let unsafe_add t hash v =
    unsafe_append ~ensure_unique:true ~overcommit:false t hash v |> Lwt.return

  let add t v = unsafe_add t (Val.hash v) v

  (** This close is a noop.

      Closing the file manager would be inadequate because it is passed to
      [v]. The caller should close the file manager.

      We could clear the caches here but that really is not necessary. *)
  let close _ = Lwt.return ()

  let purge_lru t = Lru.clear t.lru
end

module Make
    (Fm : File_manager.S)
    (Dict : Dict.S)
    (Dispatcher : Dispatcher.S with module Fm = Fm)
    (Hash : Irmin.Hash.S with type t = Fm.Index.key)
    (Val : Pack_value.Persistent
             with type hash := Hash.t
              and type key := Hash.t Pack_key.t)
    (Errs : Io_errors.S with module Io = Fm.Io) =
struct
  module Inner =
    Make_without_close_checks (Fm) (Dict) (Dispatcher) (Hash) (Val) (Errs)

  include Inner
  include Indexable.Closeable (Inner)

  let v ~config ~fm ~dict ~dispatcher ~lru =
    Inner.v ~config ~fm ~dict ~dispatcher ~lru |> make_closeable

  let cast t = Inner.cast (get_if_open_exn t) |> make_closeable

  let integrity_check ~offset ~length k t =
    Inner.integrity_check ~offset ~length k (get_if_open_exn t)

  module Entry_prefix = Inner.Entry_prefix

  let read_and_decode_entry_prefix ~off dispatcher =
    Inner.read_and_decode_entry_prefix ~off dispatcher

  let index_direct_with_kind t = Inner.index_direct_with_kind (get_if_open_exn t)
  let purge_lru t = Inner.purge_lru (get_if_open_exn t)
  let key_of_offset t offset = Inner.key_of_offset (get_if_open_exn t) offset

  let unsafe_find_no_prefetch t key =
    Inner.unsafe_find_no_prefetch (get_if_open_exn t) key

  let get_offset t key = Inner.get_offset (get_if_open_exn t) key
  let get_length t key = Inner.get_length (get_if_open_exn t) key
end