Source file layer_1.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
(**
Errors
======
*)
type error += Cannot_find_predecessor of Block_hash.t
let () =
register_error_kind
~id:"lib_crawler.cannot_find_predecessor"
~title:"Cannot find block predecessor from L1"
~description:"A predecessor couldn't be found from the L1 node"
~pp:(fun ppf hash ->
Format.fprintf
ppf
"Block with hash %a has no predecessor on the L1 node."
Block_hash.pp
hash)
`Temporary
Data_encoding.(obj1 (req "hash" Block_hash.encoding))
(function Cannot_find_predecessor hash -> Some hash | _ -> None)
(fun hash -> Cannot_find_predecessor hash)
(**
State
=====
*)
type t = {
name : string;
protocols : Protocol_hash.t list option;
reconnection_delay : float;
heads : (Block_hash.t * Block_header.t) Lwt_stream.t;
cctxt : Client_context.full;
stopper : Tezos_rpc.Context.stopper;
mutable running : bool;
}
let rec connect ~name ?(count = 0) ~delay ~protocols cctxt =
let open Lwt_syntax in
let* () =
if count = 0 then return_unit
else
let fcount = float_of_int (count - 1) in
let delay = delay *. (1.5 ** fcount) in
let delay = min delay 3600. in
let randomization_factor = 0.5 in
let delay =
delay
+. Random.float (delay *. 2. *. randomization_factor)
-. (delay *. randomization_factor)
in
let* () = Layer1_event.wait_reconnect ~name delay in
Lwt_unix.sleep delay
in
let* res =
Tezos_shell_services.Monitor_services.heads ?protocols cctxt cctxt#chain
in
match res with
| Ok (heads, stopper) ->
let heads =
Lwt_stream.map_s
(fun ( hash,
(Tezos_base.Block_header.{shell = {level; _}; _} as ) ) ->
let+ () = Layer1_event.switched_new_head ~name hash level in
(hash, header))
heads
in
return (heads, stopper)
| Error e ->
let* () = Layer1_event.cannot_connect ~name ~count e in
connect ~name ~delay ~protocols ~count:(count + 1) cctxt
let start ~name ~reconnection_delay ?protocols (cctxt : #Client_context.full) =
let open Lwt_syntax in
let* () = Layer1_event.starting ~name in
let+ heads, stopper =
connect ~name ~delay:reconnection_delay ~protocols cctxt
in
{
name;
cctxt = (cctxt :> Client_context.full);
heads;
stopper;
reconnection_delay;
protocols;
running = true;
}
let reconnect l1_ctxt =
let open Lwt_syntax in
let* heads, stopper =
connect
~name:l1_ctxt.name
~count:1
~delay:l1_ctxt.reconnection_delay
~protocols:l1_ctxt.protocols
l1_ctxt.cctxt
in
return {l1_ctxt with heads; stopper}
let shutdown state =
state.stopper () ;
state.running <- false ;
Lwt.return_unit
let regexp_ocaml_exception_connection_error = Re.Str.regexp ".*in connect:.*"
let is_connection_error trace =
TzTrace.fold
(fun yes error ->
yes
||
match error with
| RPC_client_errors.(Request_failed {error = Connection_failed _; _}) ->
true
| RPC_client_errors.(Request_failed {error = OCaml_exception s; _}) ->
Re.Str.string_match regexp_ocaml_exception_connection_error s 0
| _ -> false)
false
trace
let iter_heads l1_ctxt f =
let exception Iter_error of tztrace in
let rec loop l1_ctxt =
let open Lwt_result_syntax in
let*! () =
Lwt_stream.iter_s
(fun head ->
let open Lwt_syntax in
let* res = f head in
match res with
| Ok () -> return_unit
| Error trace when is_connection_error trace ->
Format.eprintf
"@[<v 2>Connection error:@ %a@]@."
pp_print_trace
trace ;
l1_ctxt.stopper () ;
return_unit
| Error e -> raise (Iter_error e))
l1_ctxt.heads
in
when_ l1_ctxt.running @@ fun () ->
let*! () = Layer1_event.connection_lost ~name:l1_ctxt.name in
let*! l1_ctxt = reconnect l1_ctxt in
loop l1_ctxt
in
Lwt.catch
(fun () -> Lwt.no_cancel @@ loop l1_ctxt)
(function Iter_error e -> Lwt.return_error e | exn -> fail_with_exn exn)
let wait_first l1_ctxt =
let rec loop l1_ctxt =
let open Lwt_syntax in
let* head = Lwt_stream.peek l1_ctxt.heads in
match head with
| Some head -> return head
| None ->
let* l1_ctxt = reconnect l1_ctxt in
loop l1_ctxt
in
Lwt.no_cancel @@ loop l1_ctxt
(** [predecessors_of_blocks hashes] given a list of successive block hashes,
from newest to oldest, returns an associative list that associates a hash to
its predecessor in this list. *)
let predecessors_of_blocks hashes =
let rec aux next = function [] -> [] | x :: xs -> (next, x) :: aux x xs in
match hashes with [] -> [] | x :: xs -> aux x xs
(** [get_predecessor block_hash] returns the predecessor block hash of
some [block_hash] through an RPC to the Tezos node. To limit the
number of RPCs, this information is requested for a batch of hashes
and cached locally. *)
let get_predecessor =
let max_cached = 65536 in
let hard_max_read = max_cached in
let module HM =
Aches.Vache.Map (Aches.Vache.FIFO_Precise) (Aches.Vache.Strong) (Block_hash)
in
let cache = HM.create max_cached in
fun ~max_read
cctxt
(chain : Tezos_shell_services.Chain_services.chain)
ancestor ->
let open Lwt_result_syntax in
let max_read = min max_read hard_max_read in
let max_read = max max_read 2 in
match HM.find_opt cache ancestor with
| Some pred -> return_some pred
| None -> (
let* blocks =
Tezos_shell_services.Chain_services.Blocks.list
cctxt
~chain
~heads:[ancestor]
~length:max_read
()
in
match blocks with
| [ancestors] -> (
List.iter
(fun (h, p) -> HM.replace cache h p)
(predecessors_of_blocks ancestors) ;
match HM.find_opt cache ancestor with
| None ->
return_none
| Some predecessor -> return_some predecessor)
| _ -> return_none)
let get_predecessor_opt ?(max_read = 8) state (hash, level) =
let open Lwt_result_syntax in
if level = 0l then return_none
else
let level = Int32.pred level in
let+ hash = get_predecessor ~max_read state.cctxt state.cctxt#chain hash in
Option.map (fun hash -> (hash, level)) hash
let get_predecessor ?max_read state ((hash, _) as head) =
let open Lwt_result_syntax in
let* pred = get_predecessor_opt ?max_read state head in
match pred with
| None -> tzfail (Cannot_find_predecessor hash)
| Some pred -> return pred
let nth_predecessor ~get_predecessor n block =
let open Lwt_result_syntax in
assert (n >= 0) ;
let rec aux acc n block =
if n = 0 then return (block, acc)
else
let* pred = get_predecessor block in
(aux [@tailcall]) (block :: acc) (n - 1) pred
in
aux [] n block
let get_tezos_reorg_for_new_head l1_state
?(get_old_predecessor = get_predecessor l1_state) old_head new_head =
let open Lwt_result_syntax in
let rec aux reorg old_head new_head =
let old_head_hash, _ = old_head in
let new_head_hash, _ = new_head in
if Block_hash.(old_head_hash = new_head_hash) then return reorg
else
let* old_head_pred = get_old_predecessor old_head in
let* new_head_pred = get_predecessor l1_state new_head in
let reorg =
Reorg.
{
old_chain = old_head :: reorg.old_chain;
new_chain = new_head :: reorg.new_chain;
}
in
(aux [@tailcall]) reorg old_head_pred new_head_pred
in
let _, old_head_level = old_head in
let _, new_head_level = new_head in
let distance = Int32.(to_int @@ abs @@ sub new_head_level old_head_level) in
let* old_head, new_head, reorg =
if old_head_level = new_head_level then
return (old_head, new_head, Reorg.no_reorg)
else if old_head_level < new_head_level then
let max_read = distance + 1 in
let+ new_head, new_chain =
nth_predecessor
~get_predecessor:(get_predecessor ~max_read l1_state)
distance
new_head
in
(old_head, new_head, {Reorg.no_reorg with new_chain})
else
let+ old_head, old_chain =
nth_predecessor ~get_predecessor:get_old_predecessor distance old_head
in
(old_head, new_head, {Reorg.no_reorg with old_chain})
in
assert (snd old_head = snd new_head) ;
aux reorg old_head new_head
(** Returns the reorganization of L1 blocks (if any) for [new_head]. *)
let get_tezos_reorg_for_new_head l1_state ?get_old_predecessor old_head new_head
=
let open Lwt_result_syntax in
match old_head with
| `Level l ->
let _, new_head_level = new_head in
if new_head_level < l then return Reorg.no_reorg
else
let distance = Int32.sub new_head_level l |> Int32.to_int in
let max_read = distance + 1 in
let* _block_at_l, new_chain =
nth_predecessor
~get_predecessor:(get_predecessor ~max_read l1_state)
distance
new_head
in
return Reorg.{old_chain = []; new_chain}
| `Head old_head ->
get_tezos_reorg_for_new_head
l1_state
?get_old_predecessor
old_head
new_head
module Internal_for_tests = struct
let dummy cctxt =
let heads, _push = Lwt_stream.create () in
{
name = "dummy_layer_1_for_tests";
reconnection_delay = 5.0;
heads;
cctxt = (cctxt :> Client_context.full);
stopper = Fun.id;
protocols = None;
running = false;
}
end