package kafka

  1. Overview
  2. Docs
OCaml bindings for Kafka

Install

Dune Dependency

Authors

Maintainers

Sources

0.3.2.tar.gz
sha256=709c482a2477da1790e1e8393fcf3c614ce79a2e183e3ffad4d772f3aded3858
md5=49bb99a375ed791cc9700bac18001965

README.md.html

OCaml bindings for Kafka

Pre-requisites

License

MIT License

Install

$ opam install kafka

From source:

$ make            # use jbuilder
$ make test       # assuming kafka is running at localhost:9092 with a 'test' topic.
$ make install    # use opam

To enable Lwt support:

$ make lwt
$ make test
$ make install

Usage

#use "topfind";;
#require "kafka";;

(* Prepare a producer handler. *)
let producer = Kafka.new_producer ["metadata.broker.list","localhost:9092"];;
let producer_topic = Kafka.new_topic producer "test" ["message.timeout.ms","10000"];;

(* Prepare a consumer handler *)
let consumer = Kafka.new_consumer ["metadata.broker.list","localhost:9092"];;
let consumer_topic = Kafka.new_topic consumer "test" ["auto.commit.enable","false"];;
let partition = 1;;
let timeout_ms = 1000;;

(* Start collecting messages *)
(* Here we start from offset_end, i.e. we will consume only messages produced from now. *)
Kafka.consume_start consumer_topic partition Kafka.offset_end;;

(* Produce some messages *)
Kafka.produce producer_topic partition "message 0";;
Kafka.produce producer_topic partition "message 1";;
Kafka.produce producer_topic partition "message 2";;

(* Consume messages.

   [consume t p] polls partition [p] of topic [t] with the [timeout_ms] defined
   above and returns the fourth field of the first [Kafka.Message] received
   (the value the asserts below compare against the produced strings).
   It polls again on [Kafka.PartitionEnd] and on a [Kafka.TIMED_OUT] error
   (after reporting the timeout on stderr), so it only returns once a message
   arrives.

   Note: everything from [let rec consume] down to the [;;] that follows
   [Kafka.consume_stop] is one toplevel phrase; [consume] and [msg] are local
   to that [let ... in] chain. *)
let rec consume t p = match Kafka.consume ~timeout_ms t p with
  | Kafka.Message(_,_,_,msg,_) -> msg
  | Kafka.PartitionEnd(_,_,_) -> consume t p
  | exception Kafka.Error(Kafka.TIMED_OUT,_) ->
    (Printf.fprintf stderr "Timeout after: %d ms\n%!" timeout_ms; consume t p)
in
(* The asserts expect the messages back in the order they were produced above. *)
let msg = consume consumer_topic partition in assert (msg = "message 0");
let msg = consume consumer_topic partition in assert (msg = "message 1");
let msg = consume consumer_topic partition in assert (msg = "message 2");

(* Stop collecting messages. *)
Kafka.consume_stop consumer_topic partition;;

(* Topics, consumers and producers must be released. *)
Kafka.destroy_topic producer_topic;;
Kafka.destroy_handler producer;;
Kafka.destroy_topic consumer_topic;;
Kafka.destroy_handler consumer;;

Documentation

The API is documented in lib/kafka.mli, and the Lwt extension is documented in lib_lwt/kafka_lwt.mli.

See bin/tail_kafka_topic.ml for an example consumer using queues, batches and lwt. See bin/sendto_kafka_topic.ml for an example producer.

Configuration options of producers, consumers and topics are inherited from librdkafka/CONFIGURATION.

OCaml

Innovation. Community. Security.