package sihl-queue

  1. Overview
  2. Docs

Source file sihl_queue.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
(* Bring every definition of the queue contract into this module's scope. *)
include Sihl.Contract.Queue

(* Log source for this service, named "sihl.service." followed by the
   contract's service name. *)
let log_src = Logs.Src.create ("sihl.service." ^ Sihl.Contract.Queue.name)

(* Shadows the global [Logs] module for the rest of this file with a logger
   bound to [log_src], so every [Logs.debug]/[Logs.err] call below reports
   under that source. *)
module Logs = (val Logs.src_log log_src : Logs.LOG)

(** [create_instance input delay now job] builds a new [Pending] instance of
    [job] with zero tries, no recorded error and a freshly generated V4 UUID
    as id. [input] is serialized with [job.encode]. The instance is due at
    [now], or at [now] plus [delay] when a delay is given; if [Ptime.add_span]
    yields no date the due date stays at [now]. *)
let create_instance input delay now (job : 'a job) =
  (* Encode first so a failing encoder aborts before an id is generated. *)
  let encoded_input = job.encode input in
  let due_at =
    match Option.bind delay (Ptime.add_span now) with
    | Some date -> date
    | None -> now
  in
  { id = Uuidm.to_string (Uuidm.v `V4)
  ; name = job.name
  ; input = encoded_input
  ; tries = 0
  ; next_run_at = due_at
  ; max_tries = job.max_tries
  ; status = Pending
  ; last_error = None
  ; last_error_at = None
  }
;;

(** [update_next_run_at retry_delay job_instance] returns [job_instance] with
    its [next_run_at] moved forward by [retry_delay].

    @raise Failure if [Ptime.add_span] cannot produce the resulting date. *)
let update_next_run_at (retry_delay : Ptime.Span.t) (job_instance : instance) =
  match Ptime.add_span job_instance.next_run_at retry_delay with
  | None -> failwith "Can not determine next run date of job"
  | Some next_run_at -> { job_instance with next_run_at }
;;

(** [incr_tries job_instance] returns [job_instance] with its [tries] counter
    increased by one. *)
let incr_tries job_instance =
  let tries = succ job_instance.tries in
  { job_instance with tries }
;;

(** [Make (Repo)] builds a queue service on top of the persistence backend
    [Repo]. The list of registered jobs is kept in memory, while job instances
    are stored, fetched and updated through [Repo]. *)
module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct
  (* Jobs known to this service. Instances whose name matches none of them are
     skipped by [work_queue]. *)
  let registered_jobs : job' list ref = ref []

  (* Function stopping the running schedule; set by [start_queue] and cleared
     again by [stop]. *)
  let stop_schedule : (unit -> unit) option ref = ref None

  (* Appends [jobs] to the in-memory registry, keeping registration order. *)
  let add_jobs jobs = registered_jobs := !registered_jobs @ jobs

  (** [dispatch ?delay input job] creates an instance of [job] for [input],
      due now or after [delay], and enqueues it through [Repo.enqueue]. *)
  let dispatch ?delay input (job : 'a job) =
    let open Sihl.Contract.Queue in
    let name = job.name in
    Logs.debug (fun m -> m "Dispatching job %s" name);
    let now = Ptime_clock.now () in
    let job_instance = create_instance input delay now job in
    Repo.enqueue job_instance
  ;;

  (** [dispatch_all ?delay inputs job] creates one instance of [job] per
      element of [inputs], all sharing the same creation time, and enqueues
      them with a single [Repo.enqueue_all] call. *)
  let dispatch_all ?delay inputs job =
    let now = Ptime_clock.now () in
    let job_instances =
      List.map (fun input -> create_instance input delay now job) inputs
    in
    Repo.enqueue_all job_instances
  ;;

  (* [run_job input job job_instance] calls [job.handle input] and returns its
     result. An exception escaping the handler is logged and converted into
     [Error]. On [Error] the failure is logged and [job.failed] is invoked
     with the message and the instance; an exception escaping [job.failed] is
     logged as well and replaces the returned error message. *)
  let run_job (input : string) (job : job') (job_instance : instance)
      : (unit, string) Result.t Lwt.t
    =
    let job_instance_id = job_instance.id in
    let%lwt result =
      Lwt.catch
        (fun () -> job.handle input)
        (fun exn ->
          let exn_string = Printexc.to_string exn in
          Logs.err (fun m ->
              m
                "Exception caught while running job, this is a bug in your job \
                 handler. Don't throw exceptions there, use Result.t instead. \
                 '%s'"
                exn_string);
          Lwt.return @@ Error exn_string)
    in
    match result with
    | Error msg ->
      Logs.err (fun m ->
          m
            "Failure while running job instance %a %s"
            pp_instance
            job_instance
            msg);
      Lwt.catch
        (fun () ->
          let%lwt () = job.failed msg job_instance in
          Lwt.return @@ Error msg)
        (fun exn ->
          let exn_string = Printexc.to_string exn in
          Logs.err (fun m ->
              m
                "Exception caught while cleaning up job, this is a bug in your \
                 job failure handler, make sure to not throw exceptions there \
                 '%s'"
                exn_string);
          Lwt.return @@ Error exn_string)
    | Ok () ->
      Logs.debug (fun m ->
          m "Successfully ran job instance '%s'" job_instance_id);
      Lwt.return @@ Ok ()
  ;;

  (* Persists [job_instance]. Shadowed further down by the public [update]. *)
  let update ~job_instance = Repo.update job_instance

  (* [work_job job job_instance] runs the instance if [should_run] allows it
     at the current time and persists the outcome. Whatever the outcome, the
     try counter is incremented and [next_run_at] is pushed back by
     [job.retry_delay]. A successful run marks the instance [Succeeded]; a
     failed run records the error and its time, and marks the instance
     [Failed] once [tries] has reached [job.max_tries]. *)
  let work_job (job : job') (job_instance : instance) =
    let now = Ptime_clock.now () in
    if should_run job_instance now
    then (
      let input_string = job_instance.input in
      let%lwt job_run_status = run_job input_string job job_instance in
      let job_instance =
        job_instance |> incr_tries |> update_next_run_at job.retry_delay
      in
      let job_instance =
        match job_run_status with
        | Error msg ->
          let job_instance =
            { job_instance with
              last_error = Some msg
            ; last_error_at = Some (Ptime_clock.now ())
            }
          in
          if job_instance.tries >= job.max_tries
          then { job_instance with status = Failed }
          else job_instance
        | Ok () -> { job_instance with status = Succeeded }
      in
      update ~job_instance)
    else (
      Logs.debug (fun m ->
          m "Not going to run job instance %a" pp_instance job_instance);
      Lwt.return ())
  ;;

  (* [work_queue ~jobs] fetches the workable instances from [Repo] and works
     them one after another, in the order returned. Instances whose name has
     no match in [jobs] are skipped. *)
  let work_queue ~jobs =
    let%lwt pending_job_instances = Repo.find_workable () in
    match pending_job_instances with
    | [] -> Lwt.return ()
    | _ :: _ ->
      Logs.debug (fun m ->
          m
            "Start working queue of length %d"
            (List.length pending_job_instances));
      let rec loop job_instances =
        match job_instances with
        | [] -> Lwt.return ()
        | (job_instance : instance) :: job_instances ->
          let job =
            List.find_opt
              (fun (job : job') -> String.equal job_instance.name job.name)
              jobs
          in
          (match job with
          | None -> loop job_instances
          | Some job ->
            (* Keep going after this instance so the whole batch is worked
               within one tick. *)
            let%lwt () = work_job job job_instance in
            loop job_instances)
      in
      let%lwt () = loop pending_job_instances in
      Logs.debug (fun m -> m "Finish working queue");
      Lwt.return ()
  ;;

  (** [register_jobs jobs] adds [jobs] to the jobs this service can work. *)
  let register_jobs jobs =
    add_jobs jobs;
    Lwt.return ()
  ;;

  (* Creates the "job_queue" schedule, starts it and remembers the function
     that stops it in [stop_schedule]. *)
  let start_queue () =
    Logs.debug (fun m -> m "Start job queue");
    (* This function runs every second; the registered jobs are read anew on
       each tick. *)
    let scheduled_function () =
      match !registered_jobs with
      | [] ->
        Logs.debug (fun m -> m "No jobs found to run, trying again later");
        Lwt.return ()
      | _ :: _ as jobs ->
        let job_strings =
          jobs |> List.map (fun (job : job') -> job.name) |> String.concat ", "
        in
        Logs.debug (fun m ->
            m "Run job queue with registered jobs: %s" job_strings);
        work_queue ~jobs
    in
    let schedule =
      Sihl.Schedule.create
        Sihl.Schedule.every_second
        scheduled_function
        "job_queue"
    in
    stop_schedule := Some (Sihl.Schedule.schedule schedule);
    Lwt.return ()
  ;;

  (* Lifecycle start hook. *)
  let start () = start_queue ()

  (* Lifecycle stop hook: forgets all registered jobs and stops the schedule
     if one is running. The stored stop function is cleared so that it is
     never invoked a second time. *)
  let stop () =
    registered_jobs := [];
    match !stop_schedule with
    | Some stop_running_schedule ->
      stop_running_schedule ();
      stop_schedule := None;
      Lwt.return ()
    | None ->
      Logs.warn (fun m -> m "Can not stop schedule");
      Lwt.return ()
  ;;

  (** Lifecycle of the queue service. It depends on the schedule service and
      on the lifecycles required by [Repo]. *)
  let lifecycle =
    Sihl.Container.create_lifecycle
      Sihl.Contract.Queue.name
      ~dependencies:(fun () -> Sihl.Schedule.lifecycle :: Repo.lifecycles)
      ~start
      ~stop
  ;;

  (** [register ?jobs ()] registers the repository's migration and cleaner,
      adds [jobs] (none by default) to the registered jobs and returns the
      service built from [lifecycle]. *)
  let register ?(jobs = []) () =
    Repo.register_migration ();
    Repo.register_cleaner ();
    add_jobs jobs;
    Sihl.Container.Service.create lifecycle
  ;;

  (* Returns the job instances reported by [Repo.query]. *)
  let query () : instance list Lwt.t = Repo.query ()

  (* [find id] returns the instance stored under [id]; the returned promise is
     rejected with [Exception] if [Repo.find] yields nothing. *)
  let find id : instance Lwt.t =
    let%lwt job = Repo.find id in
    match job with
    | Some job -> Lwt.return job
    | None ->
      raise @@ Exception (Format.asprintf "Failed to find with id %s" id)
  ;;

  (* [update job] persists [job] and returns the instance read back from the
     repository; the returned promise is rejected with [Exception] if the
     instance can not be found afterwards. *)
  let update (job : instance) : instance Lwt.t =
    let%lwt () = Repo.update job in
    let%lwt updated = Repo.find job.id in
    match updated with
    | Some job -> Lwt.return job
    | None ->
      raise
      @@ Exception (Format.asprintf "Failed to update job %a" pp_instance job)
  ;;

  (* [requeue job] resets [job] to [Pending] with zero tries, makes it due
     now and persists it. *)
  let requeue (job : instance) : instance Lwt.t =
    let status = Pending in
    let tries = 0 in
    let next_run_at = Ptime_clock.now () in
    let updated = { job with status; tries; next_run_at } in
    update updated
  ;;

  (* [cancel job] marks [job] as [Cancelled] and persists it. *)
  let cancel (job : instance) : instance Lwt.t =
    let status = Cancelled in
    let updated = { job with status } in
    update updated
  ;;

  (** [router ?back ?theme scope] is the admin UI router, wired to [query],
      [find], [cancel] and [requeue] of this service. *)
  let router ?back ?theme scope =
    Admin_ui.router query find cancel requeue ?back ?theme scope
  ;;
end

(* Ready-made queue services, one per repository implementation provided by
   [Repo]. *)
module InMemory = Make (Repo.InMemory)
module MariaDb = Make (Repo.MariaDb)
module PostgreSql = Make (Repo.PostgreSql)
OCaml

Innovation. Community. Security.