package octez-protocol-019-PtParisB-libs

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file baking_lib.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Protocol
open Alpha_context
open Baking_state

(** [create_state cctxt ?synchronize ?monitor_node_mempool ~config
    ~current_proposal delegates] creates an operation worker on [cctxt]
    (forwarding [monitor_node_mempool] as its [monitor_node_operations]
    option) and hands it, together with the remaining arguments and the
    chain of [cctxt], to [Baking_scheduling.create_initial_state]. *)
let create_state cctxt ?synchronize ?monitor_node_mempool ~config
    ~current_proposal delegates =
  let open Lwt_result_syntax in
  let chain = cctxt#chain in
  let*! worker =
    Operation_worker.create ?monitor_node_operations:monitor_node_mempool cctxt
  in
  Baking_scheduling.create_initial_state
    cctxt
    ?synchronize
    ~chain
    config
    worker
    ~current_proposal
    delegates

(** [get_current_proposal cctxt ?cache ()] opens a head-monitoring stream on
    the chain of [cctxt] and peeks at its first element without consuming it.

    Returns the stream together with that first head. Fails with
    ["head stream unexpectedly ended"] when the stream yields nothing. The
    stream stopper returned by [Node_rpc.monitor_heads] is not used. *)
let get_current_proposal cctxt ?cache () =
  let open Lwt_result_syntax in
  let* heads, _stopper =
    Node_rpc.monitor_heads cctxt ?cache ~chain:cctxt#chain ()
  in
  let*! first_head = Lwt_stream.peek heads in
  match first_head with
  | None -> failwith "head stream unexpectedly ended"
  | Some head -> return (heads, head)

(* Local alias for [Baking_events.Lib]; the voting commands below use it to
   emit [attempting_to_vote_for_proposal]. *)
module Events = Baking_events.Lib

(** [preattest cctxt ?force delegates] signs and injects preattestations for
    the latest proposal of a freshly created baking state, on behalf of
    [delegates].

    Unless [force] is set, the proposal must be judged [Valid_proposal] by
    [is_acceptable_proposal_for_current_level]; an [Invalid] or
    [Outdated_proposal] verdict aborts through [cctxt#error]. The list of
    voting delegates is printed with [cctxt#message] before signing. *)
let preattest (cctxt : Protocol_client_context.full) ?(force = false) delegates
    =
  let open State_transitions in
  let open Lwt_result_syntax in
  let cache = Baking_cache.Block_cache.create 10 in
  let* _stream, head_proposal = get_current_proposal cctxt ~cache () in
  let* state =
    create_state
      cctxt
      ~config:(Baking_configuration.make ~force ())
      ~current_proposal:head_proposal
      delegates
  in
  let proposal = state.level_state.latest_proposal in
  let*! () =
    Events.(
      emit
        attempting_to_vote_for_proposal
        (Preattestation, state.level_state.latest_proposal))
  in
  (* Acceptance check, skipped entirely when [force] is set. *)
  let check_proposal () =
    let*! verdict = is_acceptable_proposal_for_current_level state proposal in
    match verdict with
    | Valid_proposal -> return_unit
    | Outdated_proposal -> cctxt#error "Cannot preattest an outdated proposal"
    | Invalid -> cctxt#error "Cannot preattest an invalid proposal"
  in
  let* () = if force then return_unit else check_proposal () in
  let batch = make_consensus_vote_batch state proposal Preattestation in
  let voters =
    List.map
      (fun ({delegate; _} : unsigned_consensus_vote) -> delegate)
      batch.unsigned_consensus_votes
  in
  let*! () =
    cctxt#message
      "@[<v 2>Preattesting for:@ %a@]"
      Format.(
        pp_print_list
          ~pp_sep:pp_print_space
          Baking_state.pp_consensus_key_and_delegate)
      voters
  in
  let* signed_batch =
    Baking_actions.sign_consensus_votes state.global_state batch
  in
  Baking_actions.inject_consensus_votes state signed_batch

(** [attest cctxt ?force delegates] signs and injects attestations for the
    latest proposal of a freshly created baking state, on behalf of
    [delegates].

    Unless [force] is set, the proposal must be judged [Valid_proposal] by
    [is_acceptable_proposal_for_current_level]; an [Invalid] or
    [Outdated_proposal] verdict aborts through [cctxt#error]. After signing
    and before injecting, [Baking_state.may_record_new_state] is called with
    the same state as both previous and new state. *)
let attest (cctxt : Protocol_client_context.full) ?(force = false) delegates =
  let open State_transitions in
  let open Lwt_result_syntax in
  let cache = Baking_cache.Block_cache.create 10 in
  let* _stream, head_proposal = get_current_proposal cctxt ~cache () in
  let* state =
    create_state
      cctxt
      ~config:(Baking_configuration.make ~force ())
      ~current_proposal:head_proposal
      delegates
  in
  let proposal = state.level_state.latest_proposal in
  let*! () =
    Events.(
      emit
        attempting_to_vote_for_proposal
        (Attestation, state.level_state.latest_proposal))
  in
  (* Acceptance check, skipped entirely when [force] is set. *)
  let check_proposal () =
    let*! verdict = is_acceptable_proposal_for_current_level state proposal in
    match verdict with
    | Valid_proposal -> return_unit
    | Outdated_proposal -> cctxt#error "Cannot attest an outdated proposal"
    | Invalid -> cctxt#error "Cannot attest an invalid proposal"
  in
  let* () = if force then return_unit else check_proposal () in
  let batch = make_consensus_vote_batch state proposal Attestation in
  let voters =
    List.map
      (fun ({delegate; _} : unsigned_consensus_vote) -> delegate)
      batch.unsigned_consensus_votes
  in
  let*! () =
    cctxt#message
      "@[<v 2>Attesting for:@ %a@]"
      Format.(
        pp_print_list
          ~pp_sep:pp_print_space
          Baking_state.pp_consensus_key_and_delegate)
      voters
  in
  let* signed_batch =
    Baking_actions.sign_consensus_votes state.global_state batch
  in
  let* () =
    Baking_state.may_record_new_state ~previous_state:state ~new_state:state
  in
  Baking_actions.inject_consensus_votes state signed_batch

(** [do_action (state, action)] performs [action] on [state] with
    [Baking_actions.perform_action], passes the previous and resulting
    states to [Baking_state.may_record_new_state], and returns the resulting
    state. *)
let do_action (state, action) =
  let open Lwt_result_syntax in
  let* new_state = Baking_actions.perform_action state action in
  let+ () =
    Baking_state.may_record_new_state ~previous_state:state ~new_state
  in
  new_state

(** [bake_at_next_level_event state] asks
    [Baking_scheduling.compute_next_potential_baking_time_at_next_level] for
    a timestamp and round, announces them with [cctxt#message], waits on
    [Baking_scheduling.sleep_until] when it returns a promise, and then
    returns a [Timeout (Time_to_prepare_next_level_block _)] event for that
    round.

    Fails through [cctxt#error] when no baking time is found. *)
let bake_at_next_level_event state =
  let open Lwt_result_syntax in
  let cctxt = state.global_state.cctxt in
  let*! next_slot =
    Baking_scheduling.compute_next_potential_baking_time_at_next_level state
  in
  match next_slot with
  | Some (timestamp, round) ->
      let*! () =
        cctxt#message
          "Waiting until %a for round %a"
          Timestamp.pp
          timestamp
          Round.pp
          round
      in
      let*! () =
        (* [sleep_until] yields no promise when there is nothing to wait
           for. *)
        match Baking_scheduling.sleep_until timestamp with
        | Some sleeper -> sleeper
        | None -> Lwt.return_unit
      in
      return
        (Baking_state.Timeout
           (Time_to_prepare_next_level_block {at_round = round}))
  | None -> cctxt#error "No baking slot found for the delegates"

(** [bake_at_next_level state] waits for the next-level baking event (see
    [bake_at_next_level_event]), feeds it to [State_transitions.step], then
    prepares and injects the block requested by the resulting
    [Prepare_block] action (non-forced, synchronous injection).

    Returns the state after injection. Any other action is treated as
    impossible and hits [assert false]. *)
let bake_at_next_level state =
  let open Lwt_result_syntax in
  let* event = bake_at_next_level_event state in
  let*! state, action = State_transitions.step state event in
  match action with
  | Prepare_block {block_to_bake} ->
      let* prepared_block =
        Baking_actions.prepare_block state.global_state block_to_bake
      in
      do_action
        ( state,
          Inject_block
            {prepared_block; force_injection = false; asynchronous = false} )
  | _ -> assert false

(* First event fed to the automaton: when a block is already elected we can
   go straight to baking at the next level (after waiting for its date);
   otherwise the bootstrap event computed by [Baking_scheduling] is used. *)
let first_automaton_event state =
  if Option.is_some state.level_state.elected_block then
    bake_at_next_level_event state
  else Lwt.return (Baking_scheduling.compute_bootstrap_event state)

(** [attestations_attesting_power state attestations] sums the voting power
    of [attestations] after removing duplicates with [List.sort_uniq
    compare]. Each power is looked up by slot in the delegate slots of the
    current level; a slot with no known power contributes nothing. *)
let attestations_attesting_power state attestations =
  let slots = state.level_state.delegate_slots in
  let add_power total {slot; _} =
    match Delegate_slots.voting_power slots ~slot with
    | Some attesting_power -> total + attesting_power
    | None -> total (* cannot happen *)
  in
  List.fold_left add_power 0 (List.sort_uniq compare attestations)

(** [generic_attesting_power filter extract state] applies [filter] to the
    consensus operations currently held by the operation worker, keeps the
    ones whose consensus content (obtained with [extract]) has the round and
    level of the latest proposal, and computes their combined voting power.

    Returns [(power, ops)], where [ops] is the complete output of [filter],
    not only the operations that matched the latest proposal. *)
let generic_attesting_power (filter : packed_operation list -> 'a list)
    (extract : 'a -> consensus_content) state =
  let mempool =
    Operation_worker.get_current_operations state.global_state.operation_worker
  in
  let proposal = state.level_state.latest_proposal in
  let proposal_round = proposal.block.round in
  let proposal_level = proposal.block.shell.level in
  let candidates =
    filter (Operation_pool.Operation_set.elements mempool.consensus)
  in
  let content_if_matching op =
    let content = extract op in
    let same_round = Round.(content.round = proposal_round) in
    if
      same_round
      && Compare.Int32.(Raw_level.to_int32 content.level = proposal_level)
    then Some content
    else None
  in
  let matching = List.filter_map content_if_matching candidates in
  (attestations_attesting_power state matching, candidates)

(** [state_attesting_power state] is [generic_attesting_power] specialised
    to attestations: operations are selected with
    [Operation_pool.filter_attestations] and the consensus content is read
    from the single [Attestation] each one carries (its DAL content is
    ignored). *)
let state_attesting_power state =
  let consensus_content_of
      ({
         protocol_data =
           {
             contents = Single (Attestation {consensus_content; dal_content = _});
             _;
           };
         _;
       } :
        Kind.attestation operation) =
    consensus_content
  in
  generic_attesting_power
    Operation_pool.filter_attestations
    consensus_content_of
    state

(** [propose_at_next_level ~minimal_timestamp state] injects a proposal on
    top of the latest proposal of [state]. [state] must already have an
    elected block (checked with [assert]).

    - Without [minimal_timestamp], this defers to [bake_at_next_level].
    - With [minimal_timestamp], a fresh block is built at the first
      potential round found for the delegates at the next level, from the
      operations currently held by the operation worker, and injected with
      [force_injection] set.

    Returns the state after injection. Fails through [cctxt#error] when no
    potential round is found. *)
let propose_at_next_level ~minimal_timestamp state =
  let open Lwt_result_syntax in
  let cctxt = state.global_state.cctxt in
  assert (Option.is_some state.level_state.elected_block) ;
  if not minimal_timestamp then
    let* state = bake_at_next_level state in
    let*! () = cctxt#message "Proposal injected" in
    return state
  else
    let* round, delegate =
      match
        Baking_scheduling.first_potential_round_at_next_level
          state
          ~earliest_round:Round.zero
      with
      | Some first_potential_round -> return first_potential_round
      | None -> cctxt#error "No potential baking slot for the given delegates."
    in
    let kind =
      Fresh
        (Operation_worker.get_current_operations
           state.global_state.operation_worker)
    in
    let block_to_bake =
      {
        predecessor = state.level_state.latest_proposal.block;
        round;
        delegate;
        kind;
        force_apply = state.global_state.config.force_apply;
      }
    in
    let* prepared_block =
      Baking_actions.prepare_block state.global_state block_to_bake
    in
    let* state =
      do_action
        ( state,
          Inject_block
            {
              prepared_block;
              (* Always [true] in this branch. *)
              force_injection = minimal_timestamp;
              asynchronous = false;
            } )
    in
    let*! () =
      cctxt#message
        "Proposed block at round %a on top of %a "
        Round.pp
        block_to_bake.round
        Block_hash.pp
        block_to_bake.predecessor.hash
    in
    return state

(** [attestation_quorum state] is [Some (power, attestations)], as computed
    by [state_attesting_power], when [power] reaches the parametric
    [consensus_threshold] constant, and [None] otherwise. *)
let attestation_quorum state =
  let ((power, _attestations) as quorum) = state_attesting_power state in
  let threshold =
    state.global_state.constants.parametric.consensus_threshold
  in
  if Compare.Int.(power < threshold) then None else Some quorum

(* Here's the sketch of the algorithm:
   Do I have an attestation quorum for the current block or an elected block?
   - Yes :: wait and propose at next level
   - No  ::
     Is the current proposal at the right round?
     - Yes :: fail propose
     - No  ::
       Is there a preattestation quorum or does the last proposal contain a prequorum?
       - Yes :: repropose block with right payload and preattestations for current round
       - No  :: repropose fresh block for current round *)

(** [propose cctxt ... delegates] injects one proposal on behalf of
    [delegates], following the sketch above.

    The optional arguments other than [minimal_timestamp] are forwarded to
    [Baking_configuration.make]. [minimal_timestamp] is passed to
    [propose_at_next_level]. [force] is also used as [force_injection] when
    reproposing at the current level.

    Returns unit; the state produced by the injection is discarded. Fails
    through [cctxt#error] when the latest proposal is already valid for the
    current round, or when no proposer is found for the current round. *)
let propose (cctxt : Protocol_client_context.full) ?minimal_fees
    ?minimal_nanotez_per_gas_unit ?minimal_nanotez_per_byte ?force_apply
    ?(force = false) ?(minimal_timestamp = false) ?extra_operations
    ?context_path ?state_recorder delegates =
  let open Lwt_result_syntax in
  let cache = Baking_cache.Block_cache.create 10 in
  let* _block_stream, current_proposal = get_current_proposal cctxt ~cache () in
  let config =
    Baking_configuration.make
      ?minimal_fees
      ?minimal_nanotez_per_gas_unit
      ?minimal_nanotez_per_byte
      ?context_path
      ?force_apply
      ~force
      ?extra_operations
      ?state_recorder
      ()
  in
  let* state = create_state cctxt ~config ~current_proposal delegates in
  (* Make sure the operation worker is populated to avoid empty blocks
     being proposed. *)
  let* () =
    Operation_worker.retrieve_pending_operations
      cctxt
      state.global_state.operation_worker
  in
  let* _ =
    match state.level_state.elected_block with
    | Some _ ->
        (* A block is already elected: propose on top of it. *)
        propose_at_next_level ~minimal_timestamp state
    | None -> (
        match attestation_quorum state with
        | Some (_voting_power, attestation_qc) ->
            (* The mempool already holds a quorum of attestations for the
               latest proposal: put the round in the attestation-awaiting
               phase and feed the quorum to the state machine. *)
            let state =
              {
                state with
                round_state =
                  {
                    state.round_state with
                    current_phase = Baking_state.Awaiting_attestations;
                  };
              }
            in
            let latest_proposal = state.level_state.latest_proposal.block in
            let candidate =
              {
                Operation_worker.hash = latest_proposal.hash;
                round_watched = latest_proposal.round;
                payload_hash_watched = latest_proposal.payload_hash;
              }
            in
            let* state =
              let*! action =
                State_transitions.step
                  state
                  (Baking_state.Quorum_reached (candidate, attestation_qc))
              in
              do_action action
              (* this will register the elected block *)
            in
            propose_at_next_level ~minimal_timestamp state
        | None -> (
            (* No elected block and no quorum: step with the bootstrap
               event (the action it yields is ignored), then try to
               (re)propose at the current level. *)
            let*? event = Baking_scheduling.compute_bootstrap_event state in
            let*! state, _action = State_transitions.step state event in
            let latest_proposal = state.level_state.latest_proposal in
            let open State_transitions in
            let round = state.round_state.current_round in
            let*! proposal_acceptance =
              is_acceptable_proposal_for_current_level state latest_proposal
            in
            match proposal_acceptance with
            | Invalid | Outdated_proposal -> (
                match round_proposer state ~level:`Current round with
                | Some {consensus_key_and_delegate; _} ->
                    let*! action =
                      State_transitions.propose_block_action
                        state
                        consensus_key_and_delegate
                        round
                        ~last_proposal:state.level_state.latest_proposal
                    in
                    let* state =
                      match action with
                      | Prepare_block {block_to_bake} ->
                          (* The block still has to be prepared before it
                             can be injected. *)
                          let* prepared_block =
                            Baking_actions.prepare_block
                              state.global_state
                              block_to_bake
                          in
                          let* state =
                            do_action
                              ( state,
                                Inject_block
                                  {
                                    prepared_block;
                                    force_injection = force;
                                    asynchronous = false;
                                  } )
                          in
                          return state
                      | Inject_block {prepared_block; _} ->
                          (* Already prepared: re-issue the injection with
                             this command's own [force] flag and a
                             synchronous injection. *)
                          let* state =
                            do_action
                              ( state,
                                Inject_block
                                  {
                                    prepared_block;
                                    force_injection = force;
                                    asynchronous = false;
                                  } )
                          in
                          return state
                      | _ -> assert false
                    in
                    let*! () =
                      cctxt#message
                        "Reproposed block at level %ld on round %a"
                        state.level_state.current_level
                        Round.pp
                        state.round_state.current_round
                    in
                    return state
                | None -> cctxt#error "No slots for current round")
            | Valid_proposal ->
                cctxt#error
                  "Cannot propose: there's already a valid proposal for the \
                   current round %a"
                  Round.pp
                  round))
  in
  return_unit

(** [repropose cctxt ?force ?force_round delegates] injects a proposal at the
    current level on behalf of [delegates].

    The round is [force_round] when given, otherwise the current round of
    the state obtained after stepping with the bootstrap event. That round
    is used both to look up the proposer and as the round passed to
    [State_transitions.propose_block_action]. [force] is passed to
    [Baking_configuration.make] and used as [force_injection].

    Fails through [cctxt#error] when the latest proposal is already valid
    for the current level, or when no proposer is found for the round. Only
    a [Prepare_block] action is handled; any other action hits
    [assert false]. *)
let repropose (cctxt : Protocol_client_context.full) ?(force = false)
    ?force_round delegates =
  let open Lwt_result_syntax in
  let open Baking_state in
  let cache = Baking_cache.Block_cache.create 10 in
  let* _block_stream, current_proposal = get_current_proposal cctxt ~cache () in
  let config = Baking_configuration.make ~force () in
  let* state = create_state cctxt ~config ~current_proposal delegates in
  (* NOTE(review): unlike [propose], this function does not call
     [Operation_worker.retrieve_pending_operations] before stepping the
     state machine. Confirm that the operation worker does not need to be
     populated for a reproposal. *)
  let*? event = Baking_scheduling.compute_bootstrap_event state in
  let*! state, _action = State_transitions.step state event in
  let latest_proposal = state.level_state.latest_proposal in
  let open State_transitions in
  let round =
    match force_round with
    | Some x -> x
    | None -> state.round_state.current_round
  in
  let*! proposal_validity =
    is_acceptable_proposal_for_current_level state latest_proposal
  in
  match proposal_validity with
  | Invalid | Outdated_proposal -> (
      match Baking_state.round_proposer state ~level:`Current round with
      | Some {consensus_key_and_delegate; _} ->
          let*! action =
            State_transitions.propose_block_action
              state
              consensus_key_and_delegate
              round
              ~last_proposal:state.level_state.latest_proposal
          in
          let* signed_block =
            match action with
            | Prepare_block {block_to_bake} ->
                let* signed_block =
                  Baking_actions.prepare_block state.global_state block_to_bake
                in
                (* The state returned by the injection is not needed: only
                   the prepared block is reported below. *)
                let* _state =
                  do_action
                    ( state,
                      Inject_block
                        {
                          prepared_block = signed_block;
                          force_injection = force;
                          asynchronous = false;
                        } )
                in
                return signed_block
            | _ -> assert false
          in
          let*! () =
            cctxt#message
              "Reproposed block at level %ld on round %a"
              signed_block.signed_block_header.shell.level
              Round.pp
              signed_block.round
          in
          return_unit
      | None -> cctxt#error "No slots for current round")
  | Valid_proposal ->
      cctxt#error
        "Cannot propose: there's already a valid proposal for the current \
         round %a"
        Round.pp
        round

(** [bake_using_automaton ~count config state heads_stream] runs
    [Baking_scheduling.automaton_loop] from the event given by
    [first_automaton_event]. The loop receives a [~stop_on_event] predicate
    that is true for a [New_head_proposal] whose block level is at least
    [count] above the level of the latest proposal of [state].

    When the loop returns such a proposal, its hash and level are printed
    and the function returns unit; any other outcome fails through
    [cctxt#error]. Errors raised inside the loop are returned as-is. *)
let bake_using_automaton ~count config state heads_stream =
  let open Lwt_result_syntax in
  let cctxt = state.global_state.cctxt in
  let* initial_event = first_automaton_event state in
  let start_level = state.level_state.latest_proposal.block.shell.level in
  let target_level = Int32.add start_level (Int32.of_int count) in
  let forge_event_stream =
    state.global_state.forge_worker_hooks.get_forge_event_stream ()
  in
  let loop_state =
    Baking_scheduling.create_loop_state
      ~heads_stream
      ~forge_event_stream
      state.global_state.operation_worker
  in
  let reached_target_level = function
    | New_head_proposal proposal ->
        Compare.Int32.(proposal.block.shell.level >= target_level)
    | _ -> false
  in
  let* last_event =
    Baking_scheduling.automaton_loop
      ~stop_on_event:reached_target_level
      ~config
      ~on_error:(fun err -> Lwt.return (Error err))
      loop_state
      state
      initial_event
  in
  match last_event with
  | Some (New_head_proposal proposal) ->
      let*! () =
        cctxt#message
          "Last injected block: %a (level %ld)"
          Block_hash.pp
          proposal.block.hash
          proposal.block.shell.level
      in
      return_unit
  | _ -> cctxt#error "Baking loop unexpectedly ended"

(* attest the latest proposal and bake with it *)

(** [baking_minimal_timestamp ~count state block_stream] bakes [count] blocks
    in a row without going through the automaton loop.

    For each block it:
    - builds the delegates' own attestations for the latest proposal;
    - checks that their voting power, combined with the attestations found
      in the mempool for the same round and level, reaches the
      [consensus_threshold] constant;
    - picks the first potential round for the delegates at the next level;
    - signs the attestations, adds them to the current mempool, and injects
      a fresh block built from that pool with [force_injection] set.

    While [count > 1], it then reads [block_stream] until a proposal at the
    next level shows up, updates the state to that level and recurses with
    [count - 1].

    Fails through [cctxt#error] when the voting power is insufficient, when
    no potential round exists, or when [block_stream] ends. *)
let rec baking_minimal_timestamp ~count state
    (block_stream : proposal Lwt_stream.t) =
  let open Lwt_result_syntax in
  let cctxt = state.global_state.cctxt in
  let latest_proposal = state.level_state.latest_proposal in
  (* Unsigned attestations of our own delegates for the latest proposal. *)
  let own_attestations =
    State_transitions.make_consensus_vote_batch
      state
      latest_proposal
      Attestation
  in
  let current_mempool =
    Operation_worker.get_current_operations state.global_state.operation_worker
  in
  (* Consensus contents of the mempool attestations that target the round
     and level of the latest proposal. *)
  let attestations_in_mempool =
    Operation_pool.(
      filter_attestations (Operation_set.elements current_mempool.consensus))
    |> List.filter_map
         (fun
           ({
              protocol_data =
                {contents = Single (Attestation {consensus_content; _}); _};
              _;
            } :
             Kind.attestation operation)
         ->
           if
             Round.(consensus_content.round = latest_proposal.block.round)
             && Compare.Int32.(
                  Raw_level.to_int32 consensus_content.level
                  = latest_proposal.block.shell.level)
           then Some consensus_content
           else None)
  in
  (* Own attestations are prepended to the mempool ones;
     [attestations_attesting_power] removes duplicates before summing. *)
  let total_voting_power =
    List.fold_left
      (fun attestations own ->
        own.Baking_state.vote_consensus_content :: attestations)
      attestations_in_mempool
      own_attestations.unsigned_consensus_votes
    |> attestations_attesting_power state
  in
  let consensus_threshold =
    state.global_state.constants.parametric.consensus_threshold
  in
  let* () =
    if Compare.Int.(total_voting_power < consensus_threshold) then
      cctxt#error
        "Delegates do not have enough voting power. Only %d is available while \
         %d is required."
        total_voting_power
        consensus_threshold
    else return_unit
  in
  let* minimal_round, delegate =
    match
      Baking_scheduling.first_potential_round_at_next_level
        state
        ~earliest_round:Round.zero
    with
    | None -> cctxt#error "No potential baking slot for the given delegates."
    | Some first_potential_round -> return first_potential_round
  in
  let* signed_attestations =
    (* Presumably fills in the DAL content of each own attestation before
       signing; see [Baking_actions.may_get_dal_content]. *)
    let*! own_attestations_with_dal =
      dal_content_map_p
        (Baking_actions.may_get_dal_content state)
        own_attestations
    in
    Baking_actions.sign_consensus_votes
      state.global_state
      own_attestations_with_dal
  in
  (* Operations for the new block: the current mempool plus our freshly
     signed attestations. *)
  let pool =
    Operation_pool.add_operations
      current_mempool
      (List.map
         (fun signed_consensus -> signed_consensus.signed_operation)
         signed_attestations.signed_consensus_votes)
  in
  let kind = Fresh pool in
  let block_to_bake =
    {
      predecessor = latest_proposal.block;
      round = minimal_round;
      delegate;
      kind;
      force_apply = state.global_state.config.force_apply;
    }
  in
  let* prepared_block =
    Baking_actions.prepare_block state.global_state block_to_bake
  in
  let* new_state =
    do_action
      ( state,
        Inject_block
          {prepared_block; force_injection = true; asynchronous = false} )
  in
  let*! () = cctxt#message "Injected block at minimal timestamp" in
  if count <= 1 then return_unit
  else
    (* Discard streamed proposals until one at the level right after the
       latest proposal is at the front of the stream. *)
    let*! () =
      let attestation_level = Int32.succ latest_proposal.block.shell.level in
      Lwt_stream.junk_while_s
        (fun proposal ->
          Lwt.return
            Compare.Int32.(
              proposal.Baking_state.block.shell.level <> attestation_level))
        block_stream
    in
    let*! next_level_proposal =
      let*! r = Lwt_stream.get block_stream in
      match r with
      | None -> cctxt#error "Stream unexpectedly ended"
      | Some b -> Lwt.return b
    in
    let*! new_state, action =
      State_transitions.step new_state (New_head_proposal next_level_proposal)
    in
    let* new_state =
      match action with
      | Update_to_level update ->
          let* new_state, _preattest_action =
            Baking_actions.update_to_level new_state update
          in
          (* Start the next iteration from a reset round state; the
             preattestation action returned above is ignored. *)
          return
            {
              new_state with
              round_state =
                {
                  Baking_state.current_round = Round.zero;
                  current_phase = Idle;
                  delayed_quorum = None;
                  early_attestations = [];
                  awaiting_unlocking_pqc = false;
                };
            }
      | _ ->
          (* Algorithmically, this will always be an update_to_level
             action. *)
          assert false
    in
    baking_minimal_timestamp ~count:(pred count) new_state block_stream

(** [bake cctxt ... delegates] bakes [count] block(s) (default [1]) on behalf
    of [delegates].

    The optional arguments other than [minimal_timestamp],
    [monitor_node_mempool] and [count] are forwarded to
    [Baking_configuration.make]. When [monitor_node_mempool] is set (the
    default), the pending operations are retrieved into the operation
    worker before baking. With [minimal_timestamp], baking goes through
    [baking_minimal_timestamp] and the state is created with
    [~synchronize:false]; otherwise it goes through [bake_using_automaton]. *)
let bake (cctxt : Protocol_client_context.full) ?minimal_fees
    ?minimal_nanotez_per_gas_unit ?minimal_nanotez_per_byte ?force_apply ?force
    ?(minimal_timestamp = false) ?extra_operations
    ?(monitor_node_mempool = true) ?context_path ?dal_node_endpoint ?(count = 1)
    ?votes ?state_recorder delegates =
  let open Lwt_result_syntax in
  let config =
    Baking_configuration.make
      ?minimal_fees
      ?minimal_nanotez_per_gas_unit
      ?minimal_nanotez_per_byte
      ?context_path
      ?force_apply
      ?force
      ?extra_operations
      ?dal_node_endpoint
      ?votes
      ?state_recorder
      ()
  in
  let cache = Baking_cache.Block_cache.create 10 in
  let* heads, head_proposal = get_current_proposal cctxt ~cache () in
  let* state =
    create_state
      cctxt
      ~monitor_node_mempool
      ~synchronize:(not minimal_timestamp)
      ~config
      ~current_proposal:head_proposal
      delegates
  in
  let* () =
    (* Make sure the operation worker is populated to avoid empty
       blocks being baked *)
    if monitor_node_mempool then
      Operation_worker.retrieve_pending_operations
        cctxt
        state.global_state.operation_worker
    else return_unit
  in
  if minimal_timestamp then baking_minimal_timestamp ~count state heads
  else bake_using_automaton ~count config state heads
OCaml

Innovation. Community. Security.