Project author: heyoka

Project description:
DataFlow computations in Erlang
Primary language: Erlang
Project URL: git://github.com/heyoka/dataflow.git
Created: 2017-01-03T14:29:51Z
Project community: https://github.com/heyoka/dataflow

License: Apache License 2.0


Archived, because dataflow is now included in faxe.

dataflow

An OTP library for DataFlow computations in Erlang.

It supports building Graph structures with custom computation nodes and running them in ‘push’ or ‘pull’ mode.

(The library serves as the basis for a processing and analytics framework for time-series data.)

Build

    $ rebar3 compile

News

New API for building and running a computation graph.

The graph will be a child of the graph_sup supervisor.

    -type graph_definition() :: #{nodes => [], edges => []}.

Nodes for the graph_definition are defined this way (see example below):

    {NodeName :: any(), Callback_Module :: atom(), [Args] :: optional}

Edges between nodes:

    {NodeOut :: any(), PortOut :: non_neg_integer(), NodeIn :: any(), PortIn :: non_neg_integer(), Params :: list() optional}
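
Because edges refer to nodes by name, a misspelled name in an edge is an easy mistake to make. The following is a minimal sketch of a sanity check that could be run on a definition before it is handed to the library. The module `graph_def_check` and its `validate/1` function are hypothetical helpers, not part of dataflow; they only assume the node and edge tuple shapes shown above.

```erlang
-module(graph_def_check).
-export([validate/1]).

%% Hypothetical helper, not part of dataflow.
%% Checks that every edge refers only to node names that are
%% declared in the nodes list of the definition.
-spec validate(map()) -> ok | {error, {unknown_nodes, [term()]}}.
validate(#{nodes := Nodes, edges := Edges}) ->
    %% Node tuples are {Name, Module} or {Name, Module, Args}.
    Names = [element(1, N) || N <- Nodes],
    %% Edge tuples are {NodeOut, PortOut, NodeIn, PortIn} with optional Params,
    %% so the node names sit at positions 1 and 3.
    Referenced = lists:usort(
                   lists:append([[element(1, E), element(3, E)] || E <- Edges])),
    case [R || R <- Referenced, not lists:member(R, Names)] of
        [] -> ok;
        Unknown -> {error, {unknown_nodes, Unknown}}
    end.
```

The check only looks at names; it does not verify ports or callback modules, since the source does not describe how those are validated.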

Create a graph from a graph_definition() map:

    -spec create_graph(any(), graph_definition()) -> {ok, pid()} | {error, Reason::any()}.
    {ok, Graph} = dataflow:create_graph("flow1", GraphDef).

Start the computation:

    ok = dataflow:start_graph(Graph, push).

Example

Define 4 nodes, connect them as a pipe, and start the graph in ‘push’ mode:

    pipe2() ->
        pipe2("graph_p").
    pipe2(G) ->
        N1 = "p1", N2 = "p2", N3 = "p3", N4 = "p4",
        Nodes = [
            {N1, df_auto_emit},
            {N2, df_print},
            {N3, df_print, [node3_args_here]},
            {N4, df_print}
        ],
        Edges = [
            {N1, 1, N2, 1},
            {N2, 1, N3, 1},
            {N3, 1, N4, 1}
        ],
        GraphDef = #{nodes => Nodes, edges => Edges},
        {ok, Graph} = dataflow:create_graph(G, GraphDef),
        dataflow:start_graph(Graph, push),
        Graph.
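
For longer pipes, writing each edge by hand becomes repetitive. As a rough sketch, the edge list for a linear pipe can be derived from the ordered list of node names. The module `pipe_edges` and its `from_nodes/1` function are hypothetical and not part of dataflow; the sketch assumes every node uses port 1 for both input and output, as in the example above.

```erlang
-module(pipe_edges).
-export([from_nodes/1]).

%% Hypothetical helper, not part of dataflow.
%% Connects outport 1 of each node to inport 1 of the node that follows it.
-spec from_nodes([term()]) -> [{term(), pos_integer(), term(), pos_integer()}].
from_nodes([A, B | Rest]) ->
    [{A, 1, B, 1} | from_nodes([B | Rest])];
from_nodes(_) ->
    %% Zero or one node left: nothing more to connect.
    [].
```

With this helper, the `Edges` list in the example would be the result of `pipe_edges:from_nodes([N1, N2, N3, N4])`.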

Example without graph supervision

Define 4 nodes, connect them as a pipe, and start the graph in ‘push’ mode:

    N1 = "print1", N2 = "print2", N3 = "print3", N4 = "print4",
    {ok, Graph} = df_graph:start_link("graph1",[]),
    df_graph:add_node(Graph, N1, auto_emit, []),
    df_graph:add_node(Graph, N2, add, [2]),
    df_graph:add_node(Graph, N3, add, [3]),
    df_graph:add_node(Graph, N4, add, [4]),
    df_graph:add_edge(Graph, N1, 1, N2, 1, []),
    df_graph:add_edge(Graph, N2, 1, N3, 1, []),
    df_graph:add_edge(Graph, N3, 1, N4, 1, []),
    df_graph:start_graph(Graph, push).

Custom Nodes

Implement the df_component behavior:

    %%%===================================================================
    %%% CALLBACKS
    %%%===================================================================
    %% @doc
    %% INIT/3
    %%
    %% initialisation
    %% @end
    -callback init(NodeId :: term(), Inputs :: list(), Args :: term())
        -> {ok, auto_request(), cbstate()}.
    %% @doc
    %% PROCESS/3
    %%
    %% process value or batch incoming on specific inport
    %%
    %% return values :
    %%
    %% :: just return the state
    %% {ok, cbstate()}
    %%
    %% :: emit a value right after processing
    %% {emit, {Port :: df_port(), Value :: term()}, cbstate()}
    %%
    %% :: request a value in return of a process call
    %% {request, {ReqPort :: df_port(), ReqPid :: pid()}, cbstate()}
    %%
    %% :: emit and request a value
    %% {emit_request, {OutPort :: df_port(), Value :: term()}, {ReqPort :: df_port(), ReqPid :: pid()}, cbstate()}
    %%
    %% @end
    -callback process(Inport :: non_neg_integer(), Value :: #data_point{} | #data_batch{}, State :: cbstate())
        ->
        {ok, cbstate()} |
        {emit,
            { Port :: df_port(), Value :: term() }, cbstate()
        } |
        {request,
            { ReqPort :: df_port(), ReqPid :: pid() }, cbstate()
        } |
        {emit_request,
            { OutPort :: df_port(), Value :: term() }, { ReqPort :: df_port(), ReqPid :: pid() }, cbstate()
        } |
        {error, Reason :: term()}.
    %%%==========================================================
    %%% OPTIONAL CALLBACKS
    %%% =========================================================
    %% @doc
    %% OPTIONS/0
    %%
    %% optional
    %%
    %% retrieve options (with default values optionally) for a component
    %% for an optional parameter, provide Default term
    %%
    %% options with no 'Default' value will be treated as mandatory
    %%
    -callback options() ->
        list(
            {Name :: atom(), Type :: atom(), Default :: term()} |
            {Name :: atom(), Type :: atom()}
        ).
    %% @doc
    %% INPORTS/0
    %%
    %% optional
    %% provide a list of inports for the component :
    %%
    -callback inports() -> {ok, list()}.
    %% @doc
    %% OUTPORTS/0
    %%
    %% optional
    %% provide a list of outports for the component :
    %%
    -callback outports() -> {ok, list()}.
    %% @doc
    %% HANDLE_INFO/2
    %%
    %% optional
    %% handle other messages that will be sent to this process :
    %%
    -callback handle_info(Request :: term(), State :: cbstate())
        -> {ok, NewCallbackState :: cbstate()} | {error, Reason :: term()}.
    %% @doc
    %% SHUTDOWN/1
    %%
    %% optional
    %% called when component process is about to stop :
    %%
    -callback shutdown(State :: cbstate()) -> any().
    %% -optional_callbacks([inports/0, outports/0, handle_info/2, shutdown/1]). %% erlang 18+
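
To make the callback shapes more concrete, here is a minimal sketch of a component that forwards every incoming item unchanged and counts how many it has seen. The module name `my_passthrough` is hypothetical. Two things are assumptions rather than facts taken from this README: that `all` is an acceptable `auto_request()` value, and that outport `1` is a valid `df_port()`. The `-behaviour` line is commented out so that the module compiles on its own; it would be enabled when building against dataflow.

```erlang
-module(my_passthrough).
%% -behaviour(df_component).  %% enable when compiling against dataflow

-export([init/3, process/3, shutdown/1]).

%% Called once when the node starts. The callback state is a plain map here.
%% Assumption: 'all' is a valid auto_request() value; check df_component.
init(NodeId, _Inputs, _Args) ->
    {ok, all, #{node_id => NodeId, count => 0}}.

%% Forward the incoming item unchanged on outport 1 and increase the counter.
%% The item is treated as an opaque term, so the record definitions for
%% data_point and data_batch are not needed in this sketch.
process(_Inport, Value, State = #{count := Count}) ->
    {emit, {1, Value}, State#{count := Count + 1}}.

%% Optional callback: nothing to clean up.
shutdown(_State) ->
    ok.
```

Such a module could then be referenced in a node tuple, for example `{"n1", my_passthrough}`.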

Test

    % for debug console-output, add built-in event handlers
    dataflow:add_debug_handler().
    G = df:pipe2().
    G ! stop.

A few more examples can be found in the df module.

Todo

  • more tests
  • api-docs for event-handlers
  • docs for component options
  • make edoc work for df_component
  • single-run mode