项目作者: c-cube

项目描述 :
[beta] A multi-consumer, multi-producers blocking queue and stream for Lwt
高级语言: OCaml
项目地址: git://github.com/c-cube/lwt-pipe.git
创建时间: 2016-04-22T20:13:38Z
项目社区:https://github.com/c-cube/lwt-pipe

开源协议:BSD 2-Clause "Simplified" License

下载


Lwt Pipe build

An alternative to Lwt_stream with interfaces for producers and consumers
and a bounded internal buffer.

Online Documentation

Build

  1. opam install lwt-pipe

or:

  1. opam pin https://github.com/c-cube/lwt-pipe.git

License

permissive free software (BSD-2)

Use

A pipe can be used as a regular iterator:

  1. # #require "lwt";;
  2. # #require "lwt-pipe";;
  3. # open Lwt.Infix;;
  4. # let l = [1;2;3;4];;
  5. val l : int list = [1; 2; 3; 4]
  6. # Lwt_pipe.of_list l
  7. |> Lwt_pipe.Reader.map ~f:(fun x->x+1)
  8. |> Lwt_pipe.to_list;;
  9. - : int list = [2; 3; 4; 5]

But also as a streaming queue (here with two producers push_ints that will
put 1, 2, … 5 into the pipe, and one reader that consumes the whole pipe):

  1. # let rec push_ints p i : unit Lwt.t =
  2. if i <= 0 then Lwt.return ()
  3. else Lwt_pipe.write_exn p i >>= fun () -> push_ints p (i-1) ;;
  4. val push_ints : (int, [< `r | `w > `w ]) Lwt_pipe.t -> int -> unit Lwt.t =
  5. <fun>
  6. # let reader =
  7. let p = Lwt_pipe.create ~max_size:3 () in
  8. let t1 = push_ints p 5
  9. and t2 = push_ints p 5
  10. and t_read = Lwt_pipe.to_list p in
  11. Lwt.join [t1;t2] >>= fun () ->
  12. Lwt_pipe.close p >>= fun () ->
  13. t_read
  14. in
  15. List.sort compare @@ Lwt_main.run reader
  16. ;;
  17. - : int list = [1; 1; 2; 2; 3; 3; 4; 4; 5; 5]

This can be expressed with higher level constructs:

  1. # let rec list_range i = if i<=0 then [] else i :: list_range (i-1);;
  2. val list_range : int -> int list = <fun>
  3. # let int_range n = Lwt_pipe.of_list @@ list_range n ;;
  4. val int_range : int -> int Lwt_pipe.Reader.t = <fun>
  5. # Lwt_main.run @@ Lwt_pipe.to_list (int_range 5);;
  6. - : int list = [5; 4; 3; 2; 1]
  7. # let reader =
  8. let p1 = int_range 6
  9. and p2 = int_range 6
  10. and p3 = int_range 6 in
  11. Lwt_pipe.to_list (Lwt_pipe.Reader.merge_all [p1;p2;p3])
  12. in
  13. List.sort compare @@ Lwt_main.run reader
  14. ;;
  15. - : int list = [1; 1; 1; 2; 2; 2; 3; 3; 3; 4; 4; 4; 5; 5; 5; 6; 6; 6]