package dune-rpc
Communicate with dune using rpc
Install
Dune Dependency
Authors
Maintainers
Sources
dune-3.9.3.tbz
sha256=96bf755da267fb46e4af2dda0db56d5863761589618089c429ff85e0f7f65783
sha512=ce05560a2cff0beb805a259df449b5dbd15420e353cc686a482904b837969bce6f91eedec608ecef4be0ebc232fa013652745a7cc831af1a7f8fe06a391e5488
doc/src/dune-rpc.private/dune_rpc_private.ml.html
Source file dune_rpc_private.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729
open Import
module Conv = Conv
module Versioned = Versioned
module Menu = Menu
module Procedures = Procedures
module Where = Where
module Registry = Registry
include Types
include Exported_types
module Version_error = Versioned.Version_error
module Decl = Decl
module Sub = Sub

module type Fiber = Fiber_intf.S

(** Witnesses for the procedures re-exported from [Procedures.Public] and
    [Procedures.Poll]. *)
module Public = struct
  module Request = struct
    type ('a, 'b) t = ('a, 'b) Decl.Request.witness

    let ping = Procedures.Public.ping.decl

    let diagnostics = Procedures.Public.diagnostics.decl

    let format_dune_file = Procedures.Public.format_dune_file.decl

    let promote = Procedures.Public.promote.decl

    let build_dir = Procedures.Public.build_dir.decl
  end

  module Notification = struct
    type 'a t = 'a Decl.Notification.witness

    let shutdown = Procedures.Public.shutdown.decl
  end

  module Sub = struct
    type 'a t = 'a Sub.t

    let diagnostic = Sub.of_procedure Procedures.Poll.diagnostic

    let progress = Sub.of_procedure Procedures.Poll.progress

    let running_jobs = Sub.of_procedure Procedures.Poll.running_jobs
  end
end

(** Declarations of the notifications taken from [Procedures.Server_side]. *)
module Server_notifications = struct
  let abort = Procedures.Server_side.abort.decl

  let log = Procedures.Server_side.log.decl
end

module Client = struct
  (** Signature of an rpc client, abstract over the fiber and channel types. *)
  module type S = sig
    type t

    type 'a fiber

    type chan

    module Versioned : sig
      type ('a, 'b) request = ('a, 'b) Versioned.Staged.request

      type 'a notification = 'a Versioned.Staged.notification

      val prepare_request :
           t
        -> ('a, 'b) Decl.Request.witness
        -> (('a, 'b) request, Version_error.t) result fiber

      val prepare_notification :
           t
        -> 'a Decl.Notification.witness
        -> ('a notification, Version_error.t) result fiber
    end

    val request :
         ?id:Id.t
      -> t
      -> ('a, 'b) Versioned.request
      -> 'a
      -> ('b, Response.Error.t) result fiber

    val notification : t -> 'a Versioned.notification -> 'a -> unit fiber

    val disconnected : t -> unit fiber

    module Stream : sig
      type 'a t

      val cancel : _ t -> unit fiber

      val next : 'a t -> 'a option fiber
    end

    val poll :
      ?id:Id.t -> t -> 'a Sub.t -> ('a Stream.t, Version_error.t) result fiber

    module Batch : sig
      type client := t

      type t

      val create : client -> t

      val request :
           ?id:Id.t
        -> t
        -> ('a, 'b) Versioned.request
        -> 'a
        -> ('b, Response.Error.t) result fiber

      val notification : t -> 'a Versioned.notification -> 'a -> unit

      val submit : t -> unit fiber
    end

    module Handler : sig
      type t

      val create :
           ?log:(Message.t -> unit fiber)
        -> ?abort:(Message.t -> unit fiber)
        -> unit
        -> t
    end

    type proc =
      | Request : ('a, 'b) Decl.request -> proc
      | Notification : 'a Decl.notification -> proc
      | Poll : 'a Procedures.Poll.t -> proc
      | Handle_request : ('a, 'b) Decl.request * ('a -> 'b fiber) -> proc

    val connect :
         ?handler:Handler.t
      -> chan
      -> Initialize.Request.t
      -> f:(t -> 'a fiber)
      -> 'a fiber
  end

  (** Build a client from a fiber implementation and a raw channel that reads
      and writes s-expressions. *)
  module Make
      (Fiber : Fiber_intf.S) (Chan : sig
        type t

        val write : t -> Sexp.t list option -> unit Fiber.t

        val read : t -> Sexp.t option Fiber.t
      end) =
  struct
    open Fiber.O
    module V = Versioned.Make (Fiber)

    (** Wrapper around the user supplied channel that tracks whether the write
        side was closed and exposes a [disconnected] ivar. *)
    module Chan = struct
      type t =
        { read : unit -> Sexp.t option Fiber.t
        ; write : Sexp.t list option -> unit Fiber.t
        ; (* NOTE(review): this field is immutable and only ever initialised
             to [false] in this file, so the [closed_read] branch of [read]
             below is never taken. *)
          closed_read : bool
        ; mutable closed_write : bool
        ; disconnected : unit Fiber.Ivar.t
        }

      (* Wrap a raw channel. The [disconnected] ivar is filled the first time
         the underlying [Chan.read] returns [None]. *)
      let of_chan c =
        let disconnected = Fiber.Ivar.create () in
        let read () =
          let* result = Chan.read c in
          match result with
          | None ->
            let+ () = Fiber.Ivar.fill disconnected () in
            None
          | _ -> Fiber.return result
        in
        { read
        ; write = (fun s -> Chan.write c s)
        ; closed_read = false
        ; closed_write = false
        ; disconnected
        }

      (* Writing [None] closes the write side; only the first close is
         forwarded to the underlying channel. *)
      let write t s =
        let* () = Fiber.return () in
        match s with
        | Some _ -> t.write s
        | None ->
          if t.closed_write then Fiber.return ()
          else (
            t.closed_write <- true;
            t.write None)

      let read t =
        let* () = Fiber.return () in
        if t.closed_read then Fiber.return None else t.read ()
    end

    type abort =
      | Invalid_session of Conv.error
      | Server_aborted of Message.t

    exception Abort of abort

    (* Register a printer so that [Abort] is rendered through [Dyn]. *)
    let () =
      Printexc.register_printer (function
        | Abort error ->
          let dyn =
            match error with
            | Invalid_session e ->
              Dyn.variant "Invalid_session" [ Conv.dyn_of_error e ]
            | Server_aborted e ->
              Dyn.variant "Server_aborted"
                [ Sexp.to_dyn (Message.to_sexp_unversioned e) ]
          in
          Some (Dyn.to_string dyn)
        | _ -> None)

    type t =
      { chan : Chan.t
      ; (* In-flight requests, keyed by request id. *)
        requests :
          ( Id.t
          , [ `Cancelled
            | `Pending of
              [ `Completed of Response.t | `Connection_dead | `Cancelled ]
              Fiber.Ivar.t
            ] )
          Table.t
      ; initialize : Initialize.Request.t
      ; mutable next_id : int
      ; mutable running : bool
      ; mutable handler_initialized : bool
      ; (* We need this field to be backed by an Ivar (see [connect_raw]) to
           ensure that any typed communications are correctly versioned. The
           contract of the [Fiber] interface ensures that this will be filled
           before any user code is run. *)
        handler : unit V.Handler.t Fiber.t
      ; on_preemptive_abort : Message.t -> unit Fiber.t
      }

    (* When the client is terminated via this function, the session is
       considered to be dead without a way to recover. *)
    let terminate t =
      let* () = Fiber.return () in
      match t.running with
      | false -> Fiber.return ()
      | true ->
        t.running <- false;
        (* Drain the request table, collecting every entry so that pending
           ivars can be filled with [`Connection_dead]. *)
        let ivars = ref [] in
        Table.filteri_inplace t.requests ~f:(fun ~key:_ ~data:ivar ->
            ivars := ivar :: !ivars;
            false);
        let ivars () =
          Fiber.return
            (match !ivars with
            | [] -> None
            | x :: xs ->
              ivars := xs;
              Some x)
        in
        Fiber.fork_and_join_unit
          (fun () -> Chan.write t.chan None)
          (fun () ->
            Fiber.parallel_iter ivars ~f:(fun status ->
                match status with
                | `Cancelled -> Fiber.return ()
                | `Pending ivar -> Fiber.Ivar.fill ivar `Connection_dead))

    (* Terminate the session and raise a code error carrying [message] and
       [info]. *)
    let terminate_with_error t message info =
      Fiber.fork_and_join_unit
        (fun () -> terminate t)
        (fun () ->
          (* TODO stop using code error here. If [terminate_with_error] is
             called, it's because the other side is doing something
             unexpected, not because we have a bug *)
          Code_error.raise message info)

    (* Serialise [packet] and write it on the connection; [None] closes the
       write side. *)
    let send conn (packet : Packet.t list option) =
      let sexps =
        Option.map packet ~f:(List.map ~f:(Conv.to_sexp Packet.sexp))
      in
      Chan.write conn.chan sexps

    let create ~chan ~initialize ~handler ~on_preemptive_abort =
      let requests = Table.create (module Id) 16 in
      { chan
      ; requests
      ; next_id = 0
      ; initialize
      ; running = true
      ; handler_initialized = false
      ; handler
      ; on_preemptive_abort
      }

    (* Register [id] as pending and return the ivar that will receive the
       response. Returns [Error _] if the connection is no longer running and
       raises a code error if [id] is already registered. *)
    let prepare_request' conn (id, req) =
      match conn.running with
      | false ->
        let err =
          let payload =
            Sexp.record
              [ ("id", Id.to_sexp id)
              ; ("req", Conv.to_sexp (Conv.record Call.fields) req)
              ]
          in
          Response.Error.create ~payload
            ~message:"request sent while connection is dead"
            ~kind:Connection_dead ()
        in
        Error err
      | true ->
        let ivar = Fiber.Ivar.create () in
        (match Table.add conn.requests id (`Pending ivar) with
        | Ok () -> ()
        | Error _ -> Code_error.raise "duplicate id" [ ("id", Id.to_dyn id) ]);
        Ok ivar

    (* Send an already encoded request and wait for its outcome. *)
    let request_untyped conn (id, req) =
      let* () = Fiber.return () in
      match prepare_request' conn (id, req) with
      | Error e -> Fiber.return (`Completed (Error e))
      | Ok ivar ->
        let* () = send conn (Some [ Request (id, req) ]) in
        Fiber.Ivar.read ivar

    (* Decode a successful response with [decode]; a decoding failure
       terminates the session. *)
    let parse_response t decode = function
      | Error e -> Fiber.return (Error e)
      | Ok res -> (
        match decode res with
        | Ok s -> Fiber.return (Ok s)
        | Error e ->
          terminate_with_error t "response not matched by decl"
            [ ("e", Response.Error.to_dyn e) ])

    (* Use the caller supplied id if there is one, otherwise generate a fresh
       id from the [next_id] counter. *)
    let gen_id t = function
      | Some id -> id
      | None ->
        let id = Sexp.List [ Atom "auto"; Atom (Int.to_string t.next_id) ] in
        t.next_id <- t.next_id + 1;
        Id.make id

    module Versioned = struct
      type ('a, 'b) request = ('a, 'b) Versioned.Staged.request

      type 'a notification = 'a Versioned.Staged.notification

      let prepare_request t (decl : _ Decl.Request.witness) =
        let+ handler = t.handler in
        V.Handler.prepare_request handler decl

      let prepare_notification (type a) t (decl : a Decl.Notification.witness)
          =
        let+ handler = t.handler in
        V.Handler.prepare_notification handler decl
    end

    (* Typed request with an explicit id. The result distinguishes a dead
       connection and a cancellation from a completed exchange. *)
    let request t id ({ encode_req; decode_resp } : _ Versioned.request) req =
      let req = encode_req req in
      let* res = request_untyped t (id, req) in
      match res with
      | `Connection_dead -> Fiber.return `Connection_dead
      | `Cancelled -> Fiber.return `Cancelled
      | `Completed res ->
        let+ res = parse_response t decode_resp res in
        `Completed res

    (* Cancel a pending request: drop it from the table and wake up its
       waiter with [`Cancelled]. *)
    let cancel t id =
      match Table.find t.requests id with
      | None | Some `Cancelled -> Fiber.return ()
      | Some (`Pending ivar) ->
        Table.remove t.requests id;
        Fiber.Ivar.fill ivar `Cancelled

    (* Encode notification [n] and hand the resulting call to [k]. Raises
       [Response.Error.E] if the connection is no longer running. *)
    let make_notification (type a) t ({ encode } : a Versioned.notification)
        (n : a) (k : Call.t -> 'a) : 'a =
      let call = encode n in
      match t.running with
      | true -> k call
      | false ->
        let err =
          let payload = Conv.to_sexp (Conv.record Call.fields) call in
          Response.Error.create ~payload
            ~message:"notification sent while connection is dead"
            ~kind:Code_error ()
        in
        raise (Response.Error.E err)

    let notification (type a) t (stg : a Versioned.notification) (n : a) =
      let* () = Fiber.return () in
      make_notification t stg n (fun call ->
          send t (Some [ Notification call ]))

    let disconnected t = Fiber.Ivar.read t.chan.disconnected

    (** Polling streams built on top of a poll request and a cancel
        notification. *)
    module Stream = struct
      type nonrec 'a t =
        { poll : (Id.t, 'a option) Versioned.request
        ; cancel : Id.t Versioned.notification
        ; client : t
        ; id : Id.t
        ; mutable pending_request_id : Id.t option
        ; (* NOTE(review): [counter] is immutable and never incremented in
             this file, so every request id built by [next] carries the same
             counter value. *)
          counter : int
        ; mutable active : bool
        }

      let create sub client id =
        let+ handler = client.handler in
        let open Result.O in
        let+ poll = V.Handler.prepare_request handler (Sub.poll sub)
        and+ cancel =
          V.Handler.prepare_notification handler (Sub.poll_cancel sub)
        in
        { poll
        ; cancel
        ; client
        ; id
        ; pending_request_id = None
        ; counter = 0
        ; active = true
        }

      let check_active t =
        if not t.active then
          Code_error.raise "polling is inactive" [ ("id", Id.to_dyn t.id) ]

      (* Request the next value of the stream. Only one [next] may be in
         flight at a time. Returns [None] when the request was cancelled or
         the connection died. *)
      let next t =
        let* () = Fiber.return () in
        check_active t;
        (match t.pending_request_id with
        | Some _ ->
          Code_error.raise
            "Poll.next: previous Poll.next did not terminate yet" []
        | None -> ());
        let id =
          Sexp.record
            [ ("poll", Id.to_sexp t.id)
            ; ("i", Sexp.Atom (string_of_int t.counter))
            ]
          |> Id.make
        in
        t.pending_request_id <- Some id;
        let+ res = request t.client id t.poll t.id in
        t.pending_request_id <- None;
        match res with
        | `Connection_dead | `Cancelled -> None
        | `Completed (Ok res) -> res
        | `Completed (Error e) ->
          (* cwong: Should this really be a raise? *)
          raise (Response.Error.E e)

      (* Deactivate the stream, notify the server and cancel the in-flight
         [next] request if there is one. *)
      let cancel t =
        let* () = Fiber.return () in
        check_active t;
        t.active <- false;
        (* XXX should we add a pool to stop waiting for the notification to
           reach the server? *)
        let notify () = notification t.client t.cancel t.id in
        match t.pending_request_id with
        | None -> notify ()
        | Some id ->
          Fiber.fork_and_join_unit (fun () -> cancel t.client id) notify
    end

    (* Collapse a request outcome into a plain result for requests that can
       never be cancelled; a dead connection becomes an error response. *)
    let no_cancel_raise_connection_dead id = function
      | `Cancelled -> assert false
      | `Completed s -> s
      | `Connection_dead ->
        let payload = Sexp.record [ ("id", Id.to_sexp id) ] in
        let error =
          Response.Error.create ~kind:Connection_dead ~payload
            ~message:
              "connection terminated. this request will never receive a \
               response"
            ()
        in
        Error error

    let request ?id t spec req =
      let id = gen_id t id in
      let+ res = request t id spec req in
      no_cancel_raise_connection_dead id res

    let poll ?id client sub =
      let* () = Fiber.return () in
      let id = gen_id client id in
      Stream.create sub client id

    (** Accumulate packets and send them in a single write on [submit]. *)
    module Batch = struct
      type nonrec t =
        { client : t
        ; (* Most recently added packet first; reversed in [submit]. *)
          mutable pending : Packet.t list
        }

      let create client = { client; pending = [] }

      let notification t n a =
        make_notification t.client n a (fun call ->
            t.pending <- Notification call :: t.pending)

      let request (type a b) ?id t
          ({ encode_req; decode_resp } : (a, b) Versioned.request) (req : a) :
          (b, _) result Fiber.t =
        let* () = Fiber.return () in
        let id = gen_id t.client id in
        let call = encode_req req in
        let ivar = prepare_request' t.client (id, call) in
        match ivar with
        | Error e -> Fiber.return (Error e)
        | Ok ivar ->
          t.pending <- Packet.Request (id, call) :: t.pending;
          let* res = Fiber.Ivar.read ivar in
          (* currently impossible because there's no batching for polling and
             cancellation is only available for polled requests *)
          let res = no_cancel_raise_connection_dead id res in
          parse_response t.client decode_resp res

      let submit t =
        let* () = Fiber.return () in
        let pending = List.rev t.pending in
        t.pending <- [];
        send t.client (Some pending)
    end

    (* Dispatch every incoming packet, then terminate the session once
       [packets] is exhausted. *)
    let read_packets t packets =
      let* () =
        Fiber.parallel_iter packets ~f:(function
          | Packet.Notification n -> (
            (* An abort received before the versioned handler is ready is
               decoded directly and passed to [on_preemptive_abort]. *)
            if
              String.equal n.method_ Procedures.Server_side.abort.decl.method_
              && not t.handler_initialized
            then
              match
                Conv.of_sexp ~version:t.initialize.dune_version Message.sexp
                  n.params
              with
              | Ok msg -> t.on_preemptive_abort msg
              | Error error ->
                terminate_with_error t
                  "fatal: server aborted connection, but couldn't parse reason"
                  [ ("reason", Sexp.to_dyn n.params)
                  ; ("error", Conv.dyn_of_error error)
                  ]
            else
              let* handler = t.handler in
              let* result = V.Handler.handle_notification handler () n in
              match result with
              | Error e ->
                terminate_with_error t "received bad notification from server"
                  [ ("error", Response.Error.to_dyn e)
                  ; ("notification", Call.to_dyn n)
                  ]
              | Ok () -> Fiber.return ())
          | Request (id, req) ->
            let* handler = t.handler in
            let* result = V.Handler.handle_request handler () (id, req) in
            send t (Some [ Response (id, result) ])
          | Response (id, response) -> (
            match Table.find t.requests id with
            | Some status -> (
              Table.remove t.requests id;
              match status with
              | `Pending ivar -> Fiber.Ivar.fill ivar (`Completed response)
              | `Cancelled -> Fiber.return ())
            | None ->
              terminate_with_error t "unexpected response"
                [ ("id", Id.to_dyn id)
                ; ("response", Response.to_dyn response)
                ]))
      in
      terminate t

    (** Callbacks for the log and abort notifications sent by the server. *)
    module Handler = struct
      type nonrec t =
        { log : Message.t -> unit Fiber.t
        ; abort : Message.t -> unit Fiber.t
        }

      (* Default logger: print the message, and its payload if any, on
         stderr. *)
      let log { Message.payload; message } =
        let+ () = Fiber.return () in
        match payload with
        | None -> Format.eprintf "%s@." message
        | Some payload ->
          Format.eprintf "%s: %s@." message (Sexp.to_string payload)

      (* Default abort handler: raise [Abort (Server_aborted _)]. *)
      let abort m = raise (Abort (Server_aborted m))

      let default = { log; abort }

      let create ?log ?abort () =
        let t =
          let t = default in
          match log with
          | None -> t
          | Some log -> { t with log }
        in
        let t =
          match abort with
          | None -> t
          | Some abort -> { t with abort }
        in
        t
    end

    type proc =
      | Request : ('a, 'b) Decl.request -> proc
      | Notification : 'a Decl.notification -> proc
      | Poll : 'a Procedures.Poll.t -> proc
      | Handle_request : ('a, 'b) Decl.request * ('a -> 'b Fiber.t) -> proc

    (* Build the versioning builder: declare the public procedures, implement
       the server side notifications with [handler], then register every
       entry of [private_menu]. *)
    let setup_versioning ~private_menu ~(handler : Handler.t) =
      let module Builder = V.Builder in
      let t : unit Builder.t = Builder.create () in
      (* CR-soon cwong: It is a *huge* footgun that you have to remember to
         declare a request here, or via [private_menu], and there is no
         mechanism to warn you if you forget. The closest thing is either
         seeing that [dune rpc status] does not report the new procedure, or
         need to deal with the [Notification_error.t], which contains some
         good context, but very little to indicate this specific problem. *)
      Builder.declare_request t Procedures.Public.ping;
      Builder.declare_request t Procedures.Public.diagnostics;
      Builder.declare_request t Procedures.Poll.(poll running_jobs);
      Builder.declare_notification t Procedures.Public.shutdown;
      Builder.declare_request t Procedures.Public.format_dune_file;
      Builder.declare_request t Procedures.Public.promote;
      Builder.declare_request t Procedures.Public.build_dir;
      Builder.implement_notification t Procedures.Server_side.abort (fun () ->
          handler.abort);
      Builder.implement_notification t Procedures.Server_side.log (fun () ->
          handler.log);
      Builder.declare_request t Procedures.Poll.(poll diagnostic);
      Builder.declare_request t Procedures.Poll.(poll progress);
      Builder.declare_notification t Procedures.Poll.(cancel running_jobs);
      Builder.declare_notification t Procedures.Poll.(cancel diagnostic);
      Builder.declare_notification t Procedures.Poll.(cancel progress);
      List.iter private_menu ~f:(function
        | Handle_request (r, h) -> Builder.implement_request t r (fun () -> h)
        | Request r -> Builder.declare_request t r
        | Notification n -> Builder.declare_notification t n
        | Poll p ->
          Builder.declare_request t (Procedures.Poll.poll p);
          Builder.declare_notification t (Procedures.Poll.cancel p));
      t

    (* Run a session on [chan]: send the initialize request, negotiate the
       version menu, fill the handler ivar and then run [f] with the client.
       Incoming packets are read concurrently with this sequence. *)
    let connect_raw chan (initialize : Initialize.Request.t)
        ~(private_menu : proc list) ~(handler : Handler.t) ~f =
      let packets () =
        let+ read = Chan.read chan in
        Option.map read ~f:(fun sexp ->
            match
              Conv.of_sexp Packet.sexp ~version:initialize.dune_version sexp
            with
            | Error e -> raise (Abort (Invalid_session e))
            | Ok message -> message)
      in
      let builder = setup_versioning ~handler ~private_menu in
      let handler_var = Fiber.Ivar.create () in
      let client =
        let on_preemptive_abort = handler.abort in
        let handler = Fiber.Ivar.read handler_var in
        create ~initialize ~chan ~handler ~on_preemptive_abort
      in
      let run () =
        let* init =
          let id = Id.make (List [ Atom "initialize" ]) in
          let initialize = Initialize.Request.to_call initialize in
          let+ res = request_untyped client (id, initialize) in
          no_cancel_raise_connection_dead id res
        in
        match init with
        | Error e -> raise (Response.Error.E e)
        | Ok csexp ->
          let* menu =
            match
              Conv.of_sexp ~version:initialize.dune_version
                Initialize.Response.sexp csexp
            with
            | Error e -> raise (Abort (Invalid_session e))
            | Ok _resp -> (
              let id = Id.make (List [ Atom "version menu" ]) in
              let supported_versions =
                let request =
                  Version_negotiation.Request.create
                    (V.Builder.registered_procedures builder)
                in
                Version_negotiation.Request.to_call request
              in
              let* resp = request_untyped client (id, supported_versions) in
              (* we don't allow cancelling negotiation *)
              match no_cancel_raise_connection_dead id resp with
              | Error e -> raise (Response.Error.E e)
              | Ok sexp -> (
                match
                  Conv.of_sexp ~version:initialize.dune_version
                    Version_negotiation.Response.sexp sexp
                with
                | Error e -> raise (Abort (Invalid_session e))
                | Ok (Selected methods) -> (
                  match Menu.of_list methods with
                  | Ok m -> Fiber.return m
                  | Error (method_, a, b) ->
                    terminate_with_error client
                      "server responded with invalid version menu"
                      [ ( "duplicated"
                        , Dyn.Tuple
                            [ Dyn.String method_; Dyn.Int a; Dyn.Int b ] )
                      ])))
          in
          let handler =
            V.Builder.to_handler builder ~menu ~session_version:(fun () ->
                client.initialize.dune_version)
          in
          client.handler_initialized <- true;
          let* () = Fiber.Ivar.fill handler_var handler in
          (* Close the write side once [f] is done, whether it returns or
             raises. *)
          Fiber.finalize
            (fun () -> f client)
            ~finally:(fun () -> Chan.write chan None)
      in
      Fiber.fork_and_join_unit (fun () -> read_packets client packets) run

    (* Connect with extra procedures registered through [private_menu]. *)
    let connect_with_menu ?(handler = Handler.default) ~private_menu chan init
        ~f =
      connect_raw (Chan.of_chan chan) init ~handler ~private_menu ~f

    let connect = connect_with_menu ~private_menu:[]
  end
end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>