Advanced Flowing
This section describes in more detail what happens behind the scenes when nodes and flows are created, and when flows are started and stopped.
Node creation and graph definition
Consider the following simple example that defines a linear graph with one producer, one processor and one consumer:
from videoflow.core import Flow
from videoflow.producers import IntProducer
from videoflow.processors import IdentityProcessor
from videoflow.consumers import CommandlineConsumer
A = IntProducer(0, 40, 0.1)
B = IdentityProcessor()(A)
C = CommandlineConsumer()(B)
In the case of the processor and the consumer, two calls happen: one to the __init__ function, and another to the __call__ function of the newly created object. The call to __init__ creates a node. The call to __call__ defines the edges between the nodes of the graph.
For example, A = IntProducer(0, 40, 0.1) creates node A. B = IdentityProcessor()(A) creates node B and the edge A -> B, indicating that B takes A's output as its input.
Calling __call__ twice on the same object raises a RuntimeError.
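The two-step construction described above can be illustrated with a small stand-alone sketch. The Node class below is hypothetical and is not videoflow's implementation; it only shows how __init__ can create a node, how __call__ can record the incoming edges and return the node itself (which is what makes the B = IdentityProcessor()(A) idiom work), and how a second __call__ can be rejected with a RuntimeError.

```python
class Node:
    """Hypothetical minimal node: __init__ creates the node, __call__ adds edges."""

    def __init__(self, name):
        self.name = name
        self.parents = []      # nodes whose output this node consumes
        self._called = False   # guards against a second __call__

    def __call__(self, *parents):
        if self._called:
            raise RuntimeError(f"{self.name} has already been connected")
        self._called = True
        self.parents.extend(parents)  # record the edges parent -> self
        return self                   # returning self enables X = Node(...)(parent)


A = Node("A")
B = Node("B")(A)   # creates node B and the edge A -> B
C = Node("C")(B)   # creates node C and the edge B -> C

print([p.name for p in C.parents])  # ['B']

try:
    B(A)  # second __call__ on the same object
except RuntimeError as err:
    print("RuntimeError:", err)
```

Raising before the edges are extended keeps the graph unchanged when the second call is rejected.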
Flow creation
A flow is created by passing it the list of producers, the list of consumers, an optional flow_type, and an optional flow_options parameter:
flow = Flow([A], [C])
When the flow is created, the constructor checks that there are no cycles in the graph; if there are, it raises a ValueError exception. Also, only flows with exactly one producer are supported for now.
Note
In the future, graphs with more than one producer will be supported.
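The two construction-time checks can be sketched as follows. The check_graph function and its (producers, edges) input format are hypothetical, not videoflow's code. Cycle detection here uses a standard three-colour depth-first search: reaching a node that is still on the current search path means the graph contains a cycle. Raising ValueError for the single-producer rule is also an assumption, since the text above names ValueError only for cycles.

```python
from collections import defaultdict


def check_graph(producers, edges):
    """Hypothetical construction-time checks.

    producers: list of producer names
    edges: list of (parent, child) pairs
    """
    if len(producers) != 1:
        # Assumption: the exception type for this rule is not specified above.
        raise ValueError("only flows with exactly one producer are supported")

    children = defaultdict(list)
    for parent, child in edges:
        children[parent].append(child)

    # WHITE: unvisited, GREY: on the current DFS path, BLACK: fully explored.
    WHITE, GREY, BLACK = 0, 1, 2
    colour = defaultdict(int)  # every node starts WHITE

    def visit(node):
        colour[node] = GREY
        for child in children[node]:
            if colour[child] == GREY:  # back edge, so a cycle exists
                raise ValueError(f"cycle detected through {node} -> {child}")
            if colour[child] == WHITE:
                visit(child)
        colour[node] = BLACK

    nodes = {n for edge in edges for n in edge} | set(producers)
    for node in sorted(nodes):
        if colour[node] == WHITE:
            visit(node)


check_graph(["A"], [("A", "B"), ("B", "C")])  # linear graph: passes silently

try:
    check_graph(["A"], [("A", "B"), ("B", "C"), ("C", "B")])
except ValueError as err:
    print(err)  # cycle detected through C -> B
```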
flow.run() and the Execution Engine
Once the flow is built, calling flow.run() creates a topological sort of the nodes in the graph and passes it to the execution engine, whose job is to (1) wrap each node as a task, (2) create queues for communication between tasks, and (3) allocate each task to run in an independent operating system process. If, at node creation time, more than one task (OS process) was specified for a node, then more than one task is allocated for that node.
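The first two steps of the execution engine can be sketched as below, under stated assumptions. topological_sort uses Kahn's algorithm (a standard choice, not necessarily the one videoflow uses). plan_tasks and its nb_tasks mapping are hypothetical: it places one queue between consecutive nodes of the linearized order and allocates nb_tasks[node] task slots per node. The sketch stops short of step (3); actually starting each task in its own OS process would require multiprocessing.Queue rather than queue.Queue, because the latter is not shared across processes.

```python
from collections import deque
from queue import Queue


def topological_sort(nodes, edges):
    """Kahn's algorithm: repeatedly emit a node whose parents are all emitted."""
    indegree = {n: 0 for n in nodes}
    children = {n: [] for n in nodes}
    for parent, child in edges:
        children[parent].append(child)
        indegree[child] += 1

    ready = deque(n for n in nodes if indegree[n] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    return order


def plan_tasks(order, nb_tasks):
    """Hypothetical planning step: one queue between consecutive nodes and
    nb_tasks[node] tasks per node (default 1). Each task is a tuple of
    (node, input queue, output queue)."""
    queues = [Queue() for _ in range(len(order) - 1)]
    plan = []
    for i, node in enumerate(order):
        in_q = queues[i - 1] if i > 0 else None           # producer has no input
        out_q = queues[i] if i < len(queues) else None    # consumer has no output
        for _ in range(nb_tasks.get(node, 1)):
            plan.append((node, in_q, out_q))
    return plan


order = topological_sort(["A", "B", "C"], [("A", "B"), ("B", "C")])
print(order)  # ['A', 'B', 'C']

plan = plan_tasks(order, {"B": 2})
print([node for node, _, _ in plan])  # ['A', 'B', 'B', 'C']
```

In this sketch the two tasks allocated to B share the same input and output queues, so they compete for items rather than each seeing every item.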
A flow eventually stops running after any of the following events happens:
- All producers of the graph have raised a StopIteration exception.
- A KeyboardInterrupt is received, such as from Ctrl-C.
- flow.stop() is explicitly called on the flow.
In all three cases, the flow stops naturally: producers stop emitting data and emit a STOP_FLOW signal. STOP_FLOW is propagated through the graph in the same way as the rest of the data. Each time a task receives the STOP_FLOW signal, it closes any resources its corresponding node might have been using, passes STOP_FLOW on to its child (there is only one child, since the graph has been linearized into a topological sort), and stops running.
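The shutdown sequence can be sketched with a linear producer, processor, and consumer chain. This is an illustration, not videoflow's engine: it uses threads and queue.Queue instead of OS processes so that it stays self-contained, and STOP_FLOW here is a hypothetical sentinel object. The producer emits values until its iterator raises StopIteration, then emits STOP_FLOW; each downstream task stops on receiving it, after the processor records a stand-in for closing its resources and forwards the signal to its single child.

```python
import threading
from queue import Queue

STOP_FLOW = object()  # hypothetical sentinel standing in for the STOP_FLOW signal


def producer_task(values, out_q):
    """Emit values until the iterator raises StopIteration, then emit STOP_FLOW."""
    it = iter(values)
    while True:
        try:
            item = next(it)
        except StopIteration:
            out_q.put(STOP_FLOW)  # stop emitting data and signal downstream
            return
        out_q.put(item)


def processor_task(fn, in_q, out_q, closed):
    while True:
        item = in_q.get()
        if item is STOP_FLOW:
            closed.append(fn.__name__)  # stand-in for closing the node's resources
            out_q.put(STOP_FLOW)        # forward the signal to the single child
            return
        out_q.put(fn(item))


def consumer_task(in_q, results):
    while True:
        item = in_q.get()
        if item is STOP_FLOW:
            return  # last node in the chain: nothing to forward
        results.append(item)


def double(x):
    return 2 * x


q1, q2 = Queue(), Queue()
closed, results = [], []
threads = [
    threading.Thread(target=producer_task, args=(range(5), q1)),
    threading.Thread(target=processor_task, args=(double, q1, q2, closed)),
    threading.Thread(target=consumer_task, args=(q2, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # [0, 2, 4, 6, 8]
print(closed)   # ['double']
```

Because the signal travels through the same FIFO queues as the data, every item emitted before STOP_FLOW is processed before any task stops. Comparing with `is` works here because threads share one object; across OS processes the sentinel would be pickled into a new object, so a picklable marker compared by value would be needed instead.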
Multiple producers
To be supported in later versions of the framework.