Source file prevalidator.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
(** Minimal delay between two mempool advertisements *)
let advertisement_delay = 0.1
(** Argument that will be provided to {!Worker.MakeGroup} to create
the prevalidator worker. *)
module Name = struct
type t = Chain_id.t * Protocol_hash.t
let encoding = Data_encoding.tup2 Chain_id.encoding Protocol_hash.encoding
let base = ["prevalidator"]
let pp fmt (chain_id, proto_hash) =
Format.fprintf
fmt
"%a:%a"
Chain_id.pp_short
chain_id
Protocol_hash.pp_short
proto_hash
let equal (c1, p1) (c2, p2) =
Chain_id.equal c1 c2 && Protocol_hash.equal p1 p2
end
open Prevalidator_worker_state
(** A prevalidator instance, tailored to a specific protocol (even if
it is not visible in this module type). *)
module type T = sig
type types_state
val get_rpc_directory :
types_state -> types_state Tezos_rpc.Directory.t lazy_t
val name : Name.t
module Types : Worker_intf.TYPES with type state = types_state
module Worker :
Worker.T
with type ('a, 'b) Request.t = ('a, 'b) Request.t
and type Request.view = Request.view
and type Types.state = types_state
type worker = Worker.infinite Worker.queue Worker.t
val worker : worker Lazy.t
end
open Shell_operation
module Events = Prevalidator_events
module Classification = Prevalidator_classification
(** This module encapsulates pending operations to maintain them in two
different data structure and avoid coslty repetitive convertions when
handling batches in [classify_pending_operations]. *)
module Pending_ops = Prevalidator_pending_operations
(** Module encapsulating some types that are used both in production
and in tests. Having them in a module makes it possible to
[include] this module in {!Internal_for_tests} below and avoid
code duplication.
The raison d'etre of these records of functions is to be able to use
alternative implementations of all functions in tests.
The purpose of the {!Tools.tools} record is to abstract away from {!Store.chain_store}.
Under the hood [Store.chain_store] requires an Irmin store on disk,
which makes it impractical for fast testing: every test would need
to create a temporary folder on disk which doesn't scale well.
The purpose of the {!Tools.worker_tools} record is to abstract away
from the {!Worker} implementation. This implementation is overkill
for testing: we don't need asynchronicity and concurrency in our
pretty basic existing tests. Having this abstraction allows to get
away with a much simpler state machine model of execution and
to have simpler test setup. *)
module Tools = struct
(** Functions provided by {!Distributed_db} and {!Store.chain_store}
that are used in various places of the mempool. Gathered here so that we can test
the mempool without requiring a full-fledged [Distributed_db]/[Store.Chain_store]. *)
type tools = {
advertise_current_head : mempool:Mempool.t -> Store.Block.t -> unit;
(** [advertise_current_head mempool head] sends a
[Current_head (chain_id, head_header, mempool)] message to all known
active peers for the chain being considered. *)
chain_tools : Store.Block.t Classification.chain_tools;
(** Lower-level tools provided by {!Prevalidator_classification} *)
fetch :
?peer:P2p_peer.Id.t ->
?timeout:Time.System.Span.t ->
Operation_hash.t ->
Operation.t tzresult Lwt.t;
(** [fetch ?peer ?timeout oph] returns the value when it is known.
It can fail with [Requester.Timeout] if [timeout] is provided and the value
isn't known before the timeout expires. It can fail with [Requester.Cancel] if
the request is canceled. *)
read_block : Block_hash.t -> Store.Block.t tzresult Lwt.t;
(** [read_block bh] tries to read the block [bh] from the chain store. *)
send_get_current_head : ?peer:P2p_peer_id.t -> unit -> unit;
(** [send_get_current_head ?peer ()] sends a [Get_Current_head]
to a given peer, or to all known active peers for the chain considered.
Expected answer is a [Get_current_head] message *)
set_mempool : head:Block_hash.t -> Mempool.t -> unit tzresult Lwt.t;
(** [set_mempool ~head mempool] sets the [mempool] of
the [chain_store] of the chain considered. Does nothing if [head] differs
from current_head which might happen when a new head concurrently arrives just
before this operation is being called. *)
}
(** Abstraction over services implemented in production by {!Worker}
but implemented differently in tests.
Also see the enclosing module documentation as to why we have this record. *)
type worker_tools = {
push_request :
(unit, Empty.t) Prevalidator_worker_state.Request.t -> bool Lwt.t;
(** Adds a message to the queue. *)
push_request_now :
(unit, Empty.t) Prevalidator_worker_state.Request.t -> unit;
(** Adds a message to the queue immediately. *)
}
end
type 'prevalidation_t parameters = {
limits : Shell_limits.prevalidator_limits;
tools : Tools.tools;
flush :
head:Store.Block.t ->
timestamp:Time.Protocol.t ->
'prevalidation_t ->
'prevalidation_t tzresult Lwt.t;
(** Create a new empty prevalidation state, recycling some elements
of the provided previous prevalidation state. *)
}
(** The type needed for the implementation of [Make] below, but
* which is independent from the protocol. *)
type ('protocol_data, 'a) types_state_shell = {
classification : 'protocol_data Classification.t;
parameters : 'a parameters;
mutable predecessor : Store.Block.t;
mutable timestamp : Time.System.t;
mutable live_blocks : Block_hash.Set.t;
mutable live_operations : Operation_hash.Set.t;
mutable fetching : Operation_hash.Set.t;
(** An operation is in [fetching] while the ddb is actively
requesting it from peers. It is removed from it when the
operation arrives or if the request fails (e.g. timeout). *)
mutable pending : 'protocol_data Pending_ops.t;
mutable mempool : Mempool.t;
mutable advertisement : [`Pending of Mempool.t | `None];
mutable banned_operations : Operation_hash.Set.t;
worker : Tools.worker_tools;
}
let metrics = Shell_metrics.Mempool.init ["mempool"]
(** The concrete production instance of {!block_tools} *)
let block_tools : Store.Block.t Classification.block_tools =
{
bhash = Store.Block.hash;
operations = Store.Block.operations;
all_operation_hashes = Store.Block.all_operation_hashes;
}
(** How to create an instance of {!chain_tools} from a {!Distributed_db.chain_db}. *)
let mk_chain_tools (chain_db : Distributed_db.chain_db) :
Store.Block.t Classification.chain_tools =
let open Lwt_syntax in
let new_blocks ~from_block ~to_block =
let chain_store = Distributed_db.chain_store chain_db in
Store.Chain_traversal.new_blocks chain_store ~from_block ~to_block
in
let read_predecessor_opt block =
let chain_store = Distributed_db.chain_store chain_db in
Store.Block.read_predecessor_opt chain_store block
in
let inject_operation oph op =
let* _ = Distributed_db.inject_operation chain_db oph op in
Lwt.return_unit
in
{
clear_or_cancel = Distributed_db.Operation.clear_or_cancel chain_db;
inject_operation;
new_blocks;
read_predecessor_opt;
}
(** Module type used both in production and in tests. *)
module type S = sig
(** Type instantiated by {!Prevalidation.T.config}. *)
type config
(** Similar to the type [operation] from the protocol,
see {!Tezos_protocol_environment.PROTOCOL} *)
type protocol_operation
(** Type instantiated by {!Prevalidation.t} *)
type prevalidation_t
type types_state = {
shell : (protocol_operation, prevalidation_t) types_state_shell;
mutable validation_state : prevalidation_t;
(** Internal prevalidation state. Among others, this contains the
internal states of the protocol mempool and of the plugin. *)
mutable operation_stream :
(Classification.classification * protocol_operation operation)
Lwt_watcher.input;
mutable rpc_directory : types_state Tezos_rpc.Directory.t lazy_t;
mutable config : config;
lock : Lwt_mutex.t;
}
(** This function fetches an operation if it is not already handled
as defined by [already_handled] below. The implementation makes
sure to fetch an operation at most once, modulo operations
lost because of bounded buffers becoming full.
This function is an intruder to this module type. It just happens
that it is needed both by internals of the implementation of {!S}
and by the internals of the implementation of {!T}; so it needs
to be exposed here. *)
val may_fetch_operation :
(protocol_operation, prevalidation_t) types_state_shell ->
P2p_peer_id.t option ->
Operation_hash.t ->
unit
(** The function called after every call to a function of {!API}. *)
val handle_unprocessed : types_state -> unit Lwt.t
(** The inner API of the mempool i.e. functions called by the worker
when an individual request arrives. These functions are the
most high-level ones that we test. All these [on_*] functions
correspond to a single event. Possible
sequences of calls to this API are always of the form:
on_*; handle_unprocessed; on_*; handle_unprocessed; ... *)
module Requests : sig
val on_advertise : _ types_state_shell -> unit
val on_arrived :
types_state ->
Operation_hash.t ->
Operation.t ->
(unit, Empty.t) result Lwt.t
val on_ban : types_state -> Operation_hash.t -> unit tzresult Lwt.t
val on_flush :
handle_branch_refused:bool ->
types_state ->
Store.Block.t ->
Block_hash.Set.t ->
Operation_hash.Set.t ->
unit tzresult Lwt.t
val on_inject :
types_state -> force:bool -> Operation.t -> unit tzresult Lwt.t
val on_notify : _ types_state_shell -> P2p_peer_id.t -> Mempool.t -> unit
end
end
(** A functor for obtaining the testable part of this file (see
the instantiation of this functor in {!Internal_for_tests} at the
end of this file). Contrary to the production-only functor {!Make} below,
this functor doesn't assume a specific chain store implementation,
which is the crux for having it easily unit-testable. *)
module Make_s
(Proto : Protocol_plugin.T)
(Prevalidation_t : Prevalidation.T
with type protocol_operation = Proto.operation) :
S
with type config = Prevalidation_t.config
and type protocol_operation = Proto.operation
and type prevalidation_t = Prevalidation_t.t = struct
type config = Prevalidation_t.config
type protocol_operation = Proto.operation
type prevalidation_t = Prevalidation_t.t
type types_state = {
shell : (protocol_operation, prevalidation_t) types_state_shell;
mutable validation_state : prevalidation_t;
mutable operation_stream :
(Classification.classification * protocol_operation operation)
Lwt_watcher.input;
mutable rpc_directory : types_state Tezos_rpc.Directory.t lazy_t;
mutable config : config;
lock : Lwt_mutex.t;
}
let already_handled ~origin shell oph =
if Operation_hash.Set.mem oph shell.banned_operations then (
ignore
(Unit.catch_s (fun () ->
Events.(emit ban_operation_encountered) (origin, oph))) ;
true)
else
Classification.is_in_mempool oph shell.classification <> None
|| Operation_hash.Set.mem oph shell.live_operations
|| Pending_ops.mem oph shell.pending
|| Classification.is_known_unparsable oph shell.classification
let advertise (shell : ('operation_data, _) types_state_shell) mempool =
let open Lwt_syntax in
match shell.advertisement with
| `Pending {Mempool.known_valid; pending} ->
shell.advertisement <-
`Pending
{
known_valid =
Operation_hash.Set.union known_valid mempool.Mempool.known_valid;
pending = Operation_hash.Set.union pending mempool.pending;
}
| `None ->
shell.advertisement <- `Pending mempool ;
Lwt.dont_wait
(fun () ->
let* () = Lwt_unix.sleep advertisement_delay in
shell.worker.push_request_now Advertise ;
Lwt.return_unit)
(fun exc ->
Format.eprintf "Uncaught exception: %s\n%!" (Printexc.to_string exc))
let handle_classification
~(notifier :
Classification.classification -> protocol_operation operation -> unit)
shell (op, kind) =
Classification.add kind op shell.classification ;
notifier kind op
let mk_notifier operation_stream classification op =
Lwt_watcher.notify operation_stream (classification, op)
type pre_filter_result = Drop | Priority of Pending_ops.priority
let pre_filter pv ~notifier parsed_op : pre_filter_result Lwt.t =
let open Lwt_syntax in
let+ v =
Prevalidation_t.pre_filter pv.validation_state pv.config parsed_op
in
match v with
| (`Branch_delayed _ | `Branch_refused _ | `Refused _ | `Outdated _) as errs
->
handle_classification ~notifier pv.shell (parsed_op, errs) ;
Drop
| `Passed_prefilter priority -> Priority priority
let set_mempool shell mempool =
shell.mempool <- mempool ;
shell.parameters.tools.set_mempool
~head:(Store.Block.hash shell.predecessor)
shell.mempool
let remove_from_advertisement oph = function
| `Pending mempool -> `Pending (Mempool.remove oph mempool)
| `None -> `None
let reclassify_replaced_manager_op old_hash shell
(replacement_classification : [< Classification.error_classification]) =
shell.advertisement <-
remove_from_advertisement old_hash shell.advertisement ;
match Classification.remove old_hash shell.classification with
| Some (op, _class) ->
Some (op, (replacement_classification :> Classification.classification))
| None ->
shell.parameters.tools.chain_tools.clear_or_cancel old_hash ;
None
let classify_operation shell ~config ~validation_state
(status_and_priority : Pending_ops.status_and_priority) op :
(prevalidation_t
* (Operation_hash.t * bool) option
* (protocol_operation operation * Classification.classification) trace)
Lwt.t =
let open Lwt_syntax in
let* v_state, op, classification, replacements =
Prevalidation_t.add_operation validation_state config op
in
let to_replace =
List.filter_map
(fun (replaced_oph, new_classification) ->
reclassify_replaced_manager_op replaced_oph shell new_classification)
replacements
in
let to_handle = (op, classification) :: to_replace in
let validated_operation =
match classification with
| `Validated ->
let is_advertisable =
match
(status_and_priority.status, status_and_priority.priority)
with
| Fresh, _ | Reclassified, High -> true
| Reclassified, Medium | Reclassified, Low _ ->
false
in
Some (op.hash, is_advertisable)
| `Branch_refused _ | `Branch_delayed _ | `Refused _ | `Outdated _ -> None
in
return (v_state, validated_operation, to_handle)
let classify_pending_operations ~notifier shell config state =
let open Lwt_syntax in
let* r =
Pending_ops.fold_es
(fun status_and_priority
oph
op
( acc_validation_state,
advertisable_mempool,
validated_mempool,
limit ) ->
if limit <= 0 then
Lwt.return_error
(acc_validation_state, advertisable_mempool, validated_mempool)
else (
shell.pending <- Pending_ops.remove oph shell.pending ;
let* new_validation_state, validated_operation, to_handle =
classify_operation
shell
~config
~validation_state:acc_validation_state
status_and_priority
op
in
let+ () = Events.(emit operation_reclassified) oph in
List.iter (handle_classification ~notifier shell) to_handle ;
let advertisable_mempool, validated_mempool =
match validated_operation with
| None -> (advertisable_mempool, validated_mempool)
| Some (oph, true) ->
( Mempool.cons_valid oph advertisable_mempool,
Mempool.cons_valid oph validated_mempool )
| Some (oph, false) ->
( advertisable_mempool,
Mempool.cons_valid oph validated_mempool )
in
Ok
( new_validation_state,
advertisable_mempool,
validated_mempool,
limit - 1 )))
shell.pending
( state,
Mempool.empty,
Mempool.empty,
shell.parameters.limits.operations_batch_size )
in
match r with
| Error (state, advertisable_mempool, validated_mempool) ->
let* (_was_pushed : bool) =
shell.worker.push_request Request.Leftover
in
Lwt.return (state, advertisable_mempool, validated_mempool)
| Ok (state, advertisable_mempool, validated_mempool, _) ->
Lwt.return (state, advertisable_mempool, validated_mempool)
let update_advertised_mempool_fields pv_shell advertisable_mempool
validated_mempool =
let open Lwt_syntax in
if not (Mempool.is_empty advertisable_mempool) then
advertise pv_shell advertisable_mempool ;
if Mempool.is_empty validated_mempool then Lwt.return_unit
else
let our_mempool =
let known_valid =
Operation_hash.Set.union
validated_mempool.known_valid
pv_shell.mempool.known_valid
in
{Mempool.known_valid; pending = Pending_ops.hashes pv_shell.pending}
in
let* _res = set_mempool pv_shell our_mempool in
Lwt.pause ()
let handle_unprocessed pv =
let open Lwt_syntax in
let notifier = mk_notifier pv.operation_stream in
if Pending_ops.is_empty pv.shell.pending then Lwt.return_unit
else
let* () = Events.(emit processing_operations) () in
let* validation_state, advertisable_mempool, validated_mempool =
classify_pending_operations
~notifier
pv.shell
pv.config
pv.validation_state
in
pv.validation_state <- validation_state ;
update_advertised_mempool_fields
pv.shell
advertisable_mempool
validated_mempool
let fetch_operation ~notify_arrival
(shell : ('operation_data, _) types_state_shell) ?peer oph =
let open Lwt_syntax in
let* () = Events.(emit fetching_operation) oph in
let* r =
protect @@ fun () ->
shell.parameters.tools.fetch
~timeout:shell.parameters.limits.operation_timeout
?peer
oph
in
match r with
| Ok op ->
if notify_arrival then shell.worker.push_request_now (Arrived (oph, op)) ;
Lwt.return_unit
| Error err -> (
if notify_arrival then
shell.fetching <- Operation_hash.Set.remove oph shell.fetching ;
match err with
| Distributed_db.Operation.Canceled _ :: _ ->
Events.(emit operation_included) oph
| _ ->
Events.(emit operation_not_fetched) oph)
let may_fetch_operation (shell : ('operation_data, _) types_state_shell) peer
oph =
let origin =
match peer with Some peer -> Events.Peer peer | None -> Leftover
in
let spawn_fetch_operation ~notify_arrival =
ignore
(Unit.catch_s (fun () ->
fetch_operation ~notify_arrival shell ?peer oph))
in
if Operation_hash.Set.mem oph shell.fetching then
spawn_fetch_operation ~notify_arrival:false
else if not (already_handled ~origin shell oph) then (
shell.fetching <- Operation_hash.Set.add oph shell.fetching ;
spawn_fetch_operation ~notify_arrival:true)
(** Module containing functions that are the internal transitions
of the mempool. These functions are called by the {!Worker} when
an event arrives. *)
module Requests = struct
module Parser = MakeParser (Proto)
let on_arrived (pv : types_state) oph op : (unit, Empty.t) result Lwt.t =
let open Lwt_syntax in
pv.shell.fetching <- Operation_hash.Set.remove oph pv.shell.fetching ;
if already_handled ~origin:Events.Arrived pv.shell oph then return_ok_unit
else
match Parser.parse oph op with
| Error _ ->
let* () = Events.(emit unparsable_operation) oph in
Prevalidator_classification.add_unparsable
oph
pv.shell.classification ;
return_ok_unit
| Ok parsed_op -> (
let* v =
pre_filter
pv
~notifier:(mk_notifier pv.operation_stream)
parsed_op
in
match v with
| Drop -> return_ok_unit
| Priority ((High | Medium | Low _) as priority) ->
if
not
(Block_hash.Set.mem
op.Operation.shell.branch
pv.shell.live_blocks)
then (
pv.shell.parameters.tools.chain_tools.clear_or_cancel oph ;
return_ok_unit)
else (
pv.shell.pending <-
Pending_ops.(
add parsed_op {status = Fresh; priority} pv.shell.pending) ;
return_ok_unit))
let on_inject (pv : types_state) ~force op =
let open Lwt_result_syntax in
let oph = Operation.hash op in
let status_and_priority = Pending_ops.{status = Fresh; priority = High} in
if already_handled ~origin:Events.Injected pv.shell oph then
return_unit
else
match Parser.parse oph op with
| Error err ->
failwith
"Invalid operation %a: %a."
Operation_hash.pp
oph
Error_monad.pp_print_trace
err
| Ok parsed_op -> (
if force then (
let*! () =
pv.shell.parameters.tools.chain_tools.inject_operation oph op
in
pv.shell.pending <-
Pending_ops.add parsed_op status_and_priority pv.shell.pending ;
let*! () = Events.(emit operation_injected) oph in
return_unit)
else if
not
(Block_hash.Set.mem
op.Operation.shell.branch
pv.shell.live_blocks)
then
failwith
"Operation %a is branched on either:\n\
\ - a block %a which is too old (%d blocks in the past)\n\
\ - a predecessor block from an alternative branch which is \
now unknown"
Operation_hash.pp
oph
Block_hash.pp
op.Operation.shell.branch
(Block_hash.Set.cardinal pv.shell.live_blocks)
else
let notifier = mk_notifier pv.operation_stream in
let*! validation_state, validated_operation, to_handle =
classify_operation
pv.shell
~config:pv.config
~validation_state:pv.validation_state
status_and_priority
parsed_op
in
let op_status =
List.find_opt
(function
| ({hash; _} : protocol_operation operation), _ ->
Operation_hash.equal hash oph)
to_handle
in
match op_status with
| Some (_h, `Validated) ->
let*! () =
pv.shell.parameters.tools.chain_tools.inject_operation
oph
op
in
List.iter (handle_classification ~notifier pv.shell) to_handle ;
pv.validation_state <- validation_state ;
let*! () =
match validated_operation with
| None -> Lwt.return_unit
| Some (oph, is_advertisable) ->
update_advertised_mempool_fields
pv.shell
(if is_advertisable then
Mempool.cons_valid oph Mempool.empty
else Mempool.empty)
(Mempool.cons_valid oph Mempool.empty)
in
let*! () = Events.(emit operation_injected) oph in
return_unit
| Some
( _h,
( `Branch_delayed e
| `Branch_refused e
| `Refused e
| `Outdated e ) ) ->
Lwt.return
@@ error_with
"Error while validating injected operation %a:@ %a"
Operation_hash.pp
oph
pp_print_trace
e
| None ->
failwith
"Unexpected error while injecting operation %a. Operation \
not found after classifying it."
Operation_hash.pp
oph)
let on_notify (shell : ('operation_data, _) types_state_shell) peer mempool
=
let may_fetch_operation = may_fetch_operation shell (Some peer) in
let () =
Operation_hash.Set.iter may_fetch_operation mempool.Mempool.known_valid
in
Seq.iter
may_fetch_operation
(Operation_hash.Set.to_seq mempool.Mempool.pending)
let on_flush ~handle_branch_refused pv new_predecessor new_live_blocks
new_live_operations =
let open Lwt_result_syntax in
let old_predecessor = pv.shell.predecessor in
pv.shell.predecessor <- new_predecessor ;
pv.shell.live_blocks <- new_live_blocks ;
pv.shell.live_operations <- new_live_operations ;
Lwt_watcher.shutdown_input pv.operation_stream ;
pv.operation_stream <- Lwt_watcher.create_input () ;
let timestamp_system = Tezos_base.Time.System.now () in
pv.shell.timestamp <- timestamp_system ;
let timestamp = Time.System.to_protocol timestamp_system in
let* validation_state =
pv.shell.parameters.flush
~head:new_predecessor
~timestamp
pv.validation_state
in
pv.validation_state <- validation_state ;
let*! new_pending_operations =
Classification.recycle_operations
~from_branch:old_predecessor
~to_branch:new_predecessor
~live_blocks:new_live_blocks
~parse:(fun oph op -> Result.to_option (Parser.parse oph op))
~classes:pv.shell.classification
~pending:(Pending_ops.operations pv.shell.pending)
~block_store:block_tools
~chain:pv.shell.parameters.tools.chain_tools
~handle_branch_refused
in
let*! new_pending_operations, nb_pending =
Operation_hash.Map.fold_s
(fun oph op (pending, nb_pending) ->
let*! v =
pre_filter pv ~notifier:(mk_notifier pv.operation_stream) op
in
match v with
| Drop -> Lwt.return (pending, nb_pending)
| Priority ((High | Medium | Low _) as priority) ->
let status =
if Pending_ops.mem oph pv.shell.pending then Pending_ops.Fresh
else Reclassified
in
Lwt.return
(Pending_ops.add op {status; priority} pending, nb_pending + 1))
new_pending_operations
(Pending_ops.empty, 0)
in
let*! () = Events.(emit operations_to_reclassify) nb_pending in
pv.shell.pending <- new_pending_operations ;
set_mempool pv.shell Mempool.empty
let on_advertise (shell : ('protocol_data, _) types_state_shell) =
match shell.advertisement with
| `None ->
()
| `Pending mempool ->
shell.advertisement <- `None ;
if not (Mempool.is_empty mempool) then
shell.parameters.tools.advertise_current_head
~mempool
shell.predecessor
let remove ~flush_if_validated pv oph =
let open Lwt_result_syntax in
pv.shell.parameters.tools.chain_tools.clear_or_cancel oph ;
pv.shell.advertisement <-
remove_from_advertisement oph pv.shell.advertisement ;
pv.shell.banned_operations <-
Operation_hash.Set.add oph pv.shell.banned_operations ;
match Classification.remove oph pv.shell.classification with
| None ->
pv.shell.pending <- Pending_ops.remove oph pv.shell.pending ;
pv.shell.fetching <- Operation_hash.Set.remove oph pv.shell.fetching ;
return_unit
| Some (_op, classification) -> (
match (classification, flush_if_validated) with
| `Validated, true ->
let+ () =
on_flush
~handle_branch_refused:false
pv
pv.shell.predecessor
pv.shell.live_blocks
pv.shell.live_operations
in
pv.shell.pending <- Pending_ops.remove oph pv.shell.pending
| `Branch_delayed _, _
| `Branch_refused _, _
| `Refused _, _
| `Outdated _, _
| `Validated, false ->
pv.validation_state <-
Prevalidation_t.remove_operation pv.validation_state oph ;
return_unit)
let on_ban pv oph_to_ban =
let open Lwt_result_syntax in
pv.shell.banned_operations <-
Operation_hash.Set.add oph_to_ban pv.shell.banned_operations ;
let* res = remove ~flush_if_validated:true pv oph_to_ban in
let*! () = Events.(emit operation_banned) oph_to_ban in
return res
end
end
module type ARG = sig
val limits : Shell_limits.prevalidator_limits
val chain_db : Distributed_db.chain_db
val chain_id : Chain_id.t
val tools : Tools.tools
end
module WorkerGroup = Worker.MakeGroup (Name) (Prevalidator_worker_state.Request)
(** The functor that is not tested, in other words used only in production.
This functor's code is not tested (contrary to functor {!Make_s} above),
because it hardcodes a dependency to [Store.chain_store] in its instantiation
of type [chain_store]. This is what makes the code of this functor
not testable for the moment, because [Store.chain_store] has poor
testing capabilities.
Note that, because this functor [include]s {!Make_s}, it is a
strict extension of [Make_s]. *)
module Make
(Proto : Protocol_plugin.T)
(Arg : ARG)
(Prevalidation_t : Prevalidation.T
with type protocol_operation = Proto.operation
and type chain_store = Store.chain_store) : T = struct
module S = Make_s (Proto) (Prevalidation_t)
open S
type types_state = S.types_state
let get_rpc_directory pv = pv.rpc_directory
let name = (Arg.chain_id, Proto.hash)
module Types = struct
type state = types_state
type parameters = Shell_limits.prevalidator_limits * Distributed_db.chain_db
end
module Worker :
Worker.T
with type Name.t = Name.t
and type ('a, 'b) Request.t = ('a, 'b) Request.t
and type Request.view = Request.view
and type Types.state = Types.state
and type Types.parameters = Types.parameters =
WorkerGroup.MakeWorker (Types)
open Types
type worker = Worker.infinite Worker.queue Worker.t
(** Return a json describing the prevalidator's [config].
The boolean [include_default] ([true] by default) indicates
whether the json should include the fields which have a value
equal to their default value. *)
let get_config_json ?(include_default = true) pv =
let include_default_fields = if include_default then `Always else `Never in
Data_encoding.Json.construct
~include_default_fields
Prevalidation_t.config_encoding
pv.config
let filter_validation_passes allowed_validation_passes
(op : protocol_operation) =
match allowed_validation_passes with
| [] -> true
| validation_passes -> (
match Proto.acceptable_pass op with
| None -> false
| Some validation_pass ->
List.mem ~equal:Compare.Int.equal validation_pass validation_passes)
let build_rpc_directory w =
lazy
(let open Lwt_result_syntax in
let dir : state Tezos_rpc.Directory.t ref =
ref Tezos_rpc.Directory.empty
in
let module Proto_services = Block_services.Make (Proto) (Proto) in
dir :=
Tezos_rpc.Directory.register
!dir
(Proto_services.S.Mempool.get_filter Tezos_rpc.Path.open_root)
(fun pv params () ->
return (get_config_json ~include_default:params#include_default pv)) ;
dir :=
Tezos_rpc.Directory.register
!dir
(Proto_services.S.Mempool.set_filter Tezos_rpc.Path.open_root)
(fun pv () obj ->
let open Lwt_syntax in
let* () =
try
let config =
Data_encoding.Json.destruct
Prevalidation_t.config_encoding
obj
in
pv.config <- config ;
Lwt.return_unit
with _ -> Events.(emit invalid_mempool_filter_configuration) ()
in
return_ok (get_config_json pv)) ;
dir :=
Tezos_rpc.Directory.register
!dir
(Proto_services.S.Mempool.ban_operation Tezos_rpc.Path.open_root)
(fun _pv () oph ->
let open Lwt_result_syntax in
let*! r = Worker.Queue.push_request_and_wait w (Request.Ban oph) in
match r with
| Error (Closed None) -> fail [Worker_types.Terminated]
| Error (Closed (Some errs)) -> fail errs
| Error (Request_error err) -> fail err
| Error (Any exn) -> fail [Exn exn]
| Ok () -> return_unit) ;
dir :=
Tezos_rpc.Directory.register
!dir
(Proto_services.S.Mempool.unban_operation Tezos_rpc.Path.open_root)
(fun pv () oph ->
pv.shell.banned_operations <-
Operation_hash.Set.remove oph pv.shell.banned_operations ;
return_unit) ;
dir :=
Tezos_rpc.Directory.register
!dir
(Proto_services.S.Mempool.unban_all_operations
Tezos_rpc.Path.open_root)
(fun pv () () ->
pv.shell.banned_operations <- Operation_hash.Set.empty ;
return_unit) ;
dir :=
Tezos_rpc.Directory.gen_register
!dir
(Proto_services.S.Mempool.pending_operations Tezos_rpc.Path.open_root)
(fun pv params () ->
let validated =
if params#validated then
Classification.Sized_map.to_map
pv.shell.classification.validated
|> Operation_hash.Map.to_seq
|> Seq.filter_map (fun (oph, op) ->
if
filter_validation_passes
params#validation_passes
op.protocol
then Some (oph, op.protocol)
else None)
|> List.of_seq
else []
in
let process_map map =
let open Operation_hash in
Map.filter_map
(fun _oph (op, error) ->
if
filter_validation_passes
params#validation_passes
op.protocol
then Some (op.protocol, error)
else None)
map
in
let refused =
if params#refused then
process_map (Classification.map pv.shell.classification.refused)
else Operation_hash.Map.empty
in
let outdated =
if params#outdated then
process_map
(Classification.map pv.shell.classification.outdated)
else Operation_hash.Map.empty
in
let branch_refused =
if params#branch_refused then
process_map
(Classification.map pv.shell.classification.branch_refused)
else Operation_hash.Map.empty
in
let branch_delayed =
if params#branch_delayed then
process_map
(Classification.map pv.shell.classification.branch_delayed)
else Operation_hash.Map.empty
in
let unprocessed =
Operation_hash.Map.filter_map
(fun _ {protocol; _} ->
if filter_validation_passes params#validation_passes protocol
then Some protocol
else None)
(Pending_ops.operations pv.shell.pending)
in
let pending_operations =
{
Proto_services.Mempool.validated;
refused;
outdated;
branch_refused;
branch_delayed;
unprocessed;
}
in
Tezos_rpc.Answer.return (params#version, pending_operations)) ;
dir :=
Tezos_rpc.Directory.register
!dir
(Proto_services.S.Mempool.request_operations Tezos_rpc.Path.open_root)
(fun pv t () ->
pv.shell.parameters.tools.send_get_current_head ?peer:t#peer_id () ;
return_unit) ;
dir :=
Tezos_rpc.Directory.gen_register
!dir
(Proto_services.S.Mempool.monitor_operations Tezos_rpc.Path.open_root)
(fun pv params () ->
Lwt_mutex.with_lock pv.lock @@ fun () ->
let op_stream, stopper =
Lwt_watcher.create_stream pv.operation_stream
in
let validated_seq =
if params#validated then
Classification.Sized_map.to_map
pv.shell.classification.validated
|> Operation_hash.Map.to_seq
|> Seq.map (fun (hash, {protocol; _}) ->
((hash, protocol), None))
else Seq.empty
in
let process_error_map map =
let open Operation_hash in
map |> Map.to_seq
|> Seq.map (fun (hash, (op, error)) ->
((hash, op.protocol), Some error))
in
let refused_seq =
if params#refused then
process_error_map
(Classification.map pv.shell.classification.refused)
else Seq.empty
in
let branch_refused_seq =
if params#branch_refused then
process_error_map
(Classification.map pv.shell.classification.branch_refused)
else Seq.empty
in
let branch_delayed_seq =
if params#branch_delayed then
process_error_map
(Classification.map pv.shell.classification.branch_delayed)
else Seq.empty
in
let outdated_seq =
if params#outdated then
process_error_map
(Classification.map pv.shell.classification.outdated)
else Seq.empty
in
let filter ((_, op), _) =
filter_validation_passes params#validation_passes op
in
let current_mempool =
Seq.append outdated_seq branch_delayed_seq
|> Seq.append branch_refused_seq
|> Seq.append refused_seq |> Seq.append validated_seq
|> Seq.filter filter |> List.of_seq
in
let current_mempool = ref (Some current_mempool) in
let filter_result = function
| `Validated -> params#validated
| `Refused _ -> params#refused
| `Outdated _ -> params#outdated
| `Branch_refused _ -> params#branch_refused
| `Branch_delayed _ -> params#branch_delayed
in
let rec next () =
let open Lwt_syntax in
match !current_mempool with
| Some mempool ->
current_mempool := None ;
Lwt.return_some (params#version, mempool)
| None -> (
let* o = Lwt_stream.get op_stream in
match o with
| Some (kind, op)
when filter_result kind
&& filter_validation_passes
params#validation_passes
op.protocol ->
let errors =
match kind with
| `Validated -> None
| `Branch_delayed errors
| `Branch_refused errors
| `Refused errors
| `Outdated errors ->
Some errors
in
Lwt.return_some
(params#version, [((op.hash, op.protocol), errors)])
| Some _ -> next ()
| None -> Lwt.return_none)
in
let shutdown () = Lwt_watcher.shutdown stopper in
Tezos_rpc.Answer.return_stream {next; shutdown}) ;
!dir)
(** Module implementing the events at the {!Worker} level. Contrary
to {!Requests}, these functions depend on [Worker]. *)
module Handlers = struct
type self = worker
let on_request :
type r request_error.
worker ->
(r, request_error) Request.t ->
(r, request_error) result Lwt.t =
fun w request ->
let open Lwt_result_syntax in
Prometheus.Counter.inc_one metrics.worker_counters.worker_request_count ;
let pv = Worker.state w in
let post_processing :
(r, request_error) result Lwt.t -> (r, request_error) result Lwt.t =
fun r ->
let open Lwt_syntax in
let* () = handle_unprocessed pv in
r
in
post_processing
@@
match request with
| Request.Flush (hash, event, live_blocks, live_operations) ->
Requests.on_advertise pv.shell ;
let* block = pv.shell.parameters.tools.read_block hash in
let handle_branch_refused =
Chain_validator_worker_state.(
match event with
| Head_increment | Ignored_head -> false
| Branch_switch -> true)
in
Lwt_mutex.with_lock pv.lock
@@ fun () : (r, error trace) result Lwt.t ->
Requests.on_flush
~handle_branch_refused
pv
block
live_blocks
live_operations
| Request.Notify (peer, mempool) ->
Requests.on_notify pv.shell peer mempool ;
return_unit
| Request.Leftover ->
return_unit
| Request.Inject {op; force} -> Requests.on_inject pv ~force op
| Request.Arrived (oph, op) -> Requests.on_arrived pv oph op
| Request.Advertise ->
Requests.on_advertise pv.shell ;
return_unit
| Request.Ban oph -> Requests.on_ban pv oph
let on_close w =
let pv = Worker.state w in
Lwt_watcher.shutdown_input pv.operation_stream ;
Operation_hash.Set.iter
pv.shell.parameters.tools.chain_tools.clear_or_cancel
pv.shell.fetching ;
Lwt.return_unit
let mk_worker_tools w : Tools.worker_tools =
let push_request r = Worker.Queue.push_request w r in
let push_request_now r = Worker.Queue.push_request_now w r in
{push_request; push_request_now}
type launch_error = error trace
let on_launch w _ (limits, chain_db) : (state, launch_error) result Lwt.t =
let open Lwt_result_syntax in
let chain_store = Distributed_db.chain_store chain_db in
let flush = Prevalidation_t.flush (Distributed_db.chain_store chain_db) in
let*! head = Store.Chain.current_head chain_store in
let*! mempool = Store.Chain.mempool chain_store in
let*! live_blocks, live_operations =
Store.Chain.live_blocks chain_store
in
let timestamp_system = Tezos_base.Time.System.now () in
let timestamp = Time.System.to_protocol timestamp_system in
let* validation_state =
Prevalidation_t.create chain_store ~head ~timestamp
in
let fetching = mempool.known_valid in
let classification_parameters =
Classification.
{
map_size_limit = limits.Shell_limits.max_refused_operations;
on_discarded_operation =
Distributed_db.Operation.clear_or_cancel chain_db;
}
in
let classification = Classification.create classification_parameters in
let parameters = {limits; tools = Arg.tools; flush} in
let shell =
{
classification;
parameters;
predecessor = head;
timestamp = timestamp_system;
live_blocks;
live_operations;
mempool = Mempool.empty;
fetching;
pending = Pending_ops.empty;
advertisement = `None;
banned_operations = Operation_hash.Set.empty;
worker = mk_worker_tools w;
}
in
Shell_metrics.Mempool.set_validated_collector (fun () ->
Prevalidator_classification.Sized_map.cardinal
shell.classification.validated
|> float_of_int) ;
Shell_metrics.Mempool.set_refused_collector (fun () ->
Prevalidator_classification.cardinal shell.classification.refused
|> float_of_int) ;
Shell_metrics.Mempool.set_branch_refused_collector (fun () ->
Prevalidator_classification.cardinal
shell.classification.branch_refused
|> float_of_int) ;
Shell_metrics.Mempool.set_branch_delayed_collector (fun () ->
Prevalidator_classification.cardinal
shell.classification.branch_delayed
|> float_of_int) ;
Shell_metrics.Mempool.set_outdated_collector (fun () ->
Prevalidator_classification.cardinal shell.classification.outdated
|> float_of_int) ;
Shell_metrics.Mempool.set_unprocessed_collector (fun () ->
Prevalidator_pending_operations.cardinal shell.pending |> float_of_int) ;
let pv =
{
shell;
validation_state;
operation_stream = Lwt_watcher.create_input ();
rpc_directory = build_rpc_directory w;
config =
Prevalidation_t.default_config;
lock = Lwt_mutex.create ();
}
in
Seq.iter
(may_fetch_operation pv.shell None)
(Operation_hash.Set.to_seq fetching) ;
return pv
let on_error (type a b) _w st (request : (a, b) Request.t) (errs : b) :
unit tzresult Lwt.t =
Prometheus.Counter.inc_one metrics.worker_counters.worker_error_count ;
let open Lwt_syntax in
match request with
| Request.(Inject _) as r ->
let* () = Events.(emit request_failed) (Request.view r, st, errs) in
return_ok_unit
| Request.Notify _ -> ( match errs with _ -> .)
| Request.Leftover -> ( match errs with _ -> .)
| Request.Arrived _ -> ( match errs with _ -> .)
| Request.Advertise -> ( match errs with _ -> .)
| Request.Flush _ ->
let request_view = Request.view request in
let* () = Events.(emit request_failed) (request_view, st, errs) in
Lwt.return_error errs
| Request.Ban _ ->
let request_view = Request.view request in
let* () = Events.(emit request_failed) (request_view, st, errs) in
Lwt.return_error errs
let on_completion _w r _ st =
Prometheus.Counter.inc_one metrics.worker_counters.worker_completion_count ;
match Request.view r with
| View (Inject _) | View (Ban _) | Request.View (Flush _) ->
Events.(emit request_completed_info) (Request.view r, st)
| View (Notify _) | View Leftover | View (Arrived _) | View Advertise ->
Events.(emit request_completed_debug) (Request.view r, st)
let on_no_request _ = Lwt.return_unit
end
let table = Worker.create_table Queue
let worker_promise =
Worker.launch table name (Arg.limits, Arg.chain_db) (module Handlers)
let worker =
lazy
(match Lwt.state worker_promise with
| Lwt.Return (Ok worker) -> worker
| Lwt.Return (Error _) | Lwt.Fail _ | Lwt.Sleep -> assert false)
end
let mk_tools chain_db : Tools.tools =
let advertise_current_head ~mempool bh =
Distributed_db.Advertise.current_head chain_db ~mempool bh
in
let chain_tools = mk_chain_tools chain_db in
let fetch ?peer ?timeout oph =
Distributed_db.Operation.fetch chain_db ?timeout ?peer oph ()
in
let read_block bh =
let chain_store = Distributed_db.chain_store chain_db in
Store.Block.read_block chain_store bh
in
let send_get_current_head ?peer () =
match peer with
| None -> Distributed_db.Request.current_head_from_all chain_db
| Some peer -> Distributed_db.Request.current_head_from_peer chain_db peer
in
let set_mempool ~head mempool =
let chain_store = Distributed_db.chain_store chain_db in
Store.Chain.set_mempool chain_store ~head mempool
in
{
advertise_current_head;
chain_tools;
fetch;
read_block;
send_get_current_head;
set_mempool;
}
let make limits chain_db chain_id tools (module Proto : Protocol_plugin.T) =
let module Prevalidation_t = Prevalidation.Make (Proto) in
let module Prevalidator =
Make
(Proto)
(struct
let limits = limits
let chain_db = chain_db
let chain_id = chain_id
let tools = tools
end)
(Prevalidation_t)
in
(module Prevalidator : T)
module ChainProto_registry = Map.Make (struct
type t = Chain_id.t * Protocol_hash.t
let compare (c1, p1) (c2, p2) =
let pc = Protocol_hash.compare p1 p2 in
if pc = 0 then Chain_id.compare c1 c2 else pc
end)
(** {2 Public interface} *)
type t = (module T)
let chain_proto_registry : t ChainProto_registry.t ref =
ref ChainProto_registry.empty
let create limits (module Proto : Protocol_plugin.T) chain_db =
let open Lwt_result_syntax in
let chain_store = Distributed_db.chain_store chain_db in
let chain_id = Store.Chain.chain_id chain_store in
match
ChainProto_registry.find (chain_id, Proto.hash) !chain_proto_registry
with
| None ->
let prevalidator =
make limits chain_db chain_id (mk_tools chain_db) (module Proto)
in
let (module Prevalidator : T) = prevalidator in
chain_proto_registry :=
ChainProto_registry.add
Prevalidator.name
prevalidator
!chain_proto_registry ;
return prevalidator
| Some p -> return p
let shutdown (t : t) =
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
chain_proto_registry :=
ChainProto_registry.remove Prevalidator.name !chain_proto_registry ;
Prevalidator.Worker.shutdown w
let flush (t : t) event head live_blocks live_operations =
let open Lwt_result_syntax in
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
let*! r =
Prevalidator.Worker.Queue.push_request_and_wait
w
(Request.Flush (head, event, live_blocks, live_operations))
in
match r with
| Ok r -> Lwt.return_ok r
| Error (Closed None) -> fail [Worker_types.Terminated]
| Error (Closed (Some errs)) -> fail errs
| Error (Any exn) -> fail [Exn exn]
| Error (Request_error error_trace) -> fail error_trace
let notify_operations (t : t) peer mempool =
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
let open Lwt_result_syntax in
let*! (_was_pushed : bool) =
Prevalidator.Worker.Queue.push_request w (Request.Notify (peer, mempool))
in
Lwt.return_unit
let inject_operation (t : t) ~force op =
let module Prevalidator : T = (val t) in
let open Lwt_result_syntax in
let w = Lazy.force Prevalidator.worker in
let*! r =
Prevalidator.Worker.Queue.push_request_and_wait w (Inject {op; force})
in
match r with
| Ok r -> Lwt.return_ok r
| Error (Closed None) -> fail [Worker_types.Terminated]
| Error (Closed (Some errs)) -> fail errs
| Error (Any exn) -> fail [Exn exn]
| Error (Request_error error_trace) -> fail error_trace
let status (t : t) =
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
Prevalidator.Worker.status w
let running_workers () =
ChainProto_registry.fold
(fun (id, proto) t acc -> (id, proto, t) :: acc)
!chain_proto_registry
[]
let pending_requests (t : t) =
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
Prevalidator.Worker.Queue.pending_requests w
let current_request (t : t) =
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
Prevalidator.Worker.current_request w
let information (t : t) =
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
Prevalidator.Worker.information w
let pipeline_length (t : t) =
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
Prevalidator.Worker.Queue.pending_requests_length w
let empty_rpc_directory : unit Tezos_rpc.Directory.t =
Tezos_rpc.Directory.gen_register
Tezos_rpc.Directory.empty
(Block_services.Empty.S.Mempool.pending_operations Tezos_rpc.Path.open_root)
(fun _pv params () ->
let pending_operations =
{
Block_services.Empty.Mempool.validated = [];
refused = Operation_hash.Map.empty;
outdated = Operation_hash.Map.empty;
branch_refused = Operation_hash.Map.empty;
branch_delayed = Operation_hash.Map.empty;
unprocessed = Operation_hash.Map.empty;
}
in
Tezos_rpc.Answer.return (params#version, pending_operations))
let rpc_directory : t option Tezos_rpc.Directory.t =
Tezos_rpc.Directory.register_dynamic_directory
Tezos_rpc.Directory.empty
(Block_services.mempool_path Tezos_rpc.Path.open_root)
(function
| None ->
Lwt.return
(Tezos_rpc.Directory.map
(fun _ -> Lwt.return_unit)
empty_rpc_directory)
| Some t ->
let module Prevalidator : T = (val t : T) in
let w = Lazy.force Prevalidator.worker in
let pv = Prevalidator.Worker.state w in
let pv_rpc_dir = Lazy.force (Prevalidator.get_rpc_directory pv) in
Lwt.return
(Tezos_rpc.Directory.map (fun _ -> Lwt.return pv) pv_rpc_dir))
module Internal_for_tests = struct
module Tools = Tools
let mk_chain_tools = mk_chain_tools
let create tools limits (module Proto : Protocol_plugin.T) chain_db =
let open Lwt_result_syntax in
let chain_store = Distributed_db.chain_store chain_db in
let chain_id = Store.Chain.chain_id chain_store in
match
ChainProto_registry.find (chain_id, Proto.hash) !chain_proto_registry
with
| None ->
let prevalidator = make limits chain_db chain_id tools (module Proto) in
let (module Prevalidator : T) = prevalidator in
chain_proto_registry :=
ChainProto_registry.add
Prevalidator.name
prevalidator
!chain_proto_registry ;
return prevalidator
| Some p -> return p
let advertise_mempool (t : t) =
let module Prevalidator : T = (val t) in
let w = Lazy.force Prevalidator.worker in
let open Lwt_result_syntax in
let*! (_was_pushed : bool) =
Prevalidator.Worker.Queue.push_request w Request.Advertise
in
Lwt.return_unit
end