package carton

  1. Overview
  2. Docs

Source file thin.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
module Bigarray = Bigarray_compat
open Carton

(* [light_load uid] yields the kind and the length of object [uid], as later
   passed to [Carton.Enc.make_entry ~kind ~length]. *)
type ('uid, 's) light_load = 'uid -> (kind * int, 's) io

(* [heavy_load uid] yields the decoded object [uid] ([Dec.v]). *)
type ('uid, 's) heavy_load = 'uid -> (Dec.v, 's) io

type optint = Optint.t

(* Positional-argument wrapper around [Bigstringaf.blit_from_string], in the
   shape expected by the [~blit] argument of [Ke.Rke.N.push]: copy [len]
   bytes of string [str] starting at [src_off] into bigstring [bstr]
   starting at [dst_off]. *)
let blit_from_string str src_off bstr dst_off len =
  Bigstringaf.blit_from_string str ~src_off bstr ~dst_off ~len
  [@@inline]

(* Log source of this module: every message emitted through [Log] below is
   reported under the source named "thin". *)
let src : Logs.src = Logs.Src.create "thin"

module Log : Logs.LOG = (val Logs.src_log src : Logs.LOG)

(* Control-flow exception: [share] raises it to stop scanning as soon as a
   common element is found. *)
exception Exists

(** [Make (Scheduler) (IO) (Uid)] provides:
    - [verify], which stores an incoming PACK stream while analysing it and
      reports which objects could be resolved, which could not, and which
      [`Ref] bases are needed by the unresolved ones;
    - [canonicalize], which writes a new PACK where a given list of objects
      is encoded first, followed by the objects of an existing PACK. *)
module Make
    (Scheduler : SCHEDULER)
    (IO : IO with type 'a t = 'a Scheduler.s)
    (Uid : UID) =
struct
  let ( >>= ) x f = IO.bind x f
  let return x = IO.return x

  (* Result-aware bind: run [f] on [Ok], forward [Error] unchanged. *)
  let ( >>? ) x f = x >>= function Ok x -> f x | Error _ as err -> return err

  (* [Carton]'s scheduler record, built from [IO]'s bind/return through the
     [Scheduler.inj]/[Scheduler.prj] conversions. *)
  let sched =
    let open Scheduler in
    {
      Carton.bind = (fun x f -> inj (prj x >>= fun x -> prj (f x)));
      Carton.return = (fun x -> inj (return x));
    }

  (* [read stream] returns [fill] such that [fill filled inputs] copies bytes
     coming from [stream] into [inputs] until [inputs] is full or [stream]
     returns [None], and returns [filled] plus the number of bytes copied.
     Incoming chunks are staged in a ring buffer, so bytes that did not fit
     in [inputs] are kept for the next call. *)
  let read stream =
    let ke = Ke.Rke.create ~capacity:0x1000 Bigarray.char in

    let rec go filled inputs =
      match Ke.Rke.N.peek ke with
      | [] -> (
          (* Nothing staged: pull the next chunk, or stop at end of flow. *)
          stream () >>= function
          | Some (src, off, len) ->
              Ke.Rke.N.push ke ~blit:blit_from_string ~length:String.length ~off
                ~len src;
              go filled inputs
          | None -> return filled)
      | src :: _ ->
          let src = Cstruct.of_bigarray src in
          let len = min (Cstruct.len inputs) (Cstruct.len src) in
          Cstruct.blit src 0 inputs 0 len;
          Ke.Rke.N.shift_exn ke len;
          if len < Cstruct.len inputs then
            go (filled + len) (Cstruct.shift inputs len)
          else return (filled + len)
    in
    fun filled inputs -> go filled inputs

  module Verify = Carton.Dec.Verify (Uid) (Scheduler) (IO)
  module Fp = Carton.Dec.Fp (Uid)

  (* [first_pass ~zl_buffer ~digest stream] decodes the PACK read from
     [stream] once, without resolving deltas, and returns:
     - an oracle record for [Verify] ([where], [children], [digest],
       [weight] queried by cursor);
     - [matrix]: one [Verify] status per object, bases already marked with
       [Verify.unresolved_base];
     - [where]: offset -> index of the object in the PACK;
     - [checks]: offset -> CRC-32 reported by the decoder;
     - [children]: [`Ofs base_offset] / [`Ref uid] -> offsets of the deltas
       built on top of that base;
     - the uid reported by the decoder's [`End] event.
     Returns [Error (`Msg _)] when the decoder reports a malformed PACK. *)
  let first_pass ~zl_buffer ~digest stream =
    let fl_buffer = Cstruct.create De.io_buffer_size in
    let zl_window = De.make_window ~bits:15 in

    let allocate _ = zl_window in

    let read_cstruct = read stream in
    (* Reader handed to [Fp.check_header]. It fills [fl_buffer] as much as
       possible, copies the first [len] bytes into [buf] and returns how many
       bytes [fl_buffer] now holds, so that the decoder below can start on
       [fl_buffer] without losing what was read past the header. *)
    let read_bytes () buf ~off ~len =
      let rec go rest raw =
        if rest <= 0 then (
          Cstruct.blit_to_bytes fl_buffer 0 buf off len;
          return (len - rest))
        else
          read_cstruct 0 raw >>= function
          | 0 ->
              (* TODO(dinosaure): end of flow, add a test. *)
              (* End of flow before [len] bytes: still hand over the bytes we
                 did receive (they sit at the front of [fl_buffer]). *)
              let got = len - rest in
              Cstruct.blit_to_bytes fl_buffer 0 buf off got;
              return got
          | filled -> go (rest - filled) (Cstruct.shift raw filled)
      in
      go len fl_buffer
    in
    let read_bytes () buf ~off ~len =
      Scheduler.inj (read_bytes () buf ~off ~len)
    in

    Fp.check_header sched read_bytes () |> Scheduler.prj
    >>= fun (n_objects, _, len) ->
    let decoder = Fp.decoder ~o:zl_buffer ~allocate `Manual in
    (* [fl_buffer] already holds the [len] bytes read by [check_header]. *)
    let decoder = Fp.src decoder (Cstruct.to_bigarray fl_buffer) 0 len in

    (* [`Ofs base_offset] / [`Ref uid] -> offsets of dependent deltas. *)
    let children = Hashtbl.create 0x100 in
    (* offset -> index of the object. *)
    let where = Hashtbl.create 0x100 in
    (* offset -> largest size recorded for that object (its own size, or a
       delta source/target size). *)
    let weight = Hashtbl.create 0x100 in
    (* offset -> CRC-32 of the entry. *)
    let checks = Hashtbl.create 0x100 in
    let matrix = Array.make n_objects Verify.unresolved_node in

    (* Record [v] for [k], keeping the largest value seen so far. *)
    let replace hashtbl k v =
      match Hashtbl.find_opt hashtbl k with
      | Some v' -> if v' < v then Hashtbl.replace hashtbl k v
      | None -> Hashtbl.add hashtbl k v
    in
    (* Register the delta at [offset] as a child of the base [key]. *)
    let add_child key offset =
      match Hashtbl.find_opt children key with
      | Some vs -> Hashtbl.replace children key (offset :: vs)
      | None -> Hashtbl.add children key [ offset ]
    in

    let rec go decoder =
      match Fp.decode decoder with
      | `Await decoder ->
          read_cstruct 0 fl_buffer >>= fun len ->
          Log.debug (fun m ->
              m "Refill the first-pass state with %d byte(s)." len);
          go (Fp.src decoder (Cstruct.to_bigarray fl_buffer) 0 len)
      | `Peek decoder ->
          (* XXX(dinosaure): [Fp] does the compression. *)
          (* The [keep] unconsumed bytes are at the front of [fl_buffer]; new
             input is appended right after them. *)
          let keep = Fp.src_rem decoder in
          read_cstruct 0 (Cstruct.shift fl_buffer keep) >>= fun len ->
          go (Fp.src decoder (Cstruct.to_bigarray fl_buffer) 0 (keep + len))
      | `Entry ({ Fp.kind = Base _; offset; size; crc; _ }, decoder) ->
          let n = Fp.count decoder - 1 in
          Log.debug (fun m -> m "[+] base object (%d) (%Ld)." n offset);
          replace weight offset size;
          Hashtbl.add where offset n;
          Hashtbl.add checks offset crc;
          matrix.(n) <- Verify.unresolved_base ~cursor:offset;
          go decoder
      | `Entry
          ( { Fp.kind = Ofs { sub = s; source; target }; offset; crc; _ },
            decoder ) ->
          let n = Fp.count decoder - 1 in
          Log.debug (fun m -> m "[+] ofs object (%d) (%Ld)." n offset);
          (* The base of an [Ofs] delta lives [s] bytes before it. *)
          let base_offset = Int64.(sub offset (of_int s)) in
          replace weight base_offset source;
          replace weight offset target;
          Hashtbl.add where offset n;
          Hashtbl.add checks offset crc;
          add_child (`Ofs base_offset) offset;
          go decoder
      | `Entry
          ({ Fp.kind = Ref { ptr; target; source }; offset; crc; _ }, decoder)
        ->
          let n = Fp.count decoder - 1 in
          Log.debug (fun m ->
              m "[+] ref object (%d) (%Ld) (weight: %d)." n offset
                (Stdlib.max target source :> int));
          replace weight offset (Stdlib.max target source);
          Hashtbl.add where offset n;
          Hashtbl.add checks offset crc;
          add_child (`Ref ptr) offset;
          go decoder
      | `End uid -> return (Ok uid)
      | `Malformed err ->
          Log.err (fun m -> m "Got an error: %s." err);
          return (Error (`Msg err))
    in
    go decoder >>? fun uid ->
    Log.debug (fun m -> m "First pass on incoming PACK file is done.");
    return
      (Ok
         ( {
             Carton.Dec.where = (fun ~cursor -> Hashtbl.find where cursor);
             children =
               (fun ~cursor ~uid ->
                 match
                   ( Hashtbl.find_opt children (`Ofs cursor),
                     Hashtbl.find_opt children (`Ref uid) )
                 with
                 | Some a, Some b -> List.sort_uniq Int64.compare (a @ b)
                 | Some x, None | None, Some x -> x
                 | None, None -> []);
             digest;
             weight = (fun ~cursor -> Hashtbl.find weight cursor);
           },
           matrix,
           where,
           checks,
           children,
           uid ))

  (* Storage operations used by [verify] and [canonicalize]. [create] yields
     a descriptor for a path (used both for output files and, in
     [canonicalize], to read [src]); [append] writes a string to it; [map]
     returns [len] bytes starting at [pos]; [close] releases it. *)
  type ('t, 'path, 'fd, 'error) fs = {
    create : 't -> 'path -> ('fd, 'error) result IO.t;
    append : 't -> 'fd -> string -> unit IO.t;
    map : 't -> 'fd -> pos:int64 -> int -> Bigstringaf.t IO.t;
    close : 't -> 'fd -> (unit, 'error) result IO.t;
  }

  module Set = Set.Make (Uid)

  (* Pair up two arrays element-wise.
     @raise Invalid_argument if their lengths differ. *)
  let zip a b =
    if Array.length a <> Array.length b then invalid_arg "zip: lengths mismatch";
    Array.init (Array.length a) (fun i -> a.(i), b.(i))

  (* [share l0 l1] is [true] iff the first component of some pair of [l0]
     (an offset) also appears in [l1]. *)
  let share l0 l1 =
    try
      List.iter
        (fun (v, _) -> if List.exists (Int64.equal v) l1 then raise Exists)
        l0;
      false
    with Exists -> true

  (* [verify ?threads ~digest t path fs stream] appends every chunk of
     [stream] to the file created at [path] while running [first_pass] on
     it, then runs [Verify.verify ~threads] (4 by default) over the stored
     PACK. On success it returns
     [(count, requireds, unresolveds, resolveds, weight, uid)]:
     - [count]: number of objects decoded;
     - [requireds]: uids of [`Ref] bases with at least one unresolved child
       (presumably objects missing from the PACK);
     - [unresolveds]: [(offset, crc)] of the objects left unresolved;
     - [resolveds]: index entries of the resolved objects;
     - [weight]: number of bytes written to [path];
     - [uid]: the uid reported at the end of the first pass.
     The descriptor is closed on success and when the first pass fails. *)
  let verify ?(threads = 4) ~digest t path { create; append; map; close } stream
      =
    let zl_buffer = De.bigstring_create De.io_buffer_size in
    let allocate bits = De.make_window ~bits in
    let weight = ref 0L in
    create t path >>? fun fd ->
    (* Tee [stream]: store and count every chunk before decoding it. *)
    let stream () =
      stream () >>= function
      | Some (buf, off, len) as res ->
          append t fd (String.sub buf off len) >>= fun () ->
          weight := Int64.add !weight (Int64.of_int len);
          return res
      | none -> return none
    in
    Log.debug (fun m -> m "Start to analyse the PACK file.");
    first_pass ~zl_buffer ~digest stream >>= function
    | Error err ->
        (* Best effort: release [fd], but report the first-pass error. *)
        (close t fd >>= fun _ -> return (Error err))
    | Ok (oracle, matrix, where, checks, children, uid) ->
        let weight = !weight in
        (* NOTE(review): the last argument is never expected to be called
           while verifying — confirm against [Carton.Dec.make]. *)
        let pack =
          Carton.Dec.make fd ~allocate ~z:zl_buffer ~uid_ln:Uid.length
            ~uid_rw:Uid.of_raw_string (fun _ -> assert false)
        in
        (* Never map past what was actually written. *)
        let map fd ~pos len =
          let len = min len Int64.(to_int (sub weight pos)) in
          Scheduler.inj (map t fd ~pos len)
        in
        Log.debug (fun m ->
            m "Start to verify incoming PACK file (second pass).");
        Verify.verify ~threads pack ~map ~oracle ~matrix >>= fun () ->
        Log.debug (fun m -> m "Second pass on incoming PACK file is done.");
        (* Objects are numbered in stream order, i.e. by increasing offset,
           so sorted offsets line up with [matrix]. *)
        let offsets =
          Hashtbl.fold (fun k _ a -> k :: a) where []
          |> List.sort Int64.compare
          |> Array.of_list
        in
        let unresolveds, resolveds =
          let fold (unresolveds, resolveds) (offset, status) =
            let crc = Hashtbl.find checks offset in
            if Verify.is_resolved status then
              let uid = Verify.uid_of_status status in
              unresolveds, { Carton.Dec.Idx.crc; offset; uid } :: resolveds
            else (offset, crc) :: unresolveds, resolveds
          in
          Array.fold_left fold ([], []) (zip offsets matrix)
        in
        let requireds =
          Hashtbl.fold
            (fun k vs a ->
              match k with
              | `Ofs _ -> a
              | `Ref uid -> if share unresolveds vs then Set.add uid a else a)
            children Set.empty
        in
        close t fd >>? fun () ->
        Log.debug (fun m ->
            m
              "PACK file verified (%d resolved object(s), %d unresolved \
               object(s))"
              (List.length resolveds) (List.length unresolveds));
        return
          (Ok
             ( Hashtbl.length where,
               Set.elements requireds,
               unresolveds,
               resolveds,
               weight,
               uid ))

  (* Lookup callback given to [Carton.Enc.encode_target]. NOTE(review):
     presumably never called because only entries built by
     [Carton.Enc.make_entry] without a delta are encoded — confirm. *)
  let find _ = assert false

  let vuid =
    { Carton.Enc.uid_ln = Uid.length; Carton.Enc.uid_rw = Uid.to_raw_string }

  type nonrec light_load = (Uid.t, Scheduler.t) light_load
  type nonrec heavy_load = (Uid.t, Scheduler.t) heavy_load

  (* [canonicalize ~light_load ~heavy_load ~src ~dst t fs n uids weight]
     writes into [dst]:
     - a PACK header announcing [n + List.length uids] objects;
     - each object of [uids], loaded with [light_load]/[heavy_load] and
       encoded as a new entry;
     - the bytes of [src] from offset 12 up to [weight - Uid.length]
       (presumably the objects of an existing PACK, without its header and
       trailing uid);
     - the uid computed over everything written before it.
     Returns [(shift, weight', uid, entries)] where [shift] is the number of
     bytes inserted before the copied objects, [weight'] the size of [dst],
     [uid] the new trailing uid and [entries] the index entries of the
     objects of [uids]. Both descriptors are closed before returning
     [Ok]. *)
  let canonicalize ~light_load ~heavy_load ~src ~dst t
      { create; append; close; map; _ } n uids weight =
    let b =
      {
        Carton.Enc.o = Bigstringaf.create De.io_buffer_size;
        Carton.Enc.i = Bigstringaf.create De.io_buffer_size;
        Carton.Enc.q = De.Queue.create 0x10000;
        Carton.Enc.w = De.make_window ~bits:15;
      }
    in
    (* Running uid over every byte written to [dst]. *)
    let ctx = ref Uid.empty in
    (* Number of bytes written to [dst] so far. *)
    let cursor = ref 0L in
    let light_load uid = Scheduler.prj (light_load uid) in
    create t dst >>? fun fd ->
    let header = Bigstringaf.create 12 in
    Carton.Enc.header_of_pack ~length:(n + List.length uids) header 0 12;
    let hdr = Bigstringaf.to_string header in
    append t fd hdr >>= fun () ->
    ctx := Uid.feed !ctx header;
    cursor := Int64.add !cursor 12L;
    (* Encode [uid] at the current [cursor] and return its index entry. *)
    let encode_base uid =
      light_load uid >>= fun (kind, length) ->
      let entry = Carton.Enc.make_entry ~kind ~length uid in
      let anchor = !cursor in
      let crc = ref Checkseum.Crc32.default in
      (* Write the first [len] bytes of [b.o] to [dst] and account for them
         in the running uid, the entry's CRC-32 and [cursor]. *)
      let flush len =
        append t fd (Bigstringaf.substring b.o ~off:0 ~len) >>= fun () ->
        ctx := Uid.feed !ctx ~off:0 ~len b.o;
        crc := Checkseum.Crc32.digest_bigstring b.o 0 len !crc;
        cursor := Int64.add !cursor (Int64.of_int len);
        return ()
      in
      Carton.Enc.entry_to_target sched ~load:heavy_load entry |> Scheduler.prj
      >>= fun target ->
      Carton.Enc.encode_target sched ~b ~find ~load:heavy_load ~uid:vuid target
        ~cursor:(Int64.to_int anchor)
      |> Scheduler.prj
      >>= fun (len, encoder) ->
      let rec go encoder =
        match Carton.Enc.N.encode ~o:b.o encoder with
        | `Flush (encoder, len) ->
            flush len >>= fun () ->
            go (Carton.Enc.N.dst encoder b.o 0 (Bigstringaf.length b.o))
        | `End -> return { Carton.Dec.Idx.crc = !crc; offset = anchor; uid }
      in
      (* [encode_target] already left [len] bytes in [b.o]. *)
      flush len >>= fun () ->
      go (Carton.Enc.N.dst encoder b.o 0 (Bigstringaf.length b.o))
    in
    let rec go acc = function
      | [] -> return (List.rev acc)
      | uid :: uids -> encode_base uid >>= fun entry -> go (entry :: acc) uids
    in
    go [] uids >>= fun entries ->
    let shift = Int64.sub !cursor 12L in
    let top = Int64.sub weight (Int64.of_int Uid.length) in
    (* Copy [input] from [pos] to [top] in chunks of at most 1 MiB, then
       append the uid computed over the whole of [dst]. *)
    let rec go input pos =
      let remaining = Int64.sub top pos in
      let len = Int64.to_int (min remaining (Int64.mul 1024L 1024L)) in
      map t input ~pos len >>= fun raw ->
      append t fd (Bigstringaf.to_string raw) >>= fun () ->
      ctx := Uid.feed !ctx raw;
      cursor := Int64.add !cursor (Int64.of_int len);
      let pos = Int64.add pos (Int64.of_int len) in
      if pos < top then go input pos
      else
        let uid = Uid.get !ctx in
        append t fd (Uid.to_raw_string uid) >>= fun () ->
        return (Ok (Int64.(add !cursor (of_int Uid.length)), uid))
    in
    create t src >>= function
    | Error err ->
        (* Best effort: release [fd], but report why [src] failed. *)
        (close t fd >>= fun _ -> return (Error err))
    | Ok src_fd -> (
        go src_fd 12L >>? fun (weight, uid) ->
        (* Close [src_fd] first but always attempt to close [fd] as well. *)
        close t src_fd >>= fun src_closed ->
        close t fd >>? fun () ->
        match src_closed with
        | Ok () -> return (Ok (shift, weight, uid, entries))
        | Error err -> return (Error err))
end
OCaml

Innovation. Community. Security.