package irmin-pack

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file dispatcher.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
(*
 * Copyright (c) 2018-2022 Tarides <contact@tarides.com>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *)

open Import
include Dispatcher_intf
module Payload = Control_file.Latest_payload

(* The following [with module Io = Io.Unix] forces unix *)
module Make (Fm : File_manager.S with module Io = Io.Unix) :
  S with module Fm = Fm = struct
  module Fm = Fm
  module Io = Fm.Io
  module Suffix = Fm.Suffix
  module Mapping_file = Fm.Mapping_file
  module Errs = Fm.Errs
  module Control = Fm.Control

  type t = { fm : Fm.t }
  type location = Prefix | Suffix [@@deriving irmin ~pp]

  type accessor = { poff : int63; len : int63; location : location }
  [@@deriving irmin]
  (** [poff] is a physical offset in a file. It is meant to be passed to [Io] or
      [Append_only]

      [len] is a number of bytes following [poff].

      [location] is a file identifier. *)

  let v fm =
    let t = { fm } in
    Ok t

  let get_prefix t =
    match Fm.prefix t.fm with
    | Some prefix -> prefix
    | None -> raise (Errors.Pack_error (`Invalid_prefix_read "no prefix found"))

  let get_mapping t =
    match Fm.mapping t.fm with
    | Some mapping -> mapping
    | None ->
        raise (Errors.Pack_error (`Invalid_mapping_read "no mapping found"))

  let suffix_start_offset t =
    let pl = Control.payload (Fm.control t.fm) in
    match pl.status with
    | Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy
    | No_gc_yet ->
        Int63.zero
    | T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
    | T15 ->
        assert false
    | Gced { suffix_start_offset; _ } -> suffix_start_offset

  let suffix_dead_bytes t =
    let pl = Control.payload (Fm.control t.fm) in
    match pl.status with
    | Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy
    | No_gc_yet ->
        Int63.zero
    | T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
    | T15 ->
        assert false
    | Gced { suffix_dead_bytes; _ } -> suffix_dead_bytes

  (* The dispatcher is responsible for translating between global offsets to prefix
     or suffix offsets.

     {!Suffix_arithmetic} and {!Prefix_arithmetic} encapsulate these calculations.
  *)

  module Suffix_arithmetic = struct
    (* Adjust the read in suffix, as the global offset [off] is
       [off] = [suffix_start_offset] + [soff] - [suffix_dead_bytes]. *)
    let soff_of_off t off =
      let open Int63.Syntax in
      let suffix_start_offset = suffix_start_offset t in
      let suffix_dead_bytes = suffix_dead_bytes t in
      off - suffix_start_offset + suffix_dead_bytes

    let off_of_soff t soff =
      let open Int63.Syntax in
      let suffix_start_offset = suffix_start_offset t in
      let suffix_dead_bytes = suffix_dead_bytes t in
      suffix_start_offset + soff - suffix_dead_bytes
  end

  let offset_of_soff = Suffix_arithmetic.off_of_soff
  let soff_of_offset = Suffix_arithmetic.soff_of_off

  let end_offset t =
    let end_soff = Suffix.end_soff (Fm.suffix t.fm) in
    offset_of_soff t end_soff

  module Prefix_arithmetic = struct
    (* Find the last chunk which is before [off_start] (or at [off_start]). If no
       chunk found, then the entry was possibly gced (case 1). If [off_start] is
       after the entry's chunk then the entry was possibly gced (case 2). Note
       that for these two cases we cannot distinguished between trying to read a
       gced entry, or doing an invalid read. We expose two [read_exn] functions
       and we handled this upstream. *)
    let chunk_of_off_exn mapping off_start =
      (* NOTE off_start is a virtual offset *)
      let open Int63 in
      let open Int63.Syntax in
      let res = Mapping_file.find_nearest_leq mapping off_start in
      match res with
      | None ->
          (* Case 1: The entry if before the very first chunk (or there are no
             chunks). Possibly the entry was gced. *)
          let s =
            Fmt.str
              "offset %a is before the first chunk, or the prefix is empty"
              Int63.pp off_start
          in
          raise (Errors.Pack_error (`Invalid_read_of_gced_object s))
      | Some entry ->
          let chunk_off_start = entry.off in
          assert (chunk_off_start <= off_start);
          let chunk_len = entry.len in
          let chunk_off_end = chunk_off_start + of_int chunk_len in

          (* Case 2: The entry starts after the chunk. Possibly the entry was
             gced. *)
          (if chunk_off_end <= off_start then
           let s =
             Fmt.str
               "offset %a is supposed to be contained in chunk \
                (off=%a,poff=%a,len=%d) but starts after chunk"
               Int63.pp off_start Int63.pp chunk_off_start Int63.pp entry.poff
               entry.len
           in
           raise (Errors.Pack_error (`Invalid_read_of_gced_object s)));

          let shift_in_chunk = off_start - chunk_off_start in
          let max_entry_len = of_int chunk_len - shift_in_chunk in
          assert (max_entry_len >= Int63.zero);

          (entry, shift_in_chunk, max_entry_len)

    (* After we find the chunk of an entry, we check that a read is possible in the
       chunk. If it's not, this is always an invalid read. *)
    let poff_of_entry_exn mapping ~off ~len =
      let chunk, shift_in_chunk, max_entry_len = chunk_of_off_exn mapping off in

      (* Case 3: The entry ends after the chunk *)
      let open Int63.Syntax in
      (if len > max_entry_len then
       let s =
         Fmt.str
           "entry (off=%a, len=%a) is supposed to be contained in chunk \
            (poff=%a,len=%d) and starting at %a but is larger than it can be\n\
           \ contained in chunk" Int63.pp off Int63.pp len Int63.pp chunk.poff
           chunk.len Int63.pp shift_in_chunk
       in
       raise (Errors.Pack_error (`Invalid_prefix_read s)));

      (* Case 4: Success *)
      chunk.poff + shift_in_chunk
  end

  module Accessor = struct
    let v_in_suffix_exn t ~off ~len =
      let open Int63.Syntax in
      let entry_end_offset = off + len in
      if entry_end_offset > end_offset t then
        raise (Errors.Pack_error `Read_out_of_bounds)
      else
        let poff = Suffix_arithmetic.soff_of_off t off in
        { poff; len; location = Suffix }

    let v_in_prefix_exn mapping ~off ~len =
      let poff = Prefix_arithmetic.poff_of_entry_exn mapping ~off ~len in
      { poff; len; location = Prefix }

    let v_exn t ~off ~len =
      let open Int63.Syntax in
      let suffix_start_offset = suffix_start_offset t in
      if off >= suffix_start_offset then v_in_suffix_exn t ~off ~len
      else v_in_prefix_exn (get_mapping t) ~off ~len

    let v_range_in_suffix_exn t ~off ~min_len ~max_len =
      let len =
        let open Int63.Syntax in
        let bytes_after_off = end_offset t - off in
        if bytes_after_off < min_len then
          raise (Errors.Pack_error `Read_out_of_bounds)
        else if bytes_after_off > max_len then max_len
        else bytes_after_off
      in
      let poff = Suffix_arithmetic.soff_of_off t off in
      { poff; len; location = Suffix }

    let v_range_in_prefix_exn t ~off ~min_len ~max_len =
      let mapping = get_mapping t in
      let chunk, shift_in_chunk, max_entry_len =
        Prefix_arithmetic.chunk_of_off_exn mapping off
      in
      let open Int63.Syntax in
      let len =
        if max_entry_len < min_len then
          raise (Errors.Pack_error `Read_out_of_bounds)
        else if max_entry_len > max_len then max_len
        else max_entry_len
      in
      let poff = chunk.poff + shift_in_chunk in
      { poff; len; location = Prefix }

    let v_range_exn t ~off ~min_len ~max_len =
      let open Int63.Syntax in
      let suffix_start_offset = suffix_start_offset t in
      if off >= suffix_start_offset then
        v_range_in_suffix_exn t ~off ~min_len ~max_len
      else v_range_in_prefix_exn t ~off ~min_len ~max_len
  end

  let read_exn t { poff; len; location } buf =
    [%log.debug
      "read_exn in %a at %a for %a" (Irmin.Type.pp location_t) location Int63.pp
        poff Int63.pp len];
    assert (len <= Int63.of_int Stdlib.max_int);
    (* This assetion cannot be triggered because:

       - The user of Dispatcher's API is only able to construct accessors from
         [int].
       - The internals of this file may construct very large accessors but they
         will be chopped before being passed to [read_exn]. *)
    let len = Int63.to_int len in
    match location with
    | Prefix -> Io.read_exn (get_prefix t) ~off:poff ~len buf
    | Suffix -> Suffix.read_exn (Fm.suffix t.fm) ~off:poff ~len buf

  let read_bytes_exn t ~f ~off ~len =
    let open Int63.Syntax in
    let bytes_in_prefix =
      let prefix_bytes_after_off = suffix_start_offset t - off in
      if prefix_bytes_after_off <= Int63.zero then Int63.zero
      else min len prefix_bytes_after_off
    in
    let bytes_in_suffix =
      if bytes_in_prefix < len then len - bytes_in_prefix else Int63.zero
    in
    assert (bytes_in_prefix + bytes_in_suffix = len);
    let prefix_accessor_opt =
      if bytes_in_prefix > Int63.zero then
        Some (Accessor.v_exn t ~off ~len:bytes_in_prefix)
      else None
    in
    let suffix_accessor_opt =
      if bytes_in_suffix > Int63.zero then
        let off = off + bytes_in_prefix in
        Some (Accessor.v_exn t ~off ~len:bytes_in_suffix)
      else None
    in

    (* Now that we have the accessor(s), we're sure the range is valid:
       - it doesn't include dead data from the prefix,
       - it doesn't go after the end of the suffix.

       Go for read. *)
    let max_read_size = 8192 in
    let buffer = Bytes.create max_read_size in
    let max_read_size = Int63.of_int max_read_size in
    let rec aux accessor =
      if accessor.len = Int63.zero then ()
      else if accessor.len < max_read_size then (
        read_exn t accessor buffer;
        f (Bytes.sub_string buffer 0 (Int63.to_int accessor.len)))
      else
        let left, right =
          ( { accessor with len = max_read_size },
            {
              accessor with
              poff = accessor.poff + max_read_size;
              len = accessor.len - max_read_size;
            } )
        in
        read_exn t left buffer;
        f (Bytes.to_string buffer);
        aux right
    in
    Option.iter aux prefix_accessor_opt;
    Option.iter aux suffix_accessor_opt

  let create_accessor_exn t ~off ~len =
    let len = Int63.of_int len in
    Accessor.v_exn t ~off ~len

  let create_accessor_from_range_exn t ~off ~min_len ~max_len =
    let min_len = Int63.of_int min_len in
    let max_len = Int63.of_int max_len in
    Accessor.v_range_exn t ~off ~min_len ~max_len

  let create_accessor_to_prefix_exn t ~off ~len =
    let len = Int63.of_int len in
    Accessor.v_in_prefix_exn t ~off ~len

  let shrink_accessor_exn a ~new_len =
    let open Int63.Syntax in
    let new_len = Int63.of_int new_len in
    if new_len > a.len then failwith "shrink_accessor_exn to larger accessor";
    { a with len = new_len }

  let create_sequential_accessor_exn location rem_len ~poff ~len =
    if len > rem_len then raise (Errors.Pack_error `Read_out_of_bounds)
    else { poff; len = Int63.of_int len; location }

  let create_sequential_accessor_from_range_exn location rem_len ~poff ~min_len
      ~max_len =
    let len =
      if rem_len < min_len then raise (Errors.Pack_error `Read_out_of_bounds)
      else if rem_len > max_len then max_len
      else rem_len
    in
    { poff; len = Int63.of_int len; location }

  let create_sequential_accessor_seq t ~min_header_len ~max_header_len ~read_len
      =
    let preffix_chunks =
      match Fm.mapping t.fm with
      | Some mapping ->
          let preffix_chunks = ref [] in
          Mapping_file.iter mapping (fun ~off ~len ->
              preffix_chunks := (off, len) :: !preffix_chunks)
          |> Errs.raise_if_error;
          List.rev !preffix_chunks
      | None -> []
    in
    let suffix_end_soff = Fm.Suffix.end_soff (Fm.suffix t.fm) in
    let suffix_start_offset = suffix_start_offset t in
    let get_entry_accessor rem_len location poff =
      let accessor =
        create_sequential_accessor_from_range_exn location rem_len ~poff
          ~min_len:min_header_len ~max_len:max_header_len
      in
      let buf = Bytes.create max_header_len in
      read_exn t accessor buf;
      let entry_len = read_len buf in
      ( entry_len,
        create_sequential_accessor_exn location rem_len ~poff ~len:entry_len )
    in
    let rec suffix_accessors soff () =
      let open Seq in
      let open Int63.Syntax in
      if soff >= suffix_end_soff then Nil
      else
        let rem_len = Int63.to_int (suffix_end_soff - soff) in
        let entry_len, accessor = get_entry_accessor rem_len Suffix soff in
        let r = (suffix_start_offset + soff, accessor) in
        let soff = soff + Int63.of_int entry_len in
        let f = suffix_accessors soff in
        Cons (r, f)
    in
    let rec prefix_accessors poff acc () =
      let open Seq in
      match acc with
      | [] -> suffix_accessors Int63.zero ()
      | (off, rem_len) :: acc ->
          if rem_len <= 0 then prefix_accessors poff acc ()
          else
            let entry_len, accessor = get_entry_accessor rem_len Prefix poff in
            let r = (off, accessor) in
            let rem_len = rem_len - entry_len in
            let open Int63.Syntax in
            let poff = poff + Int63.of_int entry_len in
            let off = off + Int63.of_int entry_len in
            let f = prefix_accessors poff ((off, rem_len) :: acc) in
            Cons (r, f)
    in
    prefix_accessors Int63.zero preffix_chunks
end
OCaml

Innovation. Community. Security.