package octez-libs

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file worker.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2018-2021 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** An error returned when trying to communicate with a worker that
    has been closed.*)
type worker_name = {base : string; name : string}

module type T = sig
  module Name : Worker_intf.NAME

  module Request : Worker_intf.REQUEST

  module Types : Worker_intf.TYPES

  (** A handle to a specific worker, parameterized by the type of
      internal message buffer. *)
  type 'kind t

  (** A handle to a table of workers. *)
  type 'kind table

  (** Internal buffer kinds used as parameters to {!t}. *)
  type 'a queue

  and bounded

  and infinite

  type dropbox

  type 'a message_error =
    | Closed of error list option
    | Request_error of 'a
    | Any of exn

  (** Supported kinds of internal buffers. *)
  type _ buffer_kind =
    | Queue : infinite queue buffer_kind
    | Bounded : {size : int} -> bounded queue buffer_kind
    | Dropbox : {
        merge :
          dropbox t -> any_request -> any_request option -> any_request option;
      }
        -> dropbox buffer_kind

  and any_request = Any_request : _ Request.t -> any_request

  (** Create a table of workers. *)
  val create_table : 'kind buffer_kind -> 'kind table

  (** The callback handlers specific to each worker instance. *)
  module type HANDLERS = sig
    (** Placeholder replaced with {!t} with the right parameters
        provided by the type of buffer chosen at {!launch}.*)
    type self

    type launch_error

    (** Builds the initial internal state of a worker at launch.
        It is possible to initialize the message queue.
        Of course calling {!state} will fail at that point. *)
    val on_launch :
      self ->
      Name.t ->
      Types.parameters ->
      (Types.state, launch_error) result Lwt.t

    (** The main request processor, i.e. the body of the event loop. *)
    val on_request :
      self ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error) result Lwt.t

    (** Called when no request has been made before the timeout, if
        the parameter has been passed to {!launch}. *)
    val on_no_request : self -> unit Lwt.t

    (** A function called when terminating a worker. *)
    val on_close : self -> unit Lwt.t

    (** A function called at the end of the worker loop in case of an
        abnormal error. This function can handle the error by
        returning [Ok ()], or leave the default unexpected error
        behaviour by returning its parameter. A possibility is to
        handle the error for ad-hoc logging, and still use
        {!trigger_shutdown} to kill the worker. *)
    val on_error :
      self ->
      Worker_types.request_status ->
      ('a, 'request_error) Request.t ->
      'request_error ->
      unit tzresult Lwt.t

    (** A function called at the end of the worker loop in case of a
        successful treatment of the current request. *)
    val on_completion :
      self ->
      ('a, 'request_error) Request.t ->
      'a ->
      Worker_types.request_status ->
      unit Lwt.t
  end

  (** Creates a new worker instance.
      Parameter [queue_size] not passed means unlimited queue. *)
  val launch :
    'kind table ->
    ?timeout:Time.System.Span.t ->
    Name.t ->
    Types.parameters ->
    (module HANDLERS
       with type self = 'kind t
        and type launch_error = 'launch_error) ->
    ('kind t, 'launch_error) result Lwt.t

  (** Triggers a worker termination and waits for its completion.
      Cannot be called from within the handlers.  *)
  val shutdown : _ t -> unit Lwt.t

  module type BOX = sig
    type t

    val put_request : t -> ('a, 'request_error) Request.t -> unit

    val put_request_and_wait :
      t ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error message_error) result Lwt.t
  end

  module type QUEUE = sig
    type 'a t

    val push_request_and_wait :
      'q t ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error message_error) result Lwt.t

    val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t

    val pending_requests : 'a t -> (Time.System.t * Request.view) list

    val pending_requests_length : 'a t -> int
  end

  module Dropbox : sig
    include BOX with type t := dropbox t
  end

  module Queue : sig
    include QUEUE with type 'a t := 'a queue t

    (** Adds a message to the queue immediately. *)
    val push_request_now :
      infinite queue t -> ('a, 'request_error) Request.t -> unit
  end

  (** Exports the canceler to allow cancellation of other tasks when this
      worker is shut down or when it dies. *)
  val canceler : _ t -> Lwt_canceler.t

  (** Triggers a worker termination. *)
  val trigger_shutdown : _ t -> unit

  (** Access the internal state, once initialized. *)
  val state : _ t -> Types.state

  val with_state :
    _ t -> (Types.state -> (unit, 'b) result Lwt.t) -> (unit, 'b) result Lwt.t

  (** Introspect the message queue, gives the times requests were pushed. *)
  val pending_requests : _ queue t -> (Time.System.t * Request.view) list

  (** Get the running status of a worker. *)
  val status : _ t -> Worker_types.worker_status

  (** Get the request being treated by a worker.
      Gives the time the request was pushed, and the time its
      treatment started. *)
  val current_request :
    _ t -> (Time.System.t * Time.System.t * Request.view) option

  val information : _ t -> Worker_types.worker_information

  (** Lists the running workers in this group. *)
  val list : 'a table -> (Name.t * 'a t) list

  (** [find_opt table n] is [Some worker] if the [worker] is in the [table] and
      has name [n]. *)
  val find_opt : 'a table -> Name.t -> 'a t option
end

module Make_internal
    (Name : Worker_intf.NAME)
    (Request : Worker_intf.REQUEST)
    (Types : Worker_intf.TYPES)
    (Worker_events : Worker_events.S
                       with type view = Request.view
                        and type critical_error = tztrace) =
struct
  module Name = Name
  module Request = Request
  module Types = Types

  module Nametbl = Hashtbl.MakeSeeded (struct
    type t = Name.t

    (* See [src/lib_base/tzPervasives.ml] for an explanation *)
    [@@@ocaml.warning "-32"]

    let hash = Hashtbl.seeded_hash

    let seeded_hash = Hashtbl.seeded_hash

    [@@@ocaml.warning "+32"]

    let equal = Name.equal
  end)

  let base_name = String.concat "-" Name.base

  type 'a message_error =
    | Closed of error list option
    | Request_error of 'a
    | Any of exn

  type message =
    | Message :
        ('a, 'b) Request.t * ('a, 'b message_error) result Lwt.u option
        -> message

  type 'a queue

  and bounded

  and infinite

  type dropbox

  type _ buffer_kind =
    | Queue : infinite queue buffer_kind
    | Bounded : {size : int} -> bounded queue buffer_kind
    | Dropbox : {
        merge :
          dropbox t -> any_request -> any_request option -> any_request option;
      }
        -> dropbox buffer_kind

  and any_request = Any_request : _ Request.t -> any_request

  and _ buffer =
    | Queue_buffer :
        (Time.System.t * message) Lwt_pipe.Unbounded.t
        -> infinite queue buffer
    | Bounded_buffer :
        (Time.System.t * message) Lwt_pipe.Bounded.t
        -> bounded queue buffer
    | Dropbox_buffer : (Time.System.t * message) Lwt_dropbox.t -> dropbox buffer

  and 'kind t = {
    timeout : Time.System.Span.t option;
    parameters : Types.parameters;
    mutable (* only for init *) worker : unit Lwt.t;
    mutable (* only for init *) state : Types.state option;
    buffer : 'kind buffer;
    canceler : Lwt_canceler.t;
    name : Name.t;
    id : int;
    mutable status : Worker_types.worker_status;
    mutable current_request :
      (Time.System.t * Time.System.t * Request.view) option;
    table : 'kind table;
  }

  and 'kind table = {
    buffer_kind : 'kind buffer_kind;
    mutable last_id : int;
    instances : 'kind t Nametbl.t;
  }

  let extract_status_errors w =
    match w.status with
    | Worker_types.Launching _ | Running _ | Closing _ -> None
    | Closed (_, _, errs) -> errs

  let queue_item ?u r = (Time.System.now (), Message (r, u))

  let drop_request w merge message_box request =
    try
      match
        match Lwt_dropbox.peek message_box with
        | None -> merge w (Any_request request) None
        | Some (_, Message (old, _)) ->
            Lwt.ignore_result (Lwt_dropbox.take message_box) ;
            merge w (Any_request request) (Some (Any_request old))
      with
      | None -> ()
      | Some (Any_request neu) ->
          Lwt_dropbox.put message_box (Time.System.now (), Message (neu, None))
    with Lwt_dropbox.Closed -> ()

  let drop_request_and_wait w message_box request =
    let t, u = Lwt.wait () in
    Lwt.catch
      (fun () ->
        Lwt_dropbox.put message_box (queue_item ~u request) ;
        t)
      (function
        | Lwt_dropbox.Closed ->
            Lwt.return_error (Closed (extract_status_errors w))
        | exn ->
            (* [Lwt_dropbox.put] can only raise [Closed] which is caught above. *)
            Lwt.return_error (Any exn))

  module type BOX = sig
    type t

    val put_request : t -> ('a, 'request_error) Request.t -> unit

    val put_request_and_wait :
      t ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error message_error) result Lwt.t
  end

  module type QUEUE = sig
    type 'a t

    val push_request_and_wait :
      'q t ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error message_error) result Lwt.t

    val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t

    val pending_requests : 'a t -> (Time.System.t * Request.view) list

    val pending_requests_length : 'a t -> int
  end

  module Dropbox = struct
    let put_request (w : dropbox t) request =
      let (Dropbox {merge}) = w.table.buffer_kind in
      let (Dropbox_buffer message_box) = w.buffer in
      drop_request w merge message_box request

    let put_request_and_wait (w : dropbox t) request =
      let (Dropbox_buffer message_box) = w.buffer in
      drop_request_and_wait w message_box request
  end

  module Queue = struct
    let push_request (type a) (w : a queue t) request =
      match w.buffer with
      | Queue_buffer message_queue ->
          if Lwt_pipe.Unbounded.is_closed message_queue then Lwt.return_false
          else (
            Lwt_pipe.Unbounded.push message_queue (queue_item request) ;
            (* because pushing on an unbounded pipe is immediate, we return within
               Lwt explicitly for compatibility with the other case *)
            Lwt.return_true)
      | Bounded_buffer message_queue ->
          if Lwt_pipe.Bounded.is_closed message_queue then Lwt.return_false
          else
            let open Lwt_syntax in
            let* () =
              Lwt_pipe.Bounded.push message_queue (queue_item request)
            in
            Lwt.return_true

    let push_request_now (w : infinite queue t) request =
      let (Queue_buffer message_queue) = w.buffer in
      if Lwt_pipe.Unbounded.is_closed message_queue then ()
      else Lwt_pipe.Unbounded.push message_queue (queue_item request)

    let push_request_and_wait (type a) (w : a queue t) request =
      match w.buffer with
      | Queue_buffer message_queue -> (
          try
            let t, u = Lwt.wait () in
            Lwt_pipe.Unbounded.push message_queue (queue_item ~u request) ;
            t
          with Lwt_pipe.Closed ->
            Lwt.return_error (Closed (extract_status_errors w)))
      | Bounded_buffer message_queue ->
          let t, u = Lwt.wait () in
          Lwt.try_bind
            (fun () ->
              Lwt_pipe.Bounded.push message_queue (queue_item ~u request))
            (fun () -> t)
            (function
              | Lwt_pipe.Closed ->
                  Lwt.return_error (Closed (extract_status_errors w))
              | exn -> Lwt.return_error (Any exn))

    let pending_requests (type a) (w : a queue t) =
      let peeked =
        try
          match w.buffer with
          | Queue_buffer message_queue ->
              Lwt_pipe.Unbounded.peek_all_now message_queue
          | Bounded_buffer message_queue ->
              Lwt_pipe.Bounded.peek_all_now message_queue
        with Lwt_pipe.Closed -> []
      in
      List.map (function t, Message (req, _) -> (t, Request.view req)) peeked

    let pending_requests_length (type a) (w : a queue t) =
      let pipe_length (type a) (q : a buffer) =
        match q with
        | Queue_buffer queue -> Lwt_pipe.Unbounded.length queue
        | Bounded_buffer queue -> Lwt_pipe.Bounded.length queue
        | Dropbox_buffer _ -> 1
      in
      pipe_length w.buffer
  end

  let close (type a) (w : a t) =
    let wakeup = function
      | _, Message (_, Some u) ->
          Lwt.wakeup_later u (Error (Closed (extract_status_errors w)))
      | _, Message (_, None) -> ()
    in
    let close_queue message_queue =
      let messages = Lwt_pipe.Bounded.pop_all_now message_queue in
      List.iter wakeup messages ;
      Lwt_pipe.Bounded.close message_queue
    in
    let close_unbounded_queue message_queue =
      let messages = Lwt_pipe.Unbounded.pop_all_now message_queue in
      List.iter wakeup messages ;
      Lwt_pipe.Unbounded.close message_queue
    in
    match w.buffer with
    | Queue_buffer message_queue -> close_unbounded_queue message_queue
    | Bounded_buffer message_queue -> close_queue message_queue
    | Dropbox_buffer message_box ->
        (try Option.iter wakeup (Lwt_dropbox.peek message_box)
         with Lwt_dropbox.Closed -> ()) ;
        Lwt_dropbox.close message_box

  let pop (type a) (w : a t) =
    let open Lwt_syntax in
    let pop_queue message_queue =
      match w.timeout with
      | None ->
          let* m = Lwt_pipe.Bounded.pop message_queue in
          return_some m
      | Some timeout ->
          Lwt_pipe.Bounded.pop_with_timeout
            (Systime_os.sleep timeout)
            message_queue
    in
    let pop_unbounded_queue message_queue =
      match w.timeout with
      | None ->
          let* m = Lwt_pipe.Unbounded.pop message_queue in
          return_some m
      | Some timeout ->
          Lwt_pipe.Unbounded.pop_with_timeout
            (Systime_os.sleep timeout)
            message_queue
    in
    match w.buffer with
    | Queue_buffer message_queue -> pop_unbounded_queue message_queue
    | Bounded_buffer message_queue -> pop_queue message_queue
    | Dropbox_buffer message_box -> (
        match w.timeout with
        | None ->
            let* m = Lwt_dropbox.take message_box in
            return_some m
        | Some timeout ->
            Lwt_dropbox.take_with_timeout (Systime_os.sleep timeout) message_box
        )

  let trigger_shutdown w = Lwt.ignore_result (Lwt_canceler.cancel w.canceler)

  let canceler {canceler; _} = canceler

  module type HANDLERS = sig
    type self

    type launch_error

    val on_launch :
      self ->
      Name.t ->
      Types.parameters ->
      (Types.state, launch_error) result Lwt.t

    val on_request :
      self ->
      ('a, 'request_error) Request.t ->
      ('a, 'request_error) result Lwt.t

    val on_no_request : self -> unit Lwt.t

    val on_close : self -> unit Lwt.t

    val on_error :
      self ->
      Worker_types.request_status ->
      ('a, 'request_error) Request.t ->
      'request_error ->
      unit tzresult Lwt.t

    val on_completion :
      self ->
      ('a, 'request_error) Request.t ->
      'a ->
      Worker_types.request_status ->
      unit Lwt.t
  end

  let create_table buffer_kind =
    {buffer_kind; last_id = 0; instances = Nametbl.create ~random:true 10}

  let close (type kind) handlers (w : kind t) errs =
    (* FIXME: https://gitlab.com/tezos/tezos/-/issues/3264
              close should be called only once in a worker lifetime *)
    let (module Handlers : HANDLERS with type self = kind t) = handlers in
    let open Lwt_syntax in
    match w.status with
    (* Launching is not accessible from here, as the only occurrence
       in form [launch] and happens before the call to [worker_loop] *)
    | Closed _ | Closing _ | Launching _ -> Lwt.return_unit
    | Running t0 ->
        w.status <- Closing (t0, Time.System.now ()) ;
        close w ;
        let* () = Error_monad.cancel_with_exceptions w.canceler in
        w.status <- Closed (t0, Time.System.now (), errs) ;
        let* () = Handlers.on_close w in
        Nametbl.remove w.table.instances w.name ;
        w.state <- None ;
        return_unit

  let worker_loop (type kind) handlers (w : kind t) =
    let (module Handlers : HANDLERS with type self = kind t) = handlers in
    let open Lwt_syntax in
    let rec loop () =
      (* The call to [protect] here allows the call to [pop] (responsible
         for fetching the next request) to be canceled by the use of the
         [canceler].

         These cancellations cannot affect the processing of ongoing requests.
         This is due to the limited scope of the argument of [protect]. As a
         result, ongoing requests are never canceled by this mechanism.

         In the case when the [canceler] is canceled whilst a request is being
         processed, the processing eventually resolves, at which point a
         recursive call to this [loop] at which point this call to [protect]
         fails immediately with [Canceled]. *)
      let* popped = protect_result (fun () -> pop w) in
      match popped with
      | Error exn -> raise exn
      | Ok None ->
          let* () = Handlers.on_no_request w in
          loop ()
      | Ok (Some (pushed, Message (request, u))) -> (
          let current_request = Request.view request in
          let treated = Time.System.now () in
          w.current_request <- Some (pushed, treated, current_request) ;
          let* r =
            match u with
            | None -> (
                let open Lwt_result_syntax in
                let*! res = Handlers.on_request w request in
                match res with
                | Error err -> Lwt.return_error err
                | Ok res ->
                    let completed = Time.System.now () in
                    w.current_request <- None ;
                    let status = Worker_types.{pushed; treated; completed} in
                    let*! () = Handlers.on_completion w request res status in
                    let*! () =
                      Worker_events.(emit request_no_errors)
                        (current_request, status)
                    in
                    return_unit)
            | Some u -> (
                (* [res] is a result. But the side effect [wakeup]
                   needs to happen regardless of success (Ok) or failure
                   (Error). To that end, we treat it locally like a regular
                   promise (which happens to carry a [result]) within the Lwt
                   monad. *)
                let* res = Handlers.on_request w request in
                match res with
                | Error err ->
                    Lwt.wakeup_later u (Error (Request_error err)) ;
                    Lwt.return (Error err)
                | Ok res ->
                    Lwt.wakeup_later u (Ok res) ;
                    let completed = Time.System.now () in
                    let status = Worker_types.{pushed; treated; completed} in
                    w.current_request <- None ;
                    let* () = Handlers.on_completion w request res status in
                    let* () =
                      Worker_events.(emit request_no_errors)
                        (current_request, status)
                    in
                    return (Ok ()))
          in
          match r with
          | Ok () -> loop ()
          | Error err -> (
              let* r =
                match w.current_request with
                | Some (pushed, treated, _request_view) ->
                    let completed = Time.System.now () in
                    w.current_request <- None ;
                    Handlers.on_error
                      w
                      Worker_types.{pushed; treated; completed}
                      request
                      err
                | None -> assert false
              in
              match r with
              | Ok () -> loop ()
              | Error errs ->
                  let* () = Worker_events.(emit crashed) errs in
                  close handlers w (Some errs)))
    in
    let* r = protect_result ~canceler:w.canceler (fun () -> loop ()) in
    match r with
    | Ok () -> Lwt.return_unit
    | Error Lwt.Canceled | Error Lwt_pipe.Closed | Error Lwt_dropbox.Closed ->
        let* () = Worker_events.(emit terminated) () in
        close handlers w None
    | Error exn ->
        let* () = Worker_events.(emit crashed) [Exn exn] in
        raise exn

  let launch :
      type kind launch_error.
      kind table ->
      ?timeout:Time.System.Span.t ->
      Name.t ->
      Types.parameters ->
      (module HANDLERS
         with type self = kind t
          and type launch_error = launch_error) ->
      (kind t, launch_error) result Lwt.t =
   fun table ?timeout name parameters (module Handlers) ->
    let name_s = Format.asprintf "%a" Name.pp name in
    let full_name =
      if name_s = "" then base_name
      else Format.asprintf "%s_%s" base_name name_s
    in
    if Nametbl.mem table.instances name then
      invalid_arg
        (Format.asprintf "Worker.launch: duplicate worker %s" full_name)
    else
      let id =
        table.last_id <- table.last_id + 1 ;
        table.last_id
      in
      let id_name =
        if name_s = "" then base_name else Format.asprintf "%s_%d" base_name id
      in
      let canceler = Lwt_canceler.create () in
      let buffer : kind buffer =
        match table.buffer_kind with
        | Queue -> Queue_buffer (Lwt_pipe.Unbounded.create ())
        | Bounded {size} ->
            Bounded_buffer
              (Lwt_pipe.Bounded.create
                 ~max_size:size
                 ~compute_size:(fun _ -> 1)
                 ())
        | Dropbox _ -> Dropbox_buffer (Lwt_dropbox.create ())
      in
      let w =
        {
          parameters;
          name;
          canceler;
          table;
          buffer;
          state = None;
          id;
          worker = Lwt.return_unit;
          timeout;
          current_request = None;
          status = Launching (Time.System.now ());
        }
      in
      Nametbl.add table.instances name w ;
      let open Lwt_result_syntax in
      let*! () =
        if id_name = base_name then Worker_events.(emit started) ()
        else Worker_events.(emit started_for) name_s
      in
      let* state = Handlers.on_launch w name parameters in
      w.status <- Running (Time.System.now ()) ;
      w.state <- Some state ;
      w.worker <-
        Lwt_utils.worker
          full_name
          ~on_event:Internal_event.Lwt_worker_logger.on_event
          ~run:(fun () -> worker_loop (module Handlers) w)
          ~cancel:(fun () -> Error_monad.cancel_with_exceptions w.canceler) ;
      return w

  let shutdown w =
    (* The actual cancellation ([Lwt_canceler.cancel w.canceler]) resolves
       immediately because no hooks are registered on the canceler. However, the
       worker ([w.worker]) resolves only once the ongoing request has resolved
       (if any) and some clean-up operations have completed. *)
    let open Lwt_syntax in
    let* () = Worker_events.(emit triggering_shutdown) () in
    let* () = Error_monad.cancel_with_exceptions w.canceler in
    w.worker

  let state w =
    match (w.state, w.status) with
    | None, Launching _ ->
        invalid_arg
          (Format.asprintf
             "Worker.state (%s[%a]): state called before worker was initialized"
             base_name
             Name.pp
             w.name)
    | None, (Closing _ | Closed _) ->
        invalid_arg
          (Format.asprintf
             "Worker.state (%s[%a]): state called after worker was terminated"
             base_name
             Name.pp
             w.name)
    | None, _ -> assert false
    | Some state, _ -> state

  let with_state :
      _ t -> (Types.state -> (unit, 'b) result Lwt.t) -> (unit, 'b) result Lwt.t
      =
   fun w f ->
    match w.state with
    | Some state -> f state
    | None -> Lwt_result_syntax.return_unit

  let pending_requests q = Queue.pending_requests q

  let status {status; _} = status

  let current_request {current_request; _} = current_request

  let information (type a) (w : a t) =
    {
      Worker_types.instances_number = Nametbl.length w.table.instances;
      wstatus = w.status;
      queue_length =
        (match w.buffer with
        | Queue_buffer pipe -> Lwt_pipe.Unbounded.length pipe
        | Bounded_buffer pipe -> Lwt_pipe.Bounded.length pipe
        | Dropbox_buffer _ -> 1);
    }

  let list {instances; _} =
    Nametbl.fold (fun n w acc -> (n, w) :: acc) instances []

  let find_opt {instances; _} = Nametbl.find instances

  let () =
    Internal_event.register_section
      (Internal_event.Section.make_sanitized Name.base)
end

module MakeGroup (Name : Worker_intf.NAME) (Request : Worker_intf.REQUEST) =
struct
  module Events =
    Worker_events.Make (Name) (Request)
      (struct
        type t = tztrace

        let encoding = Error_monad.trace_encoding

        let pp = Error_monad.pp_print_trace
      end)

  module MakeWorker (Types : Worker_intf.TYPES) = struct
    include Make_internal (Name) (Request) (Types) (Events)
  end
end

module MakeSingle
    (Name : Worker_intf.NAME)
    (Request : Worker_intf.REQUEST)
    (Types : Worker_intf.TYPES) =
struct
  module WG = MakeGroup (Name) (Request)
  include WG.MakeWorker (Types)
end
OCaml

Innovation. Community. Security.