Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file layered_store.ml

(*
* Copyright (c) 2018-2021 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open! Import

module type S = Irmin_pack.Pack_store.S

(** [stats kind] bumps the [Irmin_layers.Stats] copy counter matching [kind],
    which must be one of ["Contents"], ["Node"] or ["Commit"].

    @raise Failure on any other string. *)
let stats = function
  | "Contents" -> Irmin_layers.Stats.copy_contents ()
  | "Node" -> Irmin_layers.Stats.copy_nodes ()
  | "Commit" -> Irmin_layers.Stats.copy_commits ()
  | _ -> failwith "unexpected type in stats"

(** Copy of objects keyed by [Key.t] from a [SRC] store into a [DST] store
    holding the same type of values. *)
module Copy
    (Key : Irmin.Hash.S)
    (SRC : Irmin_pack.Content_addressable.S with type key := Key.t)
    (DST : Irmin_pack.Content_addressable.S
             with type key := Key.t
              and type value = SRC.value) =
struct
  (** Default callback: ignores its argument and does nothing. *)
  let ignore_lwt _ = Lwt.return_unit

  (** [copy ~src ~dst str k] looks [k] up in [src] (without integrity check).
      When found, the copy is counted in the statistics for kind [str] and the
      value is appended to [dst]. When missing, a warning is logged and nothing
      is copied.

      @raise Failure if [str] is not a kind known to {!stats}. *)
  let copy ~src ~dst str k =
    Log.debug (fun l -> l "copy %s %a" str (Irmin.Type.pp Key.t) k);
    match SRC.unsafe_find ~check_integrity:false src k with
    | None ->
        Log.warn (fun l ->
            l "Attempt to copy %s %a not contained in upper." str
              (Irmin.Type.pp Key.t) k)
    | Some v ->
        stats str;
        DST.unsafe_append ~ensure_unique:false ~overcommit:true dst k v

  (** [check ~src ?some ?none k] finds [k] in [src], then calls [some v] on the
      value found or [none ()] if there is none. Both callbacks default to
      {!ignore_lwt}. *)
  let check ~src ?(some = ignore_lwt) ?(none = ignore_lwt) k =
    SRC.find src k >>= function None -> none () | Some v -> some v
end

(** Prints [" during freeze"] when the flag is [true], nothing otherwise. *)
let pp_during_freeze ppf = function
  | true -> Fmt.string ppf " during freeze"
  | false -> ()

let pp_layer_id = Irmin_layers.Layer_id.pp

(** [pp_current_upper ppf flip] prints the current upper for a given [flip]:
    [`Upper1] when [flip] is [true], [`Upper0] otherwise. *)
let pp_current_upper ppf t = pp_layer_id ppf (if t then `Upper1 else `Upper0)

(** [pp_next_upper ppf flip] prints the other upper: [`Upper0] when [flip] is
    [true], [`Upper1] otherwise. *)
let pp_next_upper ppf t = pp_layer_id ppf (if t then `Upper0 else `Upper1)

(** Content-addressable store made of two upper stores and an optional lower
    store. The [flip] flag selects which of the two uppers is the current one;
    the other one is the next upper. *)
module Content_addressable
    (H : Irmin.Hash.S)
    (Index : Irmin_pack.Index.S)
    (U : S with type index := Index.t and type key = H.t)
    (L : S
           with type index := Index.t
            and type key = U.key
            and type value = U.value) =
struct
  type index = Index.t
  type key = U.key
  type value = U.value

  type 'a t = {
    lower : read L.t option;
    mutable flip : bool;  (** [true] means the first upper is current. *)
    uppers : read U.t * read U.t;  (** [(upper1, upper0)]. *)
    freeze_in_progress : unit -> bool;
    mutable newies : key list;
        (** Keys added while [freeze_in_progress ()] was [true], most recent
            first. *)
  }

  module U = U
  module L = L

  (** [v upper1 upper0 lower ~flip ~freeze_in_progress] builds a layered store
      with an empty list of newies. *)
  let v upper1 upper0 lower ~flip ~freeze_in_progress =
    Log.debug (fun l -> l "v flip = %b" flip);
    { lower; flip; uppers = (upper1, upper0); freeze_in_progress; newies = [] }

  let next_upper t = if t.flip then snd t.uppers else fst t.uppers
  let current_upper t = if t.flip then fst t.uppers else snd t.uppers

  (** [lower t] is the lower store of [t].

      @raise Invalid_argument if [t] has no lower store. *)
  let lower t = Option.get t.lower

  let pp_current_upper ppf t = pp_current_upper ppf t.flip
  let pp_next_upper ppf t = pp_next_upper ppf t.flip

  (** [mem_lower t k] is [false] when there is no lower store. *)
  let mem_lower t k =
    match t.lower with None -> Lwt.return false | Some lower -> L.mem lower k

  let mem_next t k = U.mem (next_upper t) k

  (** [consume_newies t] returns the recorded newies and resets the list. *)
  let consume_newies t =
    let newies = t.newies in
    t.newies <- [];
    newies

  (** [add t v] adds [v] to the current upper and returns its key. The key is
      also recorded in the newies if a freeze was in progress when [add] was
      called. *)
  let add t v =
    let freeze = t.freeze_in_progress () in
    Log.debug (fun l ->
        l "add in %a%a" pp_current_upper t pp_during_freeze freeze);
    Irmin_layers.Stats.add ();
    let upper = current_upper t in
    U.add upper v >|= fun k ->
    if freeze then t.newies <- k :: t.newies;
    k

  (** Same as {!add}, for a caller-provided key [k]. *)
  let unsafe_add t k v =
    let freeze = t.freeze_in_progress () in
    Log.debug (fun l ->
        l "unsafe_add in %a%a" pp_current_upper t pp_during_freeze freeze);
    Irmin_layers.Stats.add ();
    let upper = current_upper t in
    U.unsafe_add upper k v >|= fun () ->
    if freeze then t.newies <- k :: t.newies

  (** Synchronous variant of {!unsafe_add}; [ensure_unique] and [overcommit]
      are forwarded to the current upper. *)
  let unsafe_append ~ensure_unique ~overcommit t k v =
    let freeze = t.freeze_in_progress () in
    Log.debug (fun l ->
        l "unsafe_append in %a%a" pp_current_upper t pp_during_freeze freeze);
    Irmin_layers.Stats.add ();
    let upper = current_upper t in
    U.unsafe_append ~ensure_unique ~overcommit upper k v;
    if freeze then t.newies <- k :: t.newies

  (** Everything is in current upper, no need to look in next upper. The lower
      store, if any, is searched when the current upper has no binding. *)
  let find t k =
    let current = current_upper t in
    Log.debug (fun l -> l "find in %a" pp_current_upper t);
    U.find current k >>= function
    | Some v -> Lwt.return_some v
    | None -> (
        match t.lower with
        | None -> Lwt.return_none
        | Some lower ->
            Log.debug (fun l -> l "find in lower");
            L.find lower k)

  (** Synchronous variant of {!find}. *)
  let unsafe_find ~check_integrity t k =
    let current = current_upper t in
    Log.debug (fun l -> l "unsafe_find in %a" pp_current_upper t);
    match U.unsafe_find ~check_integrity current k with
    | Some v -> Some v
    | None -> (
        match t.lower with
        | None -> None
        | Some lower ->
            Log.debug (fun l -> l "unsafe_find in lower");
            L.unsafe_find ~check_integrity lower k)

  (** [mem t k] checks the current upper, then the lower store if any. *)
  let mem t k =
    let current = current_upper t in
    U.mem current k >>= function
    | true -> Lwt.return_true
    | false -> (
        match t.lower with
        | None -> Lwt.return_false
        | Some lower -> L.mem lower k)

  (** Synchronous variant of {!mem}. *)
  let unsafe_mem t k =
    let current = current_upper t in
    U.unsafe_mem current k
    || match t.lower with None -> false | Some lower -> L.unsafe_mem lower k

  (** Only flush current upper, to prevent concurrent flushing and appends
      during copy. Next upper and lower are flushed at the end of a freeze. *)
  let flush ?index ?index_merge t =
    let current = current_upper t in
    U.flush ?index ?index_merge current

  (** Flushes the next upper and the lower store (if any), both with
      [~index_merge:true]. *)
  let flush_next_lower t =
    let next = next_upper t in
    U.flush ~index_merge:true next;
    match t.lower with None -> () | Some x -> L.flush ~index_merge:true x

  let cast t = (t :> read_write t)

  (** [batch t f] runs [f] on [t] and flushes the current upper (index
      included) once [f] has resolved. *)
  let batch t f =
    f (cast t) >|= fun r ->
    flush ~index:true t;
    r

  (** If the generation changed, then the upper changed too. TODO: This
      assumption is ok for now, but does not hold if:

      - the RW store is opened after the RO,
      - if RW is closed in the meantime,
      - if the RW freezes an even number of times before an RO sync.

      See https://github.com/mirage/irmin/issues/1225

      Returns the value of [flip] after the sync. *)
  let sync ?on_generation_change ?on_generation_change_next_upper t =
    Log.debug (fun l -> l "sync %a" pp_current_upper t);
    (* a first implementation where only the current upper is synced *)
    let current = current_upper t in
    let former_generation = U.generation current in
    U.sync ?on_generation_change current;
    let generation = U.generation current in
    if former_generation <> generation then (
      Log.debug (fun l -> l "generation change, RO updates upper");
      t.flip <- not t.flip;
      (* [current_upper] now designates the other upper. *)
      let current = current_upper t in
      U.sync ?on_generation_change:on_generation_change_next_upper current;
      match t.lower with None -> () | Some x -> L.sync ?on_generation_change x);
    t.flip

  let update_flip ~flip t = t.flip <- flip

  (** Closes both uppers, then the lower store if any. *)
  let close t =
    U.close (fst t.uppers) >>= fun () ->
    U.close (snd t.uppers) >>= fun () ->
    match t.lower with None -> Lwt.return_unit | Some x -> L.close x

  (** Runs the integrity check of the store designated by [layer].

      @raise Invalid_argument if [layer] is [`Lower] and there is no lower. *)
  let integrity_check ~offset ~length ~layer k t =
    match layer with
    | `Upper1 -> U.integrity_check ~offset ~length k (fst t.uppers)
    | `Upper0 -> U.integrity_check ~offset ~length k (snd t.uppers)
    | `Lower -> L.integrity_check ~offset ~length k (lower t)

  (** [layer_id t k] is the id of the layer holding [k]: the current upper if
      it has [k], [`Lower] if the lower store has it. The returned promise
      fails with [Not_found] otherwise. *)
  let layer_id t k =
    let current, upper =
      if t.flip then (fst t.uppers, `Upper1) else (snd t.uppers, `Upper0)
    in
    U.mem current k >>= function
    | true -> Lwt.return upper
    | false -> (
        match t.lower with
        | None -> raise Not_found
        | Some lower -> (
            L.mem lower k >|= function
            | true -> `Lower
            | false -> raise Not_found))

  (** Clears both uppers, then the lower store if any. *)
  let clear t =
    U.clear (fst t.uppers) >>= fun () ->
    U.clear (snd t.uppers) >>= fun () ->
    match t.lower with None -> Lwt.return_unit | Some x -> L.clear x

  (** Same as {!clear}, using [clear_keep_generation] on every layer. *)
  let clear_keep_generation t =
    U.clear_keep_generation (fst t.uppers) >>= fun () ->
    U.clear_keep_generation (snd t.uppers) >>= fun () ->
    match t.lower with
    | None -> Lwt.return_unit
    | Some x -> L.clear_keep_generation x

  let clear_caches t =
    let current = current_upper t in
    U.clear_caches current

  let clear_caches_next_upper t =
    let next = next_upper t in
    U.clear_caches next

  (** After clearing the previous upper, we also need to flush current upper
      to disk, otherwise values are not found by the RO. The previous upper is
      the one {!next_upper} designates at the time of the call. *)
  let clear_previous_upper ?keep_generation t =
    let previous = next_upper t in
    let current = current_upper t in
    U.flush current;
    match keep_generation with
    | Some () -> U.clear_keep_generation previous
    | None -> U.clear previous

  let version t = U.version (fst t.uppers)

  let generation t =
    let current = current_upper t in
    U.generation current

  let offset t =
    let current = current_upper t in
    U.offset current

  (** Makes the next upper the current one. *)
  let flip_upper t =
    Log.debug (fun l -> l "flip_upper to %a" pp_next_upper t);
    t.flip <- not t.flip

  module CopyUpper = Copy (H) (U) (U)
  module CopyLower = Copy (H) (U) (L)

  (** Witness of the kind of store used as a copy destination. *)
  type 'a layer_type =
    | Upper : read U.t layer_type
    | Lower : read L.t layer_type

  (** Copies [k] from the current upper into the lower store [dst]. *)
  let copy_to_lower t ~dst str k =
    CopyLower.copy ~src:(current_upper t) ~dst str k

  (** Copies [k] from the current upper into the upper store [dst]. *)
  let copy_to_next t ~dst str k =
    CopyUpper.copy ~src:(current_upper t) ~dst str k

  (** {!Copy.check} applied to the current upper. *)
  let check t ?none ?some k =
    CopyUpper.check ~src:(current_upper t) ?none ?some k

  (** [copy (layer_type, dst) t str k] dispatches to {!copy_to_lower} or
      {!copy_to_next} depending on [layer_type]. *)
  let copy : type l. l layer_type * l -> read t -> string -> key -> unit =
   fun (ltype, dst) ->
    match ltype with Lower -> copy_to_lower ~dst | Upper -> copy_to_next ~dst

  (** The object [k] can be in either lower or upper. If already in upper then
      do not copy it: only [aux] is called on its value. Otherwise the value
      is read from the lower store, passed to [aux], counted in the statistics
      for kind [str] and added to [dst].

      The returned promise fails with [Failure] if [k] is in neither layer,
      and with [Invalid_argument] if [k] is not in the current upper and [t]
      has no lower store. *)
  let copy_from_lower t ~dst ?(aux = fun _ -> Lwt.return_unit) str k =
    (* FIXME(samoht): why does this function need to be different from the
       previous one? *)
    let current = current_upper t in
    U.find current k >>= function
    | Some v -> aux v
    | None -> (
        (* The lower store is only required on this path; resolving it here
           keeps the call usable without a lower when [k] is in the upper. *)
        L.find (lower t) k >>= function
        | Some v ->
            aux v >>= fun () ->
            stats str;
            U.unsafe_add dst k v
        | None -> Fmt.failwith "%s %a not found" str (Irmin.Type.pp H.t) k)
end

(** Builds layered content-addressable stores in which the uppers and the
    lower are all instances of the same pack store [P.Make (V)]. *)
module Pack_maker
    (H : Irmin.Hash.S)
    (Index : Irmin_pack.Index.S)
    (P : Irmin_pack.Pack_store.Maker
           with type key = H.t
            and type index := Index.t) =
struct
  type index = Index.t
  type key = P.key

  module Make (V : Irmin_pack.Pack_value.S with type hash := key) = struct
    module Upper = P.Make (V)
    include Content_addressable (H) (Index) (Upper) (Upper)
  end
end

(** Branch store made of two upper stores and an optional lower store, with
    the same [flip] convention as {!Content_addressable}. *)
module Atomic_write
    (K : Irmin.Branch.S)
    (U : Irmin_pack.Atomic_write.Persistent with type key = K.t)
    (L : Irmin_pack.Atomic_write.Persistent
           with type key = U.key
            and type value = U.value) =
struct
  type key = U.key
  type value = U.value

  module U = U
  module L = L

  type t = {
    lower : L.t option;
    mutable flip : bool;  (** [true] means the first upper is current. *)
    uppers : U.t * U.t;  (** [(upper1, upper0)]. *)
    freeze_in_progress : unit -> bool;
    mutable newies : (key * value option) list;
        (** Updates made while [freeze_in_progress ()] was [true], most recent
            first; [None] records a removal. *)
  }

  let current_upper t = if t.flip then fst t.uppers else snd t.uppers
  let next_upper t = if t.flip then snd t.uppers else fst t.uppers
  let pp_current_upper ppf t = pp_current_upper ppf t.flip
  let pp_next_upper ppf t = pp_next_upper ppf t.flip
  let pp_branch = Irmin.Type.pp K.t

  (** [mem t k] checks the current upper, then the lower store if any. *)
  let mem t k =
    let current = current_upper t in
    Log.debug (fun l ->
        l "[branches] mem %a in %a" pp_branch k pp_current_upper t);
    U.mem current k >>= function
    | true -> Lwt.return_true
    | false -> (
        match t.lower with
        | None -> Lwt.return_false
        | Some lower ->
            Log.debug (fun l -> l "[branches] mem in lower");
            L.mem lower k)

  (** [find t k] searches the current upper, then the lower store if any. *)
  let find t k =
    let current = current_upper t in
    Log.debug (fun l -> l "[branches] find in %a" pp_current_upper t);
    U.find current k >>= function
    | Some v -> Lwt.return_some v
    | None -> (
        match t.lower with
        | None -> Lwt.return_none
        | Some lower ->
            Log.debug (fun l -> l "[branches] find in lower");
            L.find lower k)

  (** [set t k v] binds [k] to [v] in the current upper; the update is also
      recorded in the newies if a freeze was in progress when [set] was
      called. *)
  let set t k v =
    let freeze = t.freeze_in_progress () in
    Log.debug (fun l ->
        l "[branches] set %a in %a%a" pp_branch k pp_current_upper t
          pp_during_freeze freeze);
    let upper = current_upper t in
    U.set upper k v >|= fun () ->
    if freeze then t.newies <- (k, Some v) :: t.newies

  (** [test_and_set t k ~test ~set] performs the test-and-set in the current
      upper. If [k] is only bound in the lower store, its binding is first
      copied back into the current upper so that the test runs against it. A
      successful update is recorded in the newies if a freeze was in progress
      when the function was called. *)
  let test_and_set t k ~test ~set =
    let freeze = t.freeze_in_progress () in
    Log.debug (fun l ->
        l "[branches] test_and_set %a in %a%a" pp_branch k pp_current_upper t
          pp_during_freeze freeze);
    let current = current_upper t in
    let find_in_lower () =
      (match t.lower with
      | None -> Lwt.return_none
      | Some lower -> L.find lower k)
      >>= function
      | None ->
          (* [k] is unbound in both layers: the caller's [test] must still be
             honoured, so that [~test:(Some _)] fails on a missing branch. *)
          U.test_and_set current k ~test ~set
      | Some v ->
          U.set current k v >>= fun () -> U.test_and_set current k ~test ~set
    in
    (U.mem current k >>= function
     | true -> U.test_and_set current k ~test ~set
     | false -> find_in_lower ())
    >|= fun update ->
    if update && freeze then t.newies <- (k, set) :: t.newies;
    update

  (** [remove t k] removes [k] from both uppers and from the lower store if
      any; the removal is recorded in the newies if a freeze was in progress
      when [remove] was called. *)
  let remove t k =
    let freeze = t.freeze_in_progress () in
    Log.debug (fun l ->
        l "[branches] remove %a in %a%a" pp_branch k pp_current_upper t
          pp_during_freeze freeze);
    U.remove (fst t.uppers) k >>= fun () ->
    U.remove (snd t.uppers) k >>= fun () ->
    if freeze then t.newies <- (k, None) :: t.newies;
    match t.lower with
    | None -> Lwt.return_unit
    | Some lower -> L.remove lower k

  (** [list t] is the list of branches of the lower store (if any) extended
      with the branches of the current upper that are not already in it. *)
  let list t =
    let current = current_upper t in
    U.list current >>= fun upper ->
    (match t.lower with None -> Lwt.return_nil | Some lower -> L.list lower)
    >|= fun lower ->
    List.fold_left
      (fun acc b -> if List.mem b acc then acc else b :: acc)
      lower upper

  type watch = U.watch

  (* Watches are only installed on the upper that is current at call time. *)
  let watch t = U.watch (current_upper t)
  let watch_key t = U.watch_key (current_upper t)
  let unwatch t = U.unwatch (current_upper t)

  (** Closes both uppers, then the lower store if any. *)
  let close t =
    U.close (fst t.uppers) >>= fun () ->
    U.close (snd t.uppers) >>= fun () ->
    match t.lower with None -> Lwt.return_unit | Some x -> L.close x

  (** [v upper1 upper0 lower ~flip ~freeze_in_progress] builds a layered
      branch store with an empty list of newies. *)
  let v upper1 upper0 lower ~flip ~freeze_in_progress =
    { lower; flip; uppers = (upper1, upper0); freeze_in_progress; newies = [] }

  (** Clears both uppers, then the lower store if any. *)
  let clear t =
    U.clear (fst t.uppers) >>= fun () ->
    U.clear (snd t.uppers) >>= fun () ->
    match t.lower with None -> Lwt.return_unit | Some x -> L.clear x

  (** Flushes the current upper only. *)
  let flush t =
    let current = current_upper t in
    U.flush current

  (** Do not copy branches that point to commits not copied.

      For every branch of the current upper: it is set in the lower store (if
      any) when [mem_commit_lower] holds for its commit, and set in the next
      upper when [mem_commit_upper] holds for its commit. Branches are
      processed with [Lwt_list.iter_p]. *)
  let copy ~mem_commit_lower ~mem_commit_upper t =
    let next = next_upper t in
    let current = current_upper t in
    U.list current >>= fun branches ->
    Lwt_list.iter_p
      (fun branch ->
        U.find current branch >>= function
        | None -> Lwt.fail_with "branch not found in current upper"
        | Some hash -> (
            (match t.lower with
            | None -> Lwt.return_unit
            | Some lower -> (
                mem_commit_lower hash >>= function
                | true ->
                    Log.debug (fun l ->
                        l "[branches] copy to lower %a" (Irmin.Type.pp K.t)
                          branch);
                    Irmin_layers.Stats.copy_branches ();
                    L.set lower branch hash
                | false -> Lwt.return_unit))
            >>= fun () ->
            mem_commit_upper hash >>= function
            | true ->
                Log.debug (fun l ->
                    l "[branches] copy to next %a" (Irmin.Type.pp K.t) branch);
                Irmin_layers.Stats.copy_branches ();
                U.set next branch hash
            | false ->
                Log.debug (fun l ->
                    l "branch %a not copied" (Irmin.Type.pp K.t) branch);
                Lwt.return_unit))
      branches

  (** Makes the next upper the current one. *)
  let flip_upper t =
    Log.debug (fun l -> l "[branches] flip to %a" pp_next_upper t);
    t.flip <- not t.flip

  (** After clearing the previous upper, we also need to flush current upper
      to disk, otherwise values are not found by the RO. The previous upper is
      the one {!next_upper} designates at the time of the call. *)
  let clear_previous_upper ?keep_generation t =
    let current = current_upper t in
    let previous = next_upper t in
    U.flush current;
    match keep_generation with
    | Some () -> U.clear_keep_generation previous
    | None -> U.clear previous

  (** Flushes the next upper and the lower store (if any). *)
  let flush_next_lower t =
    let next = next_upper t in
    U.flush next;
    match t.lower with None -> () | Some x -> L.flush x

  (** Replays the recorded newies on the next upper, oldest first, and resets
      the list of newies. *)
  let copy_newies_to_next_upper t =
    Log.debug (fun l ->
        l "[branches] copy %d newies to %a" (List.length t.newies)
          pp_next_upper t);
    let next = next_upper t in
    let newies = t.newies in
    t.newies <- [];
    Lwt_list.iter_s
      (fun (k, v) ->
        match v with None -> U.remove next k | Some v -> U.set next k v)
      (List.rev newies)

  (** RO syncs the branch store at every find call, but it still needs to
      update the upper in use. *)
  let update_flip ~flip t = t.flip <- flip

  (** Same as {!clear}, using [clear_keep_generation] on every layer. *)
  let clear_keep_generation t =
    U.clear_keep_generation (fst t.uppers) >>= fun () ->
    U.clear_keep_generation (snd t.uppers) >>= fun () ->
    match t.lower with
    | None -> Lwt.return_unit
    | Some x -> L.clear_keep_generation x
end