package octez-protocol-alpha-libs

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file baking_vdf.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2022 Nomadic Labs <contact@nomadic-labs.com>                *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Protocol
open Alpha_context
open Client_baking_blocks
module Events = Baking_events.VDF

(** Solution of a VDF computation, as defined by the protocol's seed
    representation. *)
type vdf_solution = Seed_repr.vdf_solution

(** Input of a VDF computation. In this file it is always handled as a
    [(discriminant, challenge)] pair. *)
type vdf_setup = Seed_repr.vdf_setup

(** Handle on the child process running a VDF computation: [pid] is the
    process id returned by the fork and [ch_in] is the reading end of the
    pipe on which the child writes its encoded solution. *)
type forked_process = {pid : int; ch_in : Lwt_io.input_channel}

(** Internal status of the daemon's VDF computation for the current cycle. *)
type status =
  | Not_started  (** No computation is running (initial and reset value). *)
  | Started of vdf_setup * forked_process
      (** A child process is computing the solution for the given setup. *)
  | Finished of vdf_setup * vdf_solution
      (** The solution has been computed but is not injected yet. *)
  | Injected
      (** The revelation was injected by this daemon, or the chain reports
          that the seed computation is already finished. *)
  | Invalid
      (** The setup saved in the state no longer matches the one derived from
          the on-chain seed. *)

(** Mutable state of the VDF worker. ['a] is the error type carried by the
    items of [block_stream]. *)
type 'a state = {
  cctxt : Protocol_client_context.full;  (** Client context used for RPCs. *)
  constants : Constants.t;
      (** Protocol constants; [nonce_revelation_threshold] and
          [vdf_difficulty] are read from them. *)
  mutable block_stream : (block_info, 'a) result Lwt_stream.t;
      (** Stream of monitored heads consumed by the worker loop. *)
  mutable stream_stopper : Tezos_rpc.Context.stopper option;
      (** Function stopping [block_stream]; [None] once it has been called. *)
  mutable cycle : Cycle.t option;
      (** Cycle of the last processed block; [None] before the first block. *)
  mutable computation_status : status;
      (** Progress of the VDF computation for the current cycle. *)
}

(* Start monitoring the heads of [chain], restricted to blocks whose next
 * protocol is this protocol. Yields the block stream together with the
 * function that stops it. *)
let init_block_stream_with_stopper cctxt chain =
  let next_protocols = Some [Protocol.hash] in
  Client_baking_blocks.monitor_heads cctxt ~next_protocols chain

(* Stop the block stream if it is still running and forget its stopper, so
 * that calling this function again is a no-op. *)
let stop_block_stream state =
  match state.stream_stopper with
  | None -> ()
  | Some stopper ->
      stopper () ;
      state.stream_stopper <- None

(* Emit [msg] as a [vdf_info] event, suffixed with the raw [level]. *)
let emit_with_level msg level =
  Printf.sprintf "%s (level %ld)" msg (Raw_level.to_int32 level)
  |> Events.(emit vdf_info)

(* Report, as a [vdf_info] event, that no VDF revelation was injected for
 * [cycle]. Always succeeds. *)
let emit_revelation_not_injected cycle =
  let open Lwt_result_syntax in
  let msg =
    Printf.sprintf
      "VDF revelation was NOT injected for cycle %ld"
      (Cycle.to_int32 cycle)
  in
  let*! () = Events.(emit vdf_info) msg in
  return_unit

(* Wait for [p]; if it resolves to an error, report it as a
 * [vdf_daemon_error] event tagged with [name] instead of propagating it. *)
let log_errors_and_continue ~name p =
  let open Lwt_syntax in
  let* outcome = p in
  match outcome with
  | Error errs -> Events.(emit vdf_daemon_error) (name, errs)
  | Ok () -> return_unit

(* Query the seed computation status as seen from block [hash] of chain
 * [chain_id]. *)
let get_seed_computation cctxt chain_id hash =
  Alpha_services.Seed_computation.get cctxt (`Hash chain_id, `Hash (hash, 0))

(* Fetch the metadata of the block at raw [level] on the chain of [cctxt] and
 * return the [level_info] stored in its protocol data. *)
let get_level_info cctxt level =
  let open Lwt_result_syntax in
  let block = `Level (Raw_level.to_int32 level) in
  let* {protocol_data = {level_info; _}; _} =
    Protocol_client_context.Alpha_block_services.metadata
      cctxt
      ~chain:cctxt#chain
      ~block
      ()
  in
  return level_info

(* Tell whether [level] is in the nonce revelation stage, using the
 * [nonce_revelation_threshold] found in [constants]. The answer is wrapped in
 * the Lwt-result monad for the convenience of callers. *)
let is_in_nonce_revelation_stage constants (level : Level.t) =
  let open Lwt_result_syntax in
  let nonce_revelation_threshold =
    constants.Constants.parametric.nonce_revelation_threshold
  in
  Vdf_helpers.is_in_nonce_revelation_stage ~nonce_revelation_threshold ~level
  |> return

(* Compare [vdf_setup] (the setup saved in the state) with the setup derived
 * from [seed_discriminant] and [seed_challenge]. Both components are compared
 * through their byte representations. *)
let eq_vdf_setup vdf_setup seed_discriminant seed_challenge =
  let expected_discriminant, expected_challenge =
    Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge
  in
  let saved_discriminant, saved_challenge = vdf_setup in
  (* Equality of two values through their serialisation. *)
  let same_bytes to_bytes x y = Bytes.equal (to_bytes x) (to_bytes y) in
  same_bytes
    Environment.Vdf.discriminant_to_bytes
    expected_discriminant
    saved_discriminant
  && same_bytes
       Environment.Vdf.challenge_to_bytes
       expected_challenge
       saved_challenge

(* Try to publish the VDF [solution] computed for [setup]. The on-chain seed
 * computation status seen from block [hash] decides what happens:
 *   - VDF revelation stage and [setup] matches the on-chain seed: the
 *     revelation operation is forged and injected, and the computation
 *     status becomes [Injected];
 *   - VDF revelation stage but [setup] does not match: the computation
 *     status becomes [Invalid];
 *   - nonce revelation stage: the computation status is reset to
 *     [Not_started];
 *   - computation finished: the computation status becomes [Injected].
 * RPC failures are returned as errors and leave the status untouched. *)
let inject_vdf_revelation cctxt state setup solution chain_id hash
    (level_info : Level.t) =
  let open Lwt_result_syntax in
  let chain = `Hash chain_id in
  let level = level_info.level in
  (* Record [new_status], log [msg] for the current level and stop there. *)
  let set_status_and_log new_status msg =
    state.computation_status <- new_status ;
    let*! () = emit_with_level msg level in
    return_unit
  in
  let* on_chain_status = get_seed_computation cctxt chain_id hash in
  match on_chain_status with
  | Nonce_revelation_stage ->
      set_status_and_log Not_started "Not injecting VDF: new cycle started"
  | Computation_finished ->
      set_status_and_log Injected "Not injecting VDF: already injected"
  | Vdf_revelation_stage {seed_discriminant; seed_challenge} ->
      if not (eq_vdf_setup setup seed_discriminant seed_challenge) then
        (* The VDF setup saved in the state differs from the one computed
         * from the on-chain seed. In practice this would indicate a bug:
         * either the cycle changed without being detected, or the VDF setup
         * changed mid-cycle. *)
        set_status_and_log Invalid "Error injecting VDF: setup has been updated"
      else
        let* forged =
          Plugin.RPC.Forge.vdf_revelation
            cctxt
            (chain, `Hash (hash, 0))
            ~branch:hash
            ~solution
            ()
        in
        (* The forged bytes are completed with the [zero] signature. *)
        let signed = Tezos_crypto.Signature.V_latest.(concat forged zero) in
        let* op_hash = Shell_services.Injection.operation cctxt ~chain signed in
        (* Only reached when the injection succeeded. *)
        state.computation_status <- Injected ;
        let*! () =
          Events.(emit vdf_revelation_injected)
            ( Cycle.to_int32 level_info.cycle,
              Chain_services.to_string chain,
              op_hash )
        in
        return_unit

(* Launch the heavy VDF computation as a separate process. This is done in order
 * to not block the main process, allowing it to continue monitoring blocks and
 * to cancel or restart the VDF computation if needed.
 *
 * [setup] is the [(discriminant, challenge)] pair to solve and [level] is
 * only used for logging. A pipe is created before forking: the child writes
 * its encoded solution to it and the parent keeps the reading end in
 * [state.computation_status]. The child exits with code 0 once the solution
 * has been written and with code 1 if the solution cannot be encoded. *)
let fork_vdf_computation state ((discriminant, challenge) as setup) level =
  let open Lwt_syntax in
  (* The pipe must exist before the fork so that both processes share it. *)
  let ch_in, forked_out = Lwt_io.pipe () in
  match Lwt_unix.fork () with
  | 0 -> (
      (* In the forked process, try to compute the VDF solution, write it
       * to [forked_out], then exit. *)
      (* The child only writes: drop its copy of the reading end. *)
      let* () = Lwt_io.close ch_in in
      let solution =
        Environment.Vdf.prove
          discriminant
          challenge
          state.constants.parametric.vdf_difficulty
      in
      match
        Data_encoding.Binary.to_bytes Seed.vdf_solution_encoding solution
      with
      | Ok encoded ->
          (* NOTE(review): [forked_out] is not explicitly flushed or closed
           * before [exit 0]; presumably the at-exit flushing of [Lwt_io]
           * channels takes care of it -- confirm. *)
          let* () = Lwt_io.write_value forked_out encoded in
          exit 0
      | Error _ ->
          let* () = Events.(emit vdf_info) "Error encoding VDF solution" in
          exit 1)
  | pid ->
      (* In the main process, change the computation status to [Started],
         record the forked process data, and continue. *)
      (* The parent only reads: drop its copy of the writing end. *)
      let* () = Lwt_io.close forked_out in
      state.computation_status <- Started (setup, {pid; ch_in}) ;
      let* () =
        emit_with_level
          (Printf.sprintf "Started to compute VDF, pid: %d" pid)
          level
      in
      return_unit

(* Check whether the VDF computation process [proc] has exited and, if so,
 * read its result and update the computation status accordingly:
 *   - still running: nothing changes;
 *   - exited with code 0: the solution is read from the pipe; if it decodes,
 *     the status becomes [Finished] and the revelation is injected right
 *     away (see [inject_vdf_revelation]), otherwise the status is reset to
 *     [Not_started];
 *   - any other termination: the status is reset to [Not_started].
 * Whenever the process is found terminated, the reading end of the pipe is
 * closed, whatever the outcome, so that no file descriptor is leaked when the
 * computation has to be restarted. *)
let get_vdf_solution_if_ready cctxt state proc setup chain_id hash
    (level_info : Level.t) =
  let open Lwt_result_syntax in
  let level = level_info.level in
  (* Non-blocking check: a returned pid of 0 means "no state change yet". *)
  let*! status = Lwt_unix.waitpid [WNOHANG] proc.pid in
  match status with
  | 0, _ ->
      (* If the process is still running, continue *)
      let*! () = emit_with_level "Skipping, VDF computation launched" level in
      return_unit
  | _, WEXITED 0 -> (
      (* If the process has exited normally, read the solution, update
       * the status to [Finished], and attempt to inject the VDF
       * revelation. The channel is closed as soon as the read is over,
       * even if the read itself fails. *)
      let*! encoded_solution =
        Lwt.finalize
          (fun () -> Lwt_io.read_value proc.ch_in)
          (fun () -> Lwt_io.close proc.ch_in)
      in
      match
        Data_encoding.Binary.of_bytes
          Seed.vdf_solution_encoding
          encoded_solution
      with
      | Ok solution ->
          state.computation_status <- Finished (setup, solution) ;
          let*! () = emit_with_level "Finished VDF computation" level in
          inject_vdf_revelation
            cctxt
            state
            setup
            solution
            chain_id
            hash
            level_info
      | Error _ ->
          let*! () = Events.(emit vdf_info) "Error decoding VDF solution" in
          state.computation_status <- Not_started ;
          return_unit)
  | _, WEXITED _ | _, WSIGNALED _ | _, WSTOPPED _ ->
      (* If process has exited abnormally, reset the computation status to
       * [Not_started], release the pipe and continue *)
      state.computation_status <- Not_started ;
      let*! () = Lwt_io.close proc.ch_in in
      let*! () =
        Events.(emit vdf_info) "VDF computation process exited abnormally"
      in
      return_unit

(* Terminate the child process running a VDF computation and release the
 * resources attached to it:
 *   - send it SIGTERM (a failure to do so is reported as an event);
 *   - wait for it and log its exit status; if waiting fails with a
 *     [Unix.Unix_error] (e.g. the child no longer exists), the error is
 *     logged instead of being raised in the caller;
 *   - close the reading end of the pipe in every case, so that aborting a
 *     computation does not leak a file descriptor. *)
let kill_forked_process {pid; ch_in} =
  let open Lwt_syntax in
  let terminate_and_reap () =
    let* () =
      match Unix.kill pid Sys.sigterm with
      | () ->
          Events.(emit vdf_info)
            (Printf.sprintf
               "Sent SIGTERM to VDF computation process (pid %d)"
               pid)
      | exception Unix.Unix_error (err, _, _) ->
          let msg =
            Printf.sprintf "%s (pid %d)" (Unix.error_message err) pid
          in
          Events.(emit vdf_daemon_cannot_kill_computation) msg
    in
    Lwt.catch
      (fun () ->
        let* pid, status = Lwt_unix.waitpid [] pid in
        let status =
          match status with
          | WEXITED n -> Printf.sprintf "WEXITED %d" n
          | WSIGNALED n -> Printf.sprintf "WSIGNALED %d" n
          | WSTOPPED n -> Printf.sprintf "WSTOPPED %d" n
        in
        Events.(emit vdf_info)
          (Printf.sprintf
             "Exit status for child VDF computation process %d: %s"
             pid
             status))
      (function
        | Unix.Unix_error (err, _, _) ->
            (* Typically the child is already gone: nothing left to reap. *)
            Events.(emit vdf_info)
              (Printf.sprintf
                 "Could not wait for child VDF computation process %d: %s"
                 pid
                 (Unix.error_message err))
        | exn -> Lwt.fail exn)
  in
  Lwt.finalize terminate_and_reap (fun () -> Lwt_io.close ch_in)

(* Kill the VDF computation process if the status is [Started]; do nothing
 * for every other status. *)
let maybe_kill_running_vdf_computation state =
  match state.computation_status with
  | Started ((_ : vdf_setup), proc) -> kill_forked_process proc
  | Not_started | Finished _ | Injected | Invalid -> Lwt.return_unit

(* Compare the cycle of the block described by [level_info] with the cycle of
 * the last processed block, stored in [state.cycle].
 * On the first processed block, [state.cycle] is simply initialised.
 * When the block belongs to a later cycle: an event is emitted unless the VDF
 * of the previous cycle was injected, a running VDF computation is killed,
 * [state.cycle] is updated and the computation status is reset to
 * [Not_started]. Otherwise nothing happens. *)
let check_new_cycle state (level_info : Level.t) =
  let open Lwt_result_syntax in
  let block_cycle = level_info.cycle in
  match state.cycle with
  | None ->
      state.cycle <- Some block_cycle ;
      return_unit
  | Some last_cycle ->
      if not Cycle.(last_cycle < block_cycle) then return_unit
      else
        let* () =
          match state.computation_status with
          | Injected -> return_unit
          | Not_started | Finished _ | Invalid ->
              emit_revelation_not_injected last_cycle
          | Started ((_ : vdf_setup), proc) ->
              (* The result of this computation is no longer needed. *)
              let*! () = kill_forked_process proc in
              emit_revelation_not_injected last_cycle
        in
        state.cycle <- Some block_cycle ;
        state.computation_status <- Not_started ;
        return_unit

(* The daemon's main job is to launch the VDF computation as soon as it
 * can (i.e., when the nonce revelation stage ends) and to inject
 * the VDF solution as soon as it finishes computing it.
 * Additionally, it must cancel a running VDF computation if its result
 * is no longer required and restart a computation if it failed.
 * The daemon processes the stream of blocks and monitors both
 * the level of the head within a cycle and the [Seed_computation] RPC.
 * The core of this function is a pattern match on the product of
 * [seed_computation] (the on-chain status of the seed computation)
 * and [state.computation_status] (the internal status of the daemon).
 *
 * [seed_computation] is reset at the beginning of a cycle to
 * [Nonce_revelation_stage], mirroring the on-chain change of the computation
 * status. No action is taken while in this state.
 * After [nonce_revelation_threshold] blocks, the status becomes
 * [Vdf_revelation_stage]. A call to the RPC confirms this and provides the seed
 * required to launch the VDF computation.
 * If a VDF revelation operation is injected before the end of the cycle,
 * the status is updated to [Computation_finished]. If a VDF computation is
 * running at that point (i.e., another daemon injected first),
 * it is canceled.
 *
 * The third argument is the block to process; only its [hash], [chain_id],
 * [protocol], [next_protocol] and [level] fields are used. The result is
 * [Ok ()] unless one of the RPCs issued along the way fails. *)
let process_new_block (cctxt : #Protocol_client_context.full) state
    {hash; chain_id; protocol; next_protocol; level; _} =
  let open Lwt_result_syntax in
  if Protocol_hash.(protocol <> next_protocol) then
    (* If the protocol has changed, emit an event on every new block and take
     * no further action. It is expected that the daemon corresponding to
     * the new protocol is used instead. *)
    let*! () = Delegate_events.Denunciator.(emit protocol_change_detected) () in
    return_unit
  else
    let* level_info = get_level_info cctxt level in
    (* If head is in a new cycle record it in [state.cycle] and reset
     * [state.computation_status] to [Not_started]. *)
    let* () = check_new_cycle state level_info in
    (* If the chain is in the nonce revelation stage, there is nothing to do. *)
    let* out = is_in_nonce_revelation_stage state.constants level_info in
    if out then
      let*! () =
        emit_with_level "Skipping, still in nonce revelation stage" level
      in
      return_unit
    else
      (* Enter main loop if we are not in the nonce revelation stage and
       * the expected protocol has been activated. *)
      match state.computation_status with
      | Not_started -> (
          let* seed_computation = get_seed_computation cctxt chain_id hash in
          match seed_computation with
          | Vdf_revelation_stage {seed_discriminant; seed_challenge} ->
              (* The chain is in the VDF revelation stage and the computation
               * has not been started, so it is started here, in a separate
               * process. The computation status is updated to [Started]. *)
              let setup =
                Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge
              in
              let*! () = fork_vdf_computation state setup level in
              return_unit
          | Computation_finished ->
              let*! () =
                emit_with_level
                  "Skipping, VDF solution has already been injected"
                  level
              in
              return_unit
          | Nonce_revelation_stage ->
              (* At this point the chain cannot be in the nonce revelation
               * stage. This is checked in [is_in_nonce_revelation_stage]. *)
              assert false)
      | Started (setup, proc) -> (
          let* seed_computation = get_seed_computation cctxt chain_id hash in
          match seed_computation with
          | Vdf_revelation_stage _ ->
              (* The chain is in the VDF computation stage and we have
               * previously started the computation. Check whether it is
               * finished and, if so, update the computation status to
               * [Finished] and immediately inject the solution. *)
              let* () =
                get_vdf_solution_if_ready
                  cctxt
                  state
                  proc
                  setup
                  chain_id
                  hash
                  level_info
              in
              return_unit
          | Computation_finished ->
              (* The chain is no longer in the VDF revelation stage because
               * the solution has already been injected: abort the running
               * computation. *)
              let*! () = kill_forked_process proc in
              let*! () =
                emit_with_level
                  "VDF solution already injected, aborting VDF computation"
                  level
              in
              state.computation_status <- Injected ;
              return_unit
          | Nonce_revelation_stage ->
              (* At this point the chain cannot be in the nonce revelation
               * stage. This is checked in [is_in_nonce_revelation_stage]. *)
              assert false)
      | Finished (setup, solution) ->
          (* VDF solution computed, but not injected. We are only in this case
           * if the first attempt to inject, right after getting the solution,
           * was unsuccessful. While the chain is in the VDF revelation stage,
           * and the solution has not been injected (computation status is
           * [Finished]), we try to inject. If successful, the computation
           * status is updated to [Injected]. *)
          inject_vdf_revelation
            cctxt
            state
            setup
            solution
            chain_id
            hash
            level_info
      | Injected ->
          (* Nothing left to do for this cycle. *)
          let*! () =
            emit_with_level "Skipping, VDF solution already injected" level
          in
          return_unit
      | Invalid ->
          (* [Invalid] is set by [inject_vdf_revelation] when the saved setup
           * does not match the on-chain one; the status stays so until
           * [check_new_cycle] resets it on a cycle change. *)
          let*! () = emit_with_level "Skipping, failed to compute VDF" level in
          return_unit

(* Entry point of the VDF daemon for [chain].
 * Starts monitoring heads, registers on [canceler] a callback that kills a
 * running VDF computation and stops the block stream, then processes blocks
 * one by one until either the clean-up phase of [Lwt_exit] starts (the
 * worker returns [Ok ()]) or the block stream ends or yields an error (the
 * worker fails with [Baking_errors.Node_connection_lost]). Errors raised while
 * processing a block are logged and do not stop the worker. *)
let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants
    chain =
  let open Lwt_result_syntax in
  let* block_stream, stopper = init_block_stream_with_stopper cctxt chain in
  let state =
    {
      cctxt;
      constants;
      block_stream;
      stream_stopper = Some stopper;
      cycle = None;
      computation_status = Not_started;
    }
  in
  Lwt_canceler.on_cancel canceler (fun () ->
      let*! () = maybe_kill_running_vdf_computation state in
      stop_block_stream state ;
      Lwt.return_unit) ;
  (* Wait for whichever comes first: process termination or a stream item. *)
  let next_event () =
    let termination =
      let*! _ = Lwt_exit.clean_up_starts in
      Lwt.return `Termination
    in
    let stream_item =
      let*! item = Lwt_stream.get state.block_stream in
      Lwt.return (`Block item)
    in
    Lwt.choose [termination; stream_item]
  in
  let rec worker_loop () =
    let*! event = next_event () in
    match event with
    | `Block (Some (Ok bi)) ->
        let*! () =
          log_errors_and_continue ~name (process_new_block cctxt state bi)
        in
        worker_loop ()
    | `Block (Some (Error _)) | `Block None ->
        (* Exit when the node is unavailable *)
        stop_block_stream state ;
        let*! () = Events.(emit vdf_daemon_connection_lost) name in
        tzfail Baking_errors.Node_connection_lost
    | `Termination -> return_unit
  in
  worker_loop ()
OCaml

Innovation. Community. Security.