package picos_std

  1. Overview
  2. Docs

Module Picos_std_syncSource

Basic communication and synchronization primitives for Picos.

This library essentially provides a conventional set of communication and synchronization primitives for concurrent programming with any Picos compatible scheduler.

For the examples we open some modules:

  open Picos_std_structured
  open Picos_std_sync

Modules

Sourcemodule Mutex : sig ... end

A mutual-exclusion lock or mutex.

Sourcemodule Condition : sig ... end

A condition variable.

Sourcemodule Semaphore : sig ... end

Counting and Binary semaphores.

Sourcemodule Lazy : sig ... end

A lazy suspension.

Sourcemodule Latch : sig ... end

A dynamic single-use countdown latch.

Sourcemodule Ivar : sig ... end

An incremental or single-assignment poisonable variable.

Sourcemodule Stream : sig ... end

A lock-free, poisonable, many-to-many, stream.

Examples

A simple bounded queue

Here is an example of a simple bounded (blocking) queue using a mutex and condition variables:

  module Bounded_q : sig
    type 'a t
    val create : capacity:int -> 'a t
    val push : 'a t -> 'a -> unit
    val pop : 'a t -> 'a
  end = struct
    type 'a t = {
      mutex : Mutex.t;
      queue : 'a Queue.t;
      capacity : int;
      not_empty : Condition.t;
      not_full : Condition.t;
    }

    let create ~capacity =
      if capacity < 0 then
        invalid_arg "negative capacity"
      else {
        mutex = Mutex.create ();
        queue = Queue.create ();
        capacity;
        not_empty = Condition.create ();
        not_full = Condition.create ();
      }

    let is_full_unsafe t =
      t.capacity <= Queue.length t.queue

    let push t x =
      let was_empty =
        Mutex.protect t.mutex @@ fun () ->
        while is_full_unsafe t do
          Condition.wait t.not_full t.mutex
        done;
        Queue.push x t.queue;
        Queue.length t.queue = 1
      in
      if was_empty then
        Condition.broadcast t.not_empty

    let pop t =
      let elem, was_full =
        Mutex.protect t.mutex @@ fun () ->
        while Queue.length t.queue = 0 do
          Condition.wait
            t.not_empty t.mutex
        done;
        let was_full = is_full_unsafe t in
        Queue.pop t.queue, was_full
      in
      if was_full then
        Condition.broadcast t.not_full;
      elem
  end

The above is definitely not the fastest nor the most scalable bounded queue, but we can now demonstrate it with the cooperative Picos_mux_fifo scheduler:

  # Picos_mux_fifo.run @@ fun () ->

    let bq =
      Bounded_q.create ~capacity:3
    in

    Flock.join_after ~on_return:`Terminate begin fun () ->
      Flock.fork begin fun () ->
        while true do
          Printf.printf "Popped %d\n%!"
            (Bounded_q.pop bq)
        done
      end;

      for i=1 to 5 do
        Printf.printf "Pushing %d\n%!" i;
        Bounded_q.push bq i
      done;

      Printf.printf "All done?\n%!";

      Control.yield ();
    end;

    Printf.printf "Pushing %d\n%!" 101;
    Bounded_q.push bq 101;

    Printf.printf "Popped %d\n%!"
      (Bounded_q.pop bq)
  Pushing 1
  Pushing 2
  Pushing 3
  Pushing 4
  Popped 1
  Popped 2
  Popped 3
  Pushing 5
  All done?
  Popped 4
  Popped 5
  Pushing 101
  Popped 101
  - : unit = ()

Notice how the producer was able to push three elements to the queue after which the fourth push blocked and the consumer was started. Also, after canceling the consumer, the queue could still be used just fine.

Conventions

The optional padded argument taken by several constructor functions, e.g. Latch.create, Mutex.create, Condition.create, Semaphore.Counting.make, and Semaphore.Binary.make, defaults to false. When explicitly specified as ~padded:true the object is allocated in a way to avoid false sharing. For relatively long lived objects this can improve performance and make performance more stable at the cost of using more memory. It is not recommended to use ~padded:true for short lived objects.

OCaml

Innovation. Community. Security.