videoflow.core package¶
Submodules¶
videoflow.core.bottlenecks module¶
-
class
videoflow.core.bottlenecks.
Accountant
(nb_nodes)¶ Bases:
object
Keeps track of the speed and actual speed of the nodes in the topological sort.
- Arguments:
nb_nodes (int): nb of nodes in flow
-
get_actual_proctime
()¶ Returns mean actual processing time of the nodes in the t-sort
- Returns:
to_return: [float]
-
get_proctime
()¶ Returns mean processing time of the nodes in the t-sort
- Returns:
to_return: [float]
-
logtype_actualproctime
= 'actual_proctime'¶
-
logtype_proctime
= 'proctime'¶
-
stat_mean
= 'mean'¶
-
stat_variance
= 'variance'¶
-
update_actualproctime
(node_id: int, value: float)¶
-
update_proctime
(node_index: int, value: float)¶
-
update_stat
(node_index: int, log_type: str, value: float)¶
-
class
videoflow.core.bottlenecks.
MetadataConsumer
(log_folder='./')¶ Bases:
videoflow.core.node.ConsumerNode
- Arguments:
log_folder: (str) Folder where to save the logs.
-
close
()¶ This method is called by the task runner after it has finished all consuming, processing or producing because an end signal was received. Should be used to close any resources that were opened by the open() method, such as files, tensorflow sessions, etc.
-
consume
(*metadata)¶ - Arguments:
metadata: list of metadata for all the parent nodes for which we gather the data
-
get_bottlenecks
()¶ A bottleneck is any node that processes at a speed lower than the speed of any of the producers.
There is also what we call an effective bottleneck. An effective bottleneck is a node that reduces the throughput of the flow.
- Returns:
- is_bottleneck: list of booleans of the same size as self._parents. Contains None entries if the data is not statistically significant enough to judge whether a node is a bottleneck.
- is_effective_bottleneck: list of booleans of the same size as self._parents. Contains None entries if the data is not statistically significant enough to judge whether a node is a bottleneck.
-
open
()¶ This method is called by the task runner before doing any consuming, processing or producing. Should be used to open any resources that will be needed during the life of the task, such as opening files, tensorflow sessions, etc.
-
report_bottlenecks
()¶
-
class
videoflow.core.bottlenecks.
Metric
(name='')¶ Bases:
object
Computes the mean and variance of a series in an online manner.
- Arguments:
name (str): metric name
-
mean
¶ Returns the mean of the series up to this point
-
name
¶
-
update_stats
(new_value: float)¶ Computes the mean and std of the series in an online manner using Welford’s online algorithm. See: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford’s_online_algorithm
- Arguments:
new_value: (float)
-
variance
¶ Returns the variance of the series up to this point
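The online update described for update_stats can be sketched as follows. This is a minimal illustration of Welford's algorithm, not the library's implementation: the class name OnlineStats, its attribute names, and the choice of population variance (dividing by n) are assumptions made for the example.

```python
class OnlineStats:
    """Running mean and variance via Welford's online algorithm.

    Hypothetical stand-in for illustration, not the library's Metric class.
    """

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, new_value: float) -> None:
        self.count += 1
        delta = new_value - self.mean
        self.mean += delta / self.count
        # The second factor uses the *updated* mean; this is the core of Welford's method.
        self._m2 += delta * (new_value - self.mean)

    @property
    def variance(self) -> float:
        # Population variance (divide by n). This is an assumption;
        # a sample variance would divide by n - 1 instead.
        return self._m2 / self.count if self.count else 0.0


stats = OnlineStats()
for value in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    stats.update(value)
```

Each update touches only three numbers, so the statistics can be maintained per node without storing the full series of processing times.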
videoflow.core.constants module¶
videoflow.core.engine module¶
-
class
videoflow.core.engine.
ExecutionEngine
¶ Bases:
object
Defines the interface of the execution environment
-
allocate_and_run_tasks
(tasks_data)¶ Defines a template with the order of methods that need to run in order to allocate and run tasks. How those methods are implemented is left to subclasses of this class, which implement different execution environments.
- Arguments:
tasks_data: list of tuples. The list is of the form [(node : Node, node_index : int, parent_index : int, has_children : bool)]
-
join_task_processes
()¶ Blocking method. It is supposed to make the calling process sleep until all task processes have finished processing.
-
signal_flow_termination
()¶ Signals the execution environment that the flow needs to stop. When this signal is received, all consumer tasks will pick it up and pass it along the flow until it reaches every task in the graph and every task stops working.
-
-
class
videoflow.core.engine.
Messenger
¶ Bases:
object
Utility class that tasks use to receive input and write output. The
videoflow.core.task.Task
class knows which graph nodes it receives computation outputs from, and which graph nodes depend on its computation, but is oblivious of how to communicate with them. The messenger, which is tightly coupled with the execution environment being used, knows how to do this for the task.
-
check_for_termination
() → bool¶ Returns true if a flow termination signal has been received. Used by
videoflow.core.task.ProducerTask
.
-
passthrough_message
()¶ Used when the task has received a message that might be needed by another task below it, but the task itself does not produce output needed by tasks below it (e.g.
videoflow.core.task.ConsumerTask
). Depending on the kind of environment, this method might drop the message if the receiving container (usually a queue) is full.
-
passthrough_termination_message
()¶ Same as
passthrough_message
, but this method will never drop the message regardless of environment, which means that sometimes this method might block until it can deliver the message.
-
publish_message
(message, metadata=None)¶ Publishes output message to a place where the child task will receive it. Depending on the kind of environment, this method might drop the message if the receiving container (usually a queue) is full.
-
publish_termination_message
(message, metadata=None)¶ Similar to
publish_message
, but this method will never drop the message regardless of environment, which means that sometimes this method might block until it can deliver the message.
-
receive_message
()¶ This method blocks. It waits until a message has been received.
- Returns:
message: the message received from parent task in topological sort.
-
receive_message_and_metadata
()¶ This method blocks. It waits until a message has been received.
- Returns:
metadata: the metadata received from parent task in topological sort.
-
receive_metadata
()¶ This method blocks. It waits until a message (with its corresponding metadata) has been received.
- Returns:
metadata: the metadata received from parent task in topological sort.
-
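The difference between the droppable publish methods and their termination counterparts can be sketched with a bounded standard-library queue. This is only an illustration under stated assumptions: the free functions below, the use of queue.Queue as the receiving container, and the "STOP" value are hypothetical and do not reflect how any concrete Messenger subclass is written.

```python
import queue


def publish_message(channel, message):
    """Best-effort publish: drop the message if the channel is full."""
    try:
        channel.put_nowait(message)
        return True
    except queue.Full:
        return False  # message dropped


def publish_termination_message(channel, message):
    """Guaranteed publish: block until the channel has room."""
    channel.put(message, block=True)


channel = queue.Queue(maxsize=1)
delivered_first = publish_message(channel, "frame-1")   # fits in the queue
delivered_second = publish_message(channel, "frame-2")  # dropped, queue is full
channel.get()                                            # a reader frees the slot
publish_termination_message(channel, "STOP")             # delivered once there is room
```

Dropping ordinary messages keeps a slow reader from stalling the writer, while the termination message must never be lost or the reader would wait forever.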
videoflow.core.flow module¶
-
class
videoflow.core.flow.
Flow
(producers, consumers, flow_type='realtime')¶ Bases:
object
Represents a linear flow of data from one task to another. Note that a flow is created from a directed acyclic graph of producer, processor and consumer nodes, but the flow itself is linear, because it is an optimized topological sort of the directed acyclic graph.
- Arguments:
producers: a list of producer nodes of type
videoflow.core.node.ProducerNode
.
consumers: a list of consumer nodes of type
videoflow.core.node.ConsumerNode
.
flow_type: one of ‘realtime’ or ‘batch’
-
join
()¶ Blocking method. Will make the process that calls this method block until the flow finishes running naturally.
-
run
()¶ Simple documentation: It starts the flow.
More complex documentation:
It creates a topological sort of the nodes in the computation graph, and wraps each node in a
videoflow.core.task.Task
. It passes the tasks to the environment, which allocates them and creates the channels that will be used for communication between tasks. Tasks themselves do not know where these channels are, but the environment assigns a messenger to each task that knows how to communicate over those channels.
-
stop
()¶ Blocking method. Stops the flow. Makes the execution environment send a flow termination signal.
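The linearization that the Flow description relies on can be sketched with a plain topological sort. The example below uses Kahn's algorithm on a dictionary of node names; it does not reproduce whatever optimization the library applies to its sort, and the function name, graph representation and node names are all hypothetical.

```python
from collections import deque


def topological_sort(children):
    """Kahn's algorithm. `children` maps each node to the nodes that depend on it."""
    indegree = {node: 0 for node in children}
    for kids in children.values():
        for kid in kids:
            indegree[kid] = indegree.get(kid, 0) + 1

    # Nodes with no parents (the producers) can be emitted first.
    ready = deque(node for node, degree in indegree.items() if degree == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for kid in children.get(node, []):
            indegree[kid] -= 1
            if indegree[kid] == 0:
                ready.append(kid)

    if len(order) != len(indegree):
        raise ValueError("graph has a cycle")
    return order


graph = {
    "reader": ["detector", "resizer"],
    "detector": ["annotator"],
    "resizer": ["annotator"],
    "annotator": ["writer"],
    "writer": [],
}
order = topological_sort(graph)
```

In the resulting order every node appears after all of its parents, which is the property that lets each task identify its parent by a lower position in the sort.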
videoflow.core.graph module¶
videoflow.core.node module¶
-
class
videoflow.core.node.
ConsumerNode
(metadata=False, **kwargs)¶ Bases:
videoflow.core.node.Leaf
- Arguments:
metadata (boolean): By default is False. If True, instead of receiving output of parent nodes, receives metadata produced by parent nodes.
-
consume
(item)¶ Method definition that needs to be implemented by subclasses.
- Arguments:
item: the item being received as input (or consumed).
-
metadata
¶
-
class
videoflow.core.node.
FunctionProcessorNode
(processor_function, nb_proc: int = 1, device='cpu', **kwargs)¶ Bases:
videoflow.core.node.ProcessorNode
-
process
(inp)¶ Method definition that needs to be implemented by subclasses.
- Arguments:
inp: object or list of objects being received for processing from parent nodes.
- Returns:
the output being consumed by child nodes.
-
-
class
videoflow.core.node.
Leaf
(*args, **kwargs)¶ Bases:
videoflow.core.node.Node
Node with no children.
-
class
videoflow.core.node.
ModuleNode
(entry_node: videoflow.core.node.Node, exit_node: videoflow.core.node.Node, *args, **kwargs)¶ Bases:
videoflow.core.node.Node
Module node that wraps a subgraph of computation. Each node of the Module must be a
ProcessorNode
or a
ModuleNode
itself. For simplicity, a module node has exactly one node as entry point, and exactly one node as exit point. If for some reason a ModuleNode has flag
one_process
set to
True
:
Then any module within the subgraph must also be of that type, or an exception will be thrown.
No process inside the module can be allocated to a gpu, or an exception will be thrown.
- Arguments:
entry_node (Node): The node that sits at the top of the subgraph
exit_node (Node): The node that sits at the bottom of the subgraph
- Raises:
ValueError
if:
There is at least one node in the sequence that is not an instance of
ProcessorNode
or of
ModuleNode
.
There is a cycle in the subgraph.
The
exit_node
is not reachable from the
entry_node
.
- The flag
one_process
is set to
True
, and any of the following conditions is true:
There is a ModuleNode within the subgraph that does not have that flag set to true too.
There is at least one node in the sequence that has device_type GPU.
-
nodes
¶
-
class
videoflow.core.node.
Node
(name=None)¶ Bases:
object
Represents a computational node in the graph. It is also a callable object. It can be called with the list of parents on which it depends.
-
add_child
(child)¶ Adds child to the set of children that depend on it.
-
children
¶ Returns a set of the child nodes
-
close
()¶ This method is called by the task runner after it has finished all consuming, processing or producing because an end signal was received. Should be used to close any resources that were opened by the open() method, such as files, tensorflow sessions, etc.
-
id
¶ The id of the node. In this case the id of the node is produced by calling
id(self)
.
-
open
()¶ This method is called by the task runner before doing any consuming, processing or producing. Should be used to open any resources that will be needed during the life of the task, such as opening files, tensorflow sessions, etc.
-
parents
¶ Returns a list with the parent nodes
-
remove_child
(child)¶
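The way a callable node can wire up a graph might look roughly like the sketch below. SketchNode is a hypothetical stand-in written for this example, not the library's Node class; in particular, accepting parents as positional arguments and returning self from the call are assumptions.

```python
class SketchNode:
    """Hypothetical minimal node: calling it with its parents wires up the graph."""

    def __init__(self, name=None):
        self.name = name
        self._parents = []
        self._children = set()

    def __call__(self, *parents):
        # Remember the parents and register this node as a child of each one.
        self._parents = list(parents)
        for parent in parents:
            parent.add_child(self)
        return self  # returning self allows wiring on the construction line

    def add_child(self, child):
        self._children.add(child)

    def remove_child(self, child):
        self._children.discard(child)

    @property
    def parents(self):
        return list(self._parents)

    @property
    def children(self):
        return set(self._children)


reader = SketchNode("reader")
detector = SketchNode("detector")(reader)
writer = SketchNode("writer")(detector)
```

Keeping parents in a list preserves the order in which inputs are expected, while a set is enough for children because only membership matters.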
-
-
class
videoflow.core.node.
OneTaskProcessorNode
(*args, **kwargs)¶ Bases:
videoflow.core.node.ProcessorNode
Used for processes that keep internal state, so they are not easily parallelizable. The main use of this class is for processes that can only run one task, such as trackers and aggregators.
-
class
videoflow.core.node.
ProcessorNode
(nb_tasks: int = 1, device_type='cpu', **kwargs)¶ Bases:
videoflow.core.node.Node
-
change_device
(device_type)¶
-
device_type
¶ Returns the preferred device type to use to run the processor’s code
-
nb_tasks
¶ Returns the number of tasks to allocate to this processor
-
process
(inp: any) → any¶ Method definition that needs to be implemented by subclasses.
- Arguments:
inp: object or list of objects being received for processing from parent nodes.
- Returns:
the output being consumed by child nodes.
-
-
class
videoflow.core.node.
ProducerNode
(*args, **kwargs)¶ Bases:
videoflow.core.node.Node
The producer node does not receive input, and produces input. Each time the
next()
method is called, it produces a new input.
It would have been more natural to implement the
ProducerNode
as a generator, but generators cannot be pickled, and hence you cannot easily work with generators in a multiprocessing setting.
-
next
() → any¶ Returns next produced element.
Raises
StopIteration
after the last element has been produced and a call to self.next happens.
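The pickling constraint mentioned for ProducerNode can be illustrated with the standard library alone. CountingProducer and counting_generator below are hypothetical names used only for this sketch; the class mimics the next() and StopIteration contract described above without depending on the library.

```python
import pickle


class CountingProducer:
    """Hypothetical producer: its state lives in plain attributes, so it pickles."""

    def __init__(self, limit):
        self.limit = limit
        self.current = 0

    def next(self):
        if self.current >= self.limit:
            raise StopIteration()
        value = self.current
        self.current += 1
        return value


def counting_generator(limit):
    """The generator version of the same producer."""
    yield from range(limit)


producer = CountingProducer(limit=3)
producer.next()  # consume one element before pickling

# The object survives a pickle round trip with its position intact.
clone = pickle.loads(pickle.dumps(producer))

# A generator object cannot be pickled at all.
try:
    pickle.dumps(counting_generator(3))
    generator_pickled = True
except TypeError:
    generator_pickled = False
```

Because the clone resumes from the stored position, a producer written this way can be handed to another process, which is what a multiprocessing execution environment needs.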
-
-
class
videoflow.core.node.
TaskModuleNode
(entry_node: videoflow.core.node.ProcessorNode, exit_node: videoflow.core.node.ProcessorNode, nb_tasks=1, **kwargs)¶ Bases:
videoflow.core.node.ProcessorNode
Processor node that wraps a graph of processor nodes. This has the effect that instead of allocating one task per processor node in the graph, only one task process is allocated for the entire subgraph.
- Arguments:
entry_node (Node): The node that sits at the top of the subgraph
exit_node (Node): The node that sits at the bottom of the subgraph
nb_tasks (int): The number of parallel tasks to allocate
- Raises:
ValueError
if:
There is at least one node in the subgraph that is not an instance of
ProcessorNode
.
The
nb_tasks
parameter is greater than one and there is at least one node in the sequence that derives from
OneTaskProcessorNode
.
There is at least one node in the sequence that has device_type GPU.
The subgraph has less than one node.
There is a node of type
TaskModuleNode
among the nodes of the subgraph.
-
process
(*inp)¶ Method definition that needs to be implemented by subclasses.
- Arguments:
inp: object or list of objects being received for processing from parent nodes.
- Returns:
the output being consumed by child nodes.
videoflow.core.processor module¶
videoflow.core.task module¶
-
class
videoflow.core.task.
ConsumerTask
(consumer: videoflow.core.node.ConsumerNode, messenger, task_id: int, is_last: bool, parent_task_id: int)¶ Bases:
videoflow.core.task.NodeTask
It runs forever, blocking until it receives a message from parent nodes through the messenger. It consumes the message and does not publish anything back down the pipe.
If a consumer task has tasks after it in the topological sort, it does not mean that those tasks expect any input from the consumer task. It simply means that the consumer task is a passthrough of messages.
-
class
videoflow.core.task.
MultiprocessingOutputTask
(processor: videoflow.core.node.ProcessorNode, task_queue: multiprocessing.context.BaseContext.Queue, accountingQueue: multiprocessing.context.BaseContext.Queue, output_queues: [multiprocessing.context.BaseContext.Queue], flow_type: str, is_last: bool)¶ Bases:
videoflow.core.task.MultiprocessingTask
-
is_last
¶ Returns True if the task is the last one in the topological sort, otherwise returns false.
-
run
()¶ Starts the task in an infinite loop.
-
-
class
videoflow.core.task.
MultiprocessingProcessorTask
(idx: int, processor: videoflow.core.node.ProcessorNode, lock: multiprocessing.context.BaseContext.Lock, receiveQueue: multiprocessing.context.BaseContext.Queue, accountingQueue: multiprocessing.context.BaseContext.Queue, outputQueue: multiprocessing.context.BaseContext.Queue)¶ Bases:
videoflow.core.task.MultiprocessingTask
-
change_device
(device_type: str)¶
-
device_type
¶
-
run
()¶ Starts the task in an infinite loop.
-
-
class
videoflow.core.task.
MultiprocessingReceiveTask
(processor: videoflow.core.node.ProcessorNode, parent_task_queue: multiprocessing.context.BaseContext.Queue, receiveQueue: multiprocessing.context.BaseContext.Queue, flow_type: str)¶ Bases:
videoflow.core.task.MultiprocessingTask
-
run
()¶ Starts the task in an infinite loop.
-
-
class
videoflow.core.task.
MultiprocessingTask
(processor: videoflow.core.node.ProcessorNode)¶ Bases:
videoflow.core.task.Task
-
class
videoflow.core.task.
NodeTask
(computation_node: videoflow.core.node.Node, messenger, tsort_id: int, is_last: bool, parent_tsort_id: int = None)¶ Bases:
videoflow.core.task.Task
A
NodeTask
is a wrapper around a
videoflow.core.node.Node
that is able to interact with the execution environment through a messenger. Nodes receive input and/or produce output, but tasks are the ones that run in infinite loops, receiving inputs from the environment and passing them to the computation node, and taking outputs from the computation node and passing them to the environment.
- Arguments:
computation_node
messenger (Messenger): the messenger that will communicate between nodes.
tsort_id (int): the position of the node in the topological sort
parent_tsort_id: the position of the parent node in the topological sort.
is_last: True if the task is the last one in the topological sort
-
computation_node
¶ Returns the current computation node
-
id
¶ Returns an integer as id.
-
is_last
¶ Returns True if the task is the last one in the topological sort, otherwise returns false.
-
parent_id
¶ Returns the id of the parent task. Id of parent task is lower than id of current task.
-
run
()¶ Starts the task in an infinite loop. If this method is called and the
set_messenger()
method has not been called yet, an assertion error will happen.
-
class
videoflow.core.task.
ProcessorTask
(processor: videoflow.core.node.ProcessorNode, messenger, task_id: int, is_last, parent_task_id: int)¶ Bases:
videoflow.core.task.NodeTask
It runs forever, first blocking until it receives a message from parent nodes through the messenger. Then it passes the message to the processor node, and when it gets back the output it uses the messenger to publish it down the flow. If it receives a termination message among the inputs from a parent, it passes the termination message down the flow and breaks from the infinite loop.
-
change_device
(device_type: str)¶
-
device_type
¶
-
-
class
videoflow.core.task.
ProducerTask
(producer: videoflow.core.node.ProducerNode, messenger, task_id: int, is_last=False)¶ Bases:
videoflow.core.task.NodeTask
It runs forever calling the
next()
method in the producer node. At each iteration it checks for a termination signal, and if so it sends a termination message to its child task and breaks the infinite loop.
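The producer and processor loops described above can be sketched with threads and standard-library queues. Everything in this example is an assumption made for illustration: the STOP sentinel, the function names, the use of threading.Event as the termination signal, and the choice to treat StopIteration as the end of the stream are not taken from the library's task classes.

```python
import queue
import threading

STOP = object()  # hypothetical termination message


def producer_loop(next_item, out_queue, stop_requested):
    """Call next_item() until the source is exhausted or a stop is requested."""
    while True:
        if stop_requested.is_set():
            break
        try:
            item = next_item()
        except StopIteration:
            break
        out_queue.put(item)
    out_queue.put(STOP)  # always tell the child task to finish


def processor_loop(process, in_queue, out_queue):
    """Block on input, process it, publish it; forward STOP and exit."""
    while True:
        message = in_queue.get()
        if message is STOP:
            out_queue.put(STOP)
            break
        out_queue.put(process(message))


frames = iter([1, 2, 3])
produced, processed = queue.Queue(), queue.Queue()
stop_requested = threading.Event()

workers = [
    threading.Thread(target=producer_loop,
                     args=(lambda: next(frames), produced, stop_requested)),
    threading.Thread(target=processor_loop,
                     args=(lambda x: x * 2, produced, processed)),
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

# Drain the final queue the way a consumer loop would.
results = []
while True:
    message = processed.get()
    if message is STOP:
        break
    results.append(message)
```

Forwarding the sentinel from one loop to the next is what lets a single stop request reach every stage in order.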