Source file layer1.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
(**
Errors
======
*)
type error += Cannot_find_block of Block_hash.t
let () =
register_error_kind
~id:"sc_rollup.node.cannot_find_block"
~title:"Cannot find block from L1"
~description:"A block couldn't be found from the L1 node"
~pp:(fun ppf hash ->
Format.fprintf
ppf
"Block with hash %a was not found on the L1 node."
Block_hash.pp
hash)
`Temporary
Data_encoding.(obj1 (req "hash" Block_hash.encoding))
(function Cannot_find_block hash -> Some hash | _ -> None)
(fun hash -> Cannot_find_block hash)
(**
State
=====
*)
let =
let open Data_encoding in
conv
(fun {hash; level = _; } -> (hash, header))
(fun (hash, ) -> {hash; level = header.level; header})
(merge_objs
(obj1 (req "hash" Block_hash.encoding))
Block_header.shell_header_encoding)
type head = {hash : Block_hash.t; level : int32}
let head_encoding =
let open Data_encoding in
conv
(fun {hash; level} -> (hash, level))
(fun (hash, level) -> {hash; level})
(obj2 (req "hash" Block_hash.encoding) (req "level" Data_encoding.int32))
let {hash; level; header = _} = {hash; level}
module Blocks_cache =
Aches_lwt.Lache.Make_result
(Aches.Rache.Transfer (Aches.Rache.LRU) (Block_hash))
type block = ..
type fetch_block_rpc =
Client_context.full ->
?metadata:[`Always | `Never] ->
?chain:Tezos_shell_services.Block_services.chain ->
?block:Tezos_shell_services.Block_services.block ->
unit ->
block tzresult Lwt.t
type blocks_cache = (block, tztrace) Blocks_cache.t
open Octez_crawler.Layer_1
type nonrec t = {
l1 : t;
cctxt : Client_context.full;
blocks_cache : blocks_cache;
(** Global blocks cache for the smart rollup node. *)
headers_cache : headers_cache;
(** Global block headers cache for the smart rollup node. *)
prefetch_blocks : int; (** Number of blocks to prefetch by default. *)
}
let raw_l1_connection {l1; _} = l1
let start ~name ~reconnection_delay ~l1_blocks_cache_size ?protocols
?(prefetch_blocks = l1_blocks_cache_size) cctxt =
let open Lwt_result_syntax in
let*? () =
if prefetch_blocks > l1_blocks_cache_size then
error_with
"Blocks to prefetch must be less than the cache size: %d"
l1_blocks_cache_size
else Ok ()
in
let*! l1 = start ~name ~reconnection_delay ?protocols cctxt in
let blocks_cache = Blocks_cache.create l1_blocks_cache_size in
let = Blocks_cache.create l1_blocks_cache_size in
let cctxt = (cctxt :> Client_context.full) in
return {l1; cctxt; blocks_cache; headers_cache; prefetch_blocks}
let create ~name ~reconnection_delay ~l1_blocks_cache_size ?protocols
?(prefetch_blocks = l1_blocks_cache_size) cctxt =
let open Result_syntax in
let* () =
if prefetch_blocks > l1_blocks_cache_size then
error_with
"Blocks to prefetch must be less than the cache size: %d"
l1_blocks_cache_size
else Ok ()
in
let l1 = create ~name ~reconnection_delay ?protocols cctxt in
let blocks_cache = Blocks_cache.create l1_blocks_cache_size in
let = Blocks_cache.create l1_blocks_cache_size in
let cctxt = (cctxt :> Client_context.full) in
return {l1; cctxt; blocks_cache; headers_cache; prefetch_blocks}
let shutdown {l1; _} = shutdown l1
let {; _} hash =
Blocks_cache.put headers_cache hash (Lwt.return_ok header)
let client_context {cctxt; _} = cctxt
let iter_heads ?name l1_ctxt f =
iter_heads ?name l1_ctxt.l1
@@ fun (hash, {shell = {level; _} as ; _}) ->
cache_shell_header l1_ctxt hash header ;
f {hash; level; header}
let wait_first l1_ctxt =
let open Lwt_syntax in
let+ hash, {shell = {level; _} as ; _} = wait_first l1_ctxt.l1 in
{hash; level; header}
let get_latest_head l1_ctxt =
Option.map
(fun (hash, {Tezos_base.Block_header.shell = {level; _} as ; _}) ->
{hash; level; header})
(get_latest_head l1_ctxt.l1)
let get_predecessor_opt ?max_read {l1; _} = get_predecessor_opt ?max_read l1
let get_predecessor ?max_read {l1; _} = get_predecessor ?max_read l1
let get_tezos_reorg_for_new_head {l1; _} ?get_old_predecessor old_head new_head
=
get_tezos_reorg_for_new_head l1 ?get_old_predecessor old_head new_head
module Internal_for_tests = struct
let dummy cctxt =
{
l1 = Internal_for_tests.dummy cctxt;
cctxt = (cctxt :> Client_context.full);
blocks_cache = Blocks_cache.create 1;
headers_cache = Blocks_cache.create 1;
prefetch_blocks = 0;
}
end
(**
Helpers
=======
*)
(** [fetch_tezos_block cctxt hash] returns a block shell header of
[hash]. Looks for the block in the blocks cache first, and fetches it from
the L1 node otherwise. *)
let {cctxt; ; _} hash =
trace (Cannot_find_block hash)
@@
let fetch hash =
Tezos_shell_services.Shell_services.Blocks.Header.shell_header
cctxt
~chain:`Main
~block:(`Hash (hash, 0))
()
in
Blocks_cache.bind_or_put headers_cache hash fetch Lwt.return
let fetch_block_no_cache (fetch : fetch_block_rpc)
({cctxt; blocks_cache; _} as l1_ctxt) hash =
let open Lwt_result_syntax in
trace (Cannot_find_block hash)
@@ let* block =
fetch cctxt ~chain:`Main ~block:(`Hash (hash, 0)) ~metadata:`Always ()
in
Blocks_cache.put blocks_cache hash (Lwt.return_ok block) ;
cache_shell_header l1_ctxt hash (extract_header block) ;
return block
(** [fetch_tezos_block cctxt fetch extract_header hash] returns a block info
given a block hash. Looks for the block in the blocks cache first, and
fetches it from the L1 node otherwise. *)
let fetch_tezos_block (fetch_rpc : fetch_block_rpc)
({cctxt; blocks_cache; _} as l1_ctxt) hash =
trace (Cannot_find_block hash)
@@
let open Lwt_result_syntax in
let fetch hash =
let* block =
fetch_rpc cctxt ~chain:`Main ~block:(`Hash (hash, 0)) ~metadata:`Always ()
in
cache_shell_header l1_ctxt hash (extract_header block) ;
return block
in
let*! block = Blocks_cache.bind_or_put blocks_cache hash fetch Lwt.return in
let is_of_expected_protocol =
match block with
| Error
(Tezos_rpc_http.RPC_client_errors.(
Request_failed {error = Unexpected_content _; _})
:: _) ->
false
| Error _ ->
true
| Ok block -> (
try
let (_ : Block_header.shell_header) = extract_header block in
true
with _ ->
false)
in
if is_of_expected_protocol then Lwt.return block
else
fetch_block_no_cache fetch_rpc extract_header l1_ctxt hash
let make_prefetching_schedule {prefetch_blocks; _} blocks =
let blocks_with_prefetching, _, first_prefetch =
List.fold_left
(fun (acc, nb_prefetch, prefetch) b ->
let nb_prefetch = nb_prefetch + 1 in
let prefetch = b :: prefetch in
if nb_prefetch >= prefetch_blocks then ((b, prefetch) :: acc, 0, [])
else ((b, []) :: acc, nb_prefetch, prefetch))
([], 0, [])
(List.rev blocks)
in
match (blocks_with_prefetching, first_prefetch) with
| [], _ | _, [] -> blocks_with_prefetching
| (first, _) :: rest, _ -> (first, first_prefetch) :: rest
let prefetch_tezos_blocks fetch l1_ctxt = function
| [] -> ()
| blocks ->
Lwt.async @@ fun () ->
List.iter_p
(fun {hash; _} ->
let open Lwt_syntax in
let+ _maybe_block =
fetch_tezos_block fetch extract_header l1_ctxt hash
in
())
blocks