package kafka

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file kafka_consumer.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
open Kafka.Metadata

(** [protect ~finally f x] evaluates [f x], then runs [finally ()] whether
    [f x] returned a value or raised, and only then hands the value back to
    the caller (or re-raises the exception of [f x]).

    If [finally ()] itself raises, that exception is the one that propagates.

    Adapted from http://stackoverflow.com/questions/11276985/emulating-try-with-finally-in-ocaml.
*)
let protect ~finally f x =
  (* Freeze the outcome of [f x] in a thunk so that [finally] can run
     before the value or the exception is delivered. *)
  let deliver =
    match f x with
    | value -> (fun () -> value)
    | exception exn -> (fun () -> raise exn)
  in
  let () = finally () in
  deliver ()

(** [fold_partition topic_name partition update offset seed] folds [update]
    over what is consumed from [partition] of [topic_name], starting at
    [offset] with accumulator [seed].

    [update] receives both [Kafka.Message] and [Kafka.PartitionEnd] values.

    @param consumer_props consumer configuration; ["enable.partition.eof"] is
           always forced to ["true"] in front of it.
    @param topic_props topic configuration passed to [Kafka.new_topic].
    @param timeout_ms timeout passed to each [Kafka.consume] call.
    @param stop_at_end when [true], the fold returns the accumulator at the
           first [Kafka.PartitionEnd] (after passing it to [update]) or at
           the first [Kafka.TIMED_OUT] error; when [false] (the default) both
           are followed by another poll, so the fold only terminates by an
           exception.

    Any exception other than [Kafka.Error (Kafka.TIMED_OUT, _)] propagates to
    the caller. The topic and consumer handles are destroyed on every exit
    path, including a failure while creating the topic or starting the
    consumption. *)
let fold_partition
  ?(consumer_props = ["metadata.broker.list","localhost:9092"])
  ?(topic_props = [])
  ?(timeout_ms = 1000)
  ?(stop_at_end = false)
  topic_name partition update offset seed
=
  (* enable.partition.eof must be set to true to catch partition end *)
  let consumer_props = ("enable.partition.eof", "true")::consumer_props in
  let consumer = Kafka.new_consumer consumer_props in
  let topic =
    (* Do not leak the consumer handle if the topic cannot be created. *)
    try Kafka.new_topic consumer topic_name topic_props
    with e -> Kafka.destroy_handler consumer; raise e
  in
  let release_handles () =
    Kafka.destroy_topic topic;
    Kafka.destroy_handler consumer
  in
  let stop_consuming () =
    Kafka.consume_stop topic partition;
    release_handles ()
  in
  let rec loop acc =
    match Kafka.consume ~timeout_ms topic partition with
    | Kafka.Message _ as msg ->
      loop (update acc msg)
    | Kafka.PartitionEnd _ as msg ->
      let acc = update acc msg in
      if stop_at_end
      then acc
      else loop acc
    | exception Kafka.Error(Kafka.TIMED_OUT,_) ->
      if stop_at_end
      then acc
      else loop acc
  in
  (* Consumption has not started if this raises, so only the handles have to
     be released; [consume_stop] is deliberately not called. *)
  (try Kafka.consume_start topic partition offset
   with e -> release_handles (); raise e);
  protect ~finally:stop_consuming loop seed

(** [fold_queue_for_ever queue timeout_ms update seed] repeatedly polls
    [queue] with [Kafka.consume_queue ~timeout_ms] and threads an accumulator,
    initially [seed], through [update] for every [Kafka.Message] and
    [Kafka.PartitionEnd] received.

    A [Kafka.Error (Kafka.TIMED_OUT, _)] is treated as "nothing to consume
    yet" and triggers another poll. The function never returns a value: it
    only terminates when polling or [update] raises some other exception,
    which is propagated unchanged. *)
let fold_queue_for_ever queue timeout_ms update seed
=
  (* One poll of the queue; [None] stands for a timeout. *)
  let poll () =
    match Kafka.consume_queue ~timeout_ms queue with
    | received -> Some received
    | exception Kafka.Error(Kafka.TIMED_OUT,_) -> None
  in
  let rec drain state =
    match poll () with
    | None ->
      drain state
    | Some ((Kafka.Message _ | Kafka.PartitionEnd _) as received) ->
      drain (update state received)
  in
  drain seed

(** Partition identifiers, totally ordered as plain integers. *)
module Partition = struct
  type t = int

  (* A subtraction-based comparison ([a - b]) wraps around when the operands
     are far apart (e.g. [min_int] and [1]) and then reports the wrong sign,
     which corrupts the ordering [Set.Make] relies on. Use the built-in
     comparison, specialised to [int] by the annotations. *)
  let compare (a : int) (b : int) = compare a b
end

(** Sets of partition identifiers. *)
module PartitionSet : Set.S with type elt = int = Set.Make(Partition)

(** [fold_queue_upto_end queue timeout_ms update partitions seed] polls
    [queue] with [Kafka.consume_queue ~timeout_ms] and folds [update] over
    every [Kafka.Message] and [Kafka.PartitionEnd] received, starting from
    [seed], until no partition is left pending.

    The pending set starts as [partitions]. A message adds its partition to
    the set and a partition end removes it; the accumulator is returned as
    soon as a partition end leaves the set empty. Emptiness is only checked
    after a partition end, so at least one must be received before the
    function can return.

    A [Kafka.Error (Kafka.TIMED_OUT, _)] triggers another poll; any other
    exception propagates unchanged. *)
let fold_queue_upto_end queue timeout_ms update partitions seed
=
  let rec drain pending state =
    match Kafka.consume_queue ~timeout_ms queue with
    | exception Kafka.Error(Kafka.TIMED_OUT,_) ->
      drain pending state
    | Kafka.Message (_,partition,_,_,_) as received ->
      let pending = PartitionSet.add partition pending in
      let state = update state received in
      drain pending state
    | Kafka.PartitionEnd (_,partition,_) as received ->
      let pending = PartitionSet.remove partition pending in
      let state = update state received in
      if PartitionSet.is_empty pending
      then state
      else drain pending state
  in
  drain (PartitionSet.of_list partitions) seed

(** [find_offset partition_offsets partition] returns the offset of the first
    binding of [partition] in the association list [partition_offsets], or
    [0L] when [partition] has no binding. *)
let find_offset partition_offsets partition =
  match List.assoc partition partition_offsets with
  | offset -> offset
  | exception Not_found -> 0L

(** [fold_topic topic_name partitions update partition_offsets seed] folds
    [update] over what is consumed, through a single queue, from the given
    [partitions] of [topic_name], starting with accumulator [seed].

    When [partitions] is the empty list, the partitions listed in the topic
    metadata ([Kafka.topic_metadata]) are used instead. Each partition starts
    at the offset given by [find_offset partition_offsets], i.e. [0L] when it
    has no entry in [partition_offsets].

    @param consumer_props consumer configuration; ["enable.partition.eof"] is
           always forced to ["true"] in front of it.
    @param topic_props topic configuration passed to [Kafka.new_topic].
    @param timeout_ms timeout passed to each queue poll.
    @param stop_at_end when [true] the fold is run by [fold_queue_upto_end],
           otherwise (the default) by [fold_queue_for_ever].

    The queue, topic and consumer handles are destroyed on every exit path,
    including a failure during set-up; partitions whose consumption was
    already started when a later start fails are stopped first. *)
let fold_topic
  ?(consumer_props = ["metadata.broker.list","localhost:9092"])
  ?(topic_props = [])
  ?(timeout_ms = 1000)
  ?(stop_at_end = false)
  topic_name partitions update partition_offsets seed
=
  (* enable.partition.eof must be set to true to catch partition end *)
  let consumer_props = ("enable.partition.eof", "true")::consumer_props in
  let consumer = Kafka.new_consumer consumer_props in
  let topic =
    (* Do not leak the consumer handle if the topic cannot be created. *)
    try Kafka.new_topic consumer topic_name topic_props
    with e -> Kafka.destroy_handler consumer; raise e
  in
  let release_topic () =
    Kafka.destroy_topic topic;
    Kafka.destroy_handler consumer
  in
  let partitions = match partitions with
    | [] ->
      (try (Kafka.topic_metadata consumer topic).topic_partitions
       with e -> release_topic (); raise e)
    | _ -> partitions
  in
  let offsets = List.map (find_offset partition_offsets) partitions in
  let queue =
    try Kafka.new_queue consumer
    with e -> release_topic (); raise e
  in
  let release_all () =
    Kafka.destroy_queue queue;
    release_topic ()
  in
  (* Partitions whose consumption has been started, most recent first; only
     used to undo a partially completed start-up. *)
  let started = ref [] in
  let start_consuming () =
    List.iter2 (fun partition offset ->
      Kafka.consume_start_queue queue topic partition offset;
      started := partition :: !started
    ) partitions offsets
  in
  let loop seed =
    if stop_at_end
    then fold_queue_upto_end queue timeout_ms update partitions seed
    else fold_queue_for_ever queue timeout_ms update seed
  in
  let stop_consuming () =
    List.iter (Kafka.consume_stop topic) partitions;
    release_all ()
  in
  (try start_consuming ()
   with e ->
     (* Stop only what was actually started, in start order, then release. *)
     List.iter (Kafka.consume_stop topic) (List.rev !started);
     release_all ();
     raise e);
  protect ~finally:stop_consuming loop seed

(** Maps keyed by strings; [fold_queue] uses it to associate each topic name
    with the topic handle created for it. *)
module TopicMap : Map.S with type key = string = Map.Make(String)

(** [fold_queue topic_partition_pairs update topic_partition_offsets seed]
    folds [update] over what is consumed, through a single queue, from
    several topics at once, starting with accumulator [seed].

    One topic handle is created per distinct topic name appearing in
    [topic_partition_pairs]. Consumption is started for every
    [(topic_name, partition, offset)] triple of [topic_partition_offsets];
    on exit, [Kafka.consume_stop] is called for every pair of
    [topic_partition_pairs].

    @param consumer_props consumer configuration; ["enable.partition.eof"] is
           always forced to ["true"] in front of it.
    @param topic_props topic configuration used for every topic.
    @param timeout_ms timeout passed to each queue poll.
    @param stop_at_end accepted but currently ignored: the fold is always run
           by [fold_queue_for_ever], so it only terminates by an exception.

    @raise Not_found if [topic_partition_offsets] mentions a topic name that
           does not appear in [topic_partition_pairs].

    The queue, topic and consumer handles are destroyed on every exit path,
    including a failure during set-up; partitions whose consumption was
    already started when set-up fails are stopped first. *)
let fold_queue
  ?(consumer_props = ["metadata.broker.list","localhost:9092"])
  ?(topic_props = [])
  ?(timeout_ms = 1000)
  ?stop_at_end:(_stop_at_end = false)
  topic_partition_pairs update topic_partition_offsets seed
=
  (* enable.partition.eof must be set to true to catch partition end *)
  let consumer_props = ("enable.partition.eof", "true")::consumer_props in
  let consumer = Kafka.new_consumer consumer_props in
  (* What has been acquired so far, recorded as set-up progresses so that a
     failure half-way through can release exactly that. *)
  let topics = ref TopicMap.empty in
  let queue_opt = ref None in
  let started = ref [] in
  let release_handles () =
    (match !queue_opt with
     | Some queue -> Kafka.destroy_queue queue
     | None -> ());
    TopicMap.iter (fun _ topic -> Kafka.destroy_topic topic) !topics;
    Kafka.destroy_handler consumer
  in
  let setup () =
    List.iter (fun (topic_name,_) ->
      if not (TopicMap.mem topic_name !topics)
      then (
        let topic = Kafka.new_topic consumer topic_name topic_props in
        topics := TopicMap.add topic_name topic !topics
      )
    ) topic_partition_pairs;
    let partitions = List.map (fun (topic_name,partition) ->
      (TopicMap.find topic_name !topics, partition)
    ) topic_partition_pairs in
    let offsets = List.map (fun (topic_name,partition,offset) ->
      (TopicMap.find topic_name !topics, partition,offset)
    ) topic_partition_offsets in
    let queue = Kafka.new_queue consumer in
    queue_opt := Some queue;
    List.iter (fun (topic,partition,offset) ->
      Kafka.consume_start_queue queue topic partition offset;
      started := (topic,partition) :: !started
    ) offsets;
    (queue, partitions)
  in
  let (queue, partitions) =
    try setup ()
    with e ->
      (* Stop only what was actually started, in start order, then release. *)
      List.iter (fun (topic,partition) -> Kafka.consume_stop topic partition) (List.rev !started);
      release_handles ();
      raise e
  in
  let loop seed =
    fold_queue_for_ever queue timeout_ms update seed
  in
  let stop_consuming () =
    List.iter (fun (topic,partition) -> Kafka.consume_stop topic partition) partitions;
    release_handles ()
  in
  protect ~finally:stop_consuming loop seed
OCaml

Innovation. Community. Security.