Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file distributed_wrapper.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229(*****************************************************************************)(* *)(* MIT License *)(* Copyright (c) 2022 Nomadic Labs <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)(** Wrapper for the Distributed library.
The new API is designed for the distribution of protocols based on a
1 master N workers architecture, in which at each step of the protocol:
{ul
{li
The master asks all the workers to compute the same function over
different inputs. We call these messages {e requests}. Each protocol
step will define a [request] (see the [Message] module), whose payload
will change for every worker.}
{li
Each worker computes the {e reply} to the received request, and sends
it back to the master. Each protocol step will define a [reply]
(see the [Message] module).
}
{li
The master waits to receive all the replies from the workers.
}
}
Instead of directly using [send] and [receive], the new API provides
two abstractions, [dmap] and [handle_request], to implement this
interaction from the master and the worker respectively. These abstractions
enforce via the type system that the [request] and [reply] used
correspond to the same protocol step.
*)

(** Logging configuration handed to the Distributed library functor. *)
module Logger = struct
  (** Log source named ["distributed"] under which this module logs. *)
  let log_src =
    let doc = "logs events related to the distributed library" in
    Logs.Src.create ~doc "distributed"

  (** Lwt-aware logging functions bound to [log_src]. *)
  module Log = (val Logs_lwt.src_log log_src : Logs_lwt.LOG)

  let msg = Log.msg

  (** [lwt_reporter ()] builds a [Logs] reporter that formats every message
      into an in-memory buffer and then writes the buffer contents to
      [Lwt_io.stdout].

      Adapted from the reporter example of the Logs_lwt manual:
      http://erratique.ch/software/logs/doc/Logs_lwt.html#report_ex
      Unlike a reporter with separate [app] and [dst] formatters, this one
      passes the same buffer-backed formatter for both. *)
  let lwt_reporter () =
    let buffer = Buffer.create 512 in
    let app = Format.formatter_of_buffer buffer in
    (* Returns what has been formatted so far and empties the buffer. *)
    let app_flush () =
      let contents = Buffer.contents buffer in
      Buffer.reset buffer ;
      contents
    in
    let base = Logs.format_reporter ~app ~dst:app () in
    let report src level ~over k msgf =
      (* Continuation run by the base reporter: start the write on stdout and
         call [over] once that write has terminated. The resulting promise is
         deliberately discarded; [k ()] is invoked right away. *)
      let continue () =
        let write () = Lwt_io.write Lwt_io.stdout (app_flush ()) in
        let unblock () =
          over () ;
          Lwt.return_unit
        in
        ignore (Lwt.finalize write unblock) ;
        k ()
      in
      (* The base reporter receives a no-op [over]: the real [over] is only
         called from [unblock] above. *)
      base.Logs.report src level ~over:(fun () -> ()) continue msgf
    in
    {Logs.report}
end

(**
Messages are refined into either a [request] or a [reply].
Both these types are parameterized by their protocol [step].
The ground type of messages ([t]) is enforced to be [bytes],
to avoid relying on the Marshalling performed by the Distributed library.
*)
module type Enriched_message_type = sig
  include Distributed.Message_type with type t = bytes

  (** A protocol step. The type parameter is shared with the [request] and
      [reply] types belonging to that step. *)
  type 'a step

  (** A request, parameterized by its protocol step. *)
  type 'step request

  (** A reply, parameterized by its protocol step. *)
  type 'step reply

  (** [request_step r] is the step that request [r] belongs to. *)
  val request_step : 'step request -> 'step step

  (** [of_request r] converts request [r] to the ground message type [t]. *)
  val of_request : 'a request -> t

  (** [of_reply r] converts reply [r] to the ground message type [t]. *)
  val of_reply : 'a reply -> t

  (** [to_request step m] reads [m] as a request of [step], if possible.
      NOTE(review): presumably [None] means [m] is not a request for [step];
      confirm against the implementations of this signature. *)
  val to_request : 'step step -> t -> 'step request option

  (** [to_reply step m] reads [m] as a reply of [step], if possible.
      NOTE(review): presumably [None] means [m] is not a reply for [step];
      confirm against the implementations of this signature. *)
  val to_reply : 'step step -> t -> 'step reply option

  (** [index m] is the integer index carried by message [m]. [dmap] uses it
      as the key under which a reply is stored. *)
  val index : t -> int
end

module type Enriched_process = sig
  include Distributed.Process

  module M : Enriched_message_type

  (** Additional monadic interface *)

  val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t

  val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t

  (** [mapM f l] applies [f] to the elements of [l] sequentially, from left
      to right, and collects the results in the same order. *)
  val mapM : ('a -> 'b t) -> 'a list -> 'b list t

  (** [dmap ~pids ~request ~reply l] sends requests built by applying
      [request] to the elements of [l] to the workers [pids] and waits
      to receive a valid [reply] from each worker.

      The i-th element of [l] is sent to the i-th process of [pids], with
      [~index:i]. The results are returned in increasing order of the index
      carried by the replies ([M.index]).

      Returns the empty list when [pids] and [l] are both empty.

      @raise Invalid_argument if [pids] and [l] have different lengths.
  *)
  val dmap :
    pids:Distributed.Process_id.t list ->
    request:('a -> index:int -> 'step M.request) ->
    reply:('step M.reply -> (unit -> 'b t) option) ->
    'a list ->
    'b list t

  (** [handle_request master_pid ~step ~handler] waits to receive a request
      for a given [step], processes it through [handler] and sends the reply
      to [master_pid].
      The [handler] might also return some additional data (['b]) that isn't
      meant to be sent back to the master, but rather kept by the worker for
      future computation.
  *)
  val handle_request :
    Distributed.Process_id.t ->
    step:'step M.step ->
    handler:('step M.request -> (unit -> ('step M.reply * 'b) t) option) ->
    'b t
end

module Make (A : Enriched_message_type) :
  Enriched_process
    with type message_type = A.t
     and type M.t = A.t
     and type 'a M.step = 'a A.step
     and type 'a M.request = 'a A.request
     and type 'a M.reply = 'a A.reply
     and type 'a io = 'a Lwt.t = struct
  include Distributed_lwt.Make (A) (Logger)
  module M = A

  let ( let* ) = ( >>= )

  let ( let+ ) : 'a t -> ('a -> 'b) -> 'b t =
   fun m f -> m >>= fun x -> return (f x)

  (* Sequential monadic map: effects run left to right, results keep the
     order of the input list. *)
  let mapM : ('a -> 'b t) -> 'a list -> 'b list t =
   fun f ->
    let rec go acc = function
      | [] -> return (List.rev acc)
      | x :: xs ->
          let* y = f x in
          go (y :: acc) xs
    in
    go []

  let dmap ~pids ~request ~reply l =
    let module IMap = Map.Make (Int) in
    let expected = List.length pids in
    (* Pair each worker with its position and its input. [List.combine]
       raises [Invalid_argument] if [pids] and [l] differ in length. *)
    let jobs =
      List.combine (List.mapi (fun index pid -> (index, pid)) pids) l
    in
    let* steps =
      mapM
        (fun ((index, pid), x) ->
          (* Build the request once and reuse it for both the message sent
             and the step it belongs to. *)
          let req = request x ~index in
          let* () = send pid (M.of_request req) in
          return (M.request_step req))
        jobs
    in
    match steps with
    | [] ->
        (* No worker was contacted, so there is nothing to wait for. *)
        return []
    | step :: _ ->
        (* Replies are keyed by the index carried by the message, so a
           repeated index overwrites the earlier entry. *)
        let replies = ref IMap.empty in
        let on_message m =
          Option.bind (M.to_reply step m) @@ fun r ->
          Option.map
            (fun f () ->
              let* () =
                lift_io
                @@ Lwt_io.printlf "got message %s from remote node\n"
                @@ M.string_of_message m
              in
              let* () = lift_io @@ Lwt_io.flush_all () in
              let* y = f () in
              replies := IMap.add (M.index m) y !replies ;
              (* Keep looping until one reply per worker is stored. *)
              return (IMap.cardinal !replies < expected))
            (reply r)
        in
        let* _ = receive_loop (case on_message) in
        (* [IMap.bindings] lists entries in increasing key order. *)
        return (List.map snd (IMap.bindings !replies))

  let handle_request :
      Distributed.Process_id.t ->
      step:'step M.step ->
      handler:('step M.request -> (unit -> ('step M.reply * 'b) t) option) ->
      'b t =
   fun pid ~step ~handler ->
    let* x =
      receive
        (case (fun m ->
             Option.map
               (fun f () ->
                 let* () =
                   lift_io
                   @@ Lwt_io.printlf "got message %s from remote node\n"
                   @@ M.string_of_message m
                 in
                 f ())
             @@ Option.bind (M.to_request step m) handler))
    in
    (* NOTE(review): [Option.get] raises [Invalid_argument] if [receive]
       yields [None]; presumably that cannot happen without a timeout, to be
       confirmed against the Distributed library. *)
    let repl, v = Option.get x in
    let* () = send pid (M.of_reply repl) in
    return v
end