videoflow.core package

Submodules

videoflow.core.bottlenecks module

class videoflow.core.bottlenecks.Accountant(nb_nodes)

Bases: object

Keeps track of the speed and actual speed of the nodes in the topological sort.

  • Arguments:
    • nb_nodes (int): number of nodes in the flow

get_actual_proctime()

Returns mean actual processing time of the nodes in the t-sort

  • Returns:
    • to_return: [float]

get_proctime()

Returns mean processing time of the nodes in the t-sort

  • Returns:
    • to_return: [float]

logtype_actualproctime = 'actual_proctime'
logtype_proctime = 'proctime'
stat_mean = 'mean'
stat_variance = 'variance'
update_actualproctime(node_id: int, value: float)
update_proctime(node_index: int, value: float)
update_stat(node_index: int, log_type: str, value: float)
class videoflow.core.bottlenecks.MetadataConsumer(log_folder='./')

Bases: videoflow.core.node.ConsumerNode

  • Arguments:
    • log_folder: (str) Folder where to save the logs.

close()

This method is called by the task runner after it has finished all consuming, processing or producing because an end signal was received. Should be used to close any resources that were opened by the open() method, such as files, tensorflow sessions, etc.

consume(*metadata)
  • Arguments:
    • metadata: list of metadata for all the parent nodes for which we gather the data

get_bottlenecks()

A bottleneck is any node that processes at a speed lower than the speed of any of the producers.

There is also what we call effective bottleneck. An effective bottleneck is a node that reduces the throughput of the flow.

  • Returns:
    • is_bottleneck: list of booleans of the same size as self._parents. Contains None entries if data is not statistically significant to be able to judge if a node is a bottleneck.

    • is_effective_bottleneck: list of booleans of the same size as self._parents. Contains None entries if data is not statistically significant to be able to judge if a node is a bottleneck.
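
As a rough illustration of the first definition, the sketch below flags a node as a bottleneck when its mean processing time exceeds that of the slowest producer, and reports None for nodes with too few samples. The function name flag_bottlenecks, the min_samples threshold and the input layout are hypothetical; the significance test that videoflow actually applies is not described in this reference.

```python
from typing import List, Optional, Sequence


def flag_bottlenecks(mean_proctimes: Sequence[float],
                     sample_counts: Sequence[int],
                     producer_indexes: Sequence[int],
                     min_samples: int = 30) -> List[Optional[bool]]:
    """Hypothetical helper: one flag per node, None when data is too scarce."""
    # A longer processing time means a lower speed, so the slowest producer
    # is the reference that every node is compared against (an assumption).
    reference = max(mean_proctimes[i] for i in producer_indexes)
    flags: List[Optional[bool]] = []
    for proctime, count in zip(mean_proctimes, sample_counts):
        if count < min_samples:
            flags.append(None)  # not enough samples to judge this node
        else:
            flags.append(proctime > reference)
    return flags


# Node 0 is the only producer; node 3 has too few samples to be judged.
flags = flag_bottlenecks([0.03, 0.10, 0.02, 0.50], [100, 100, 100, 5],
                         producer_indexes=[0])
```

The effective-bottleneck list would need the actual processing times as well, which this sketch leaves out.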

open()

This method is called by the task runner before doing any consuming, processing or producing. Should be used to open any resources that will be needed during the life of the task, such as opening files, tensorflow sessions, etc.

report_bottlenecks()
class videoflow.core.bottlenecks.Metric(name='')

Bases: object

Computes the mean and variance of a series in an online manner.

  • Arguments:
    • name (str): metric name

mean

Returns the mean of the series up to this point

name
update_stats(new_value: float)

Updates the mean and variance of the series in an online manner using Welford’s online algorithm. See: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford’s_online_algorithm

  • Arguments:
    • new_value: (float)

variance

Returns the variance of the series up to this point
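
For readers unfamiliar with Welford's online algorithm, a minimal sketch follows. The class name OnlineMetric and its private attributes are hypothetical, and the choice of the sample variance (dividing by n - 1) is an assumption; the reference above does not say which variant videoflow uses.

```python
class OnlineMetric:
    """Hypothetical stand-in that tracks mean and variance in one pass."""

    def __init__(self, name=''):
        self.name = name
        self._count = 0
        self._mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update_stats(self, new_value: float):
        # Welford's update: adjust the mean first, then accumulate the
        # product of the deviations from the old and the new mean.
        self._count += 1
        delta = new_value - self._mean
        self._mean += delta / self._count
        self._m2 += delta * (new_value - self._mean)

    @property
    def mean(self):
        return self._mean

    @property
    def variance(self):
        # Assumption: sample variance; returns 0.0 until two values are seen.
        if self._count < 2:
            return 0.0
        return self._m2 / (self._count - 1)


metric = OnlineMetric('proctime')
for value in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    metric.update_stats(value)
```

The update needs only three numbers per metric, so no history of past values has to be kept.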

class videoflow.core.bottlenecks.MetricMessage(nodeid, logtype, value)

Bases: tuple

logtype

Alias for field number 1

nodeid

Alias for field number 0

value

Alias for field number 2
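
The "Alias for field number" entries are the docstrings that Python's collections.namedtuple generates, so an equivalent definition would presumably look like the sketch below. The sample values are made up for illustration.

```python
from collections import namedtuple

# Field order matches the documented positions 0, 1 and 2.
MetricMessage = namedtuple('MetricMessage', ['nodeid', 'logtype', 'value'])

# Illustrative values only.
message = MetricMessage(nodeid=3, logtype='proctime', value=0.012)
nodeid, logtype, value = message  # unpacks like a plain tuple
```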

class videoflow.core.bottlenecks.MetricsLogger(log_folder='./')

Bases: object

log(node_id: str, log_type: str, value: float)

videoflow.core.constants module

videoflow.core.engine module

class videoflow.core.engine.ExecutionEngine

Bases: object

Defines the interface of the execution environment

allocate_and_run_tasks(tasks_data)

Defines a template with the order of methods that need to run in order to allocate and run tasks. How those methods are implemented is left to subclasses of this class, which implement different execution environments.

  • Arguments:
    • tasks_data: list of tuples. The list is of the form [(node : Node, node_index : int, parent_index : int, has_children : bool)]
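
The template idea can be sketched as follows. SketchEngine, RecordingEngine and the hook names _allocate and _start are hypothetical and are not part of videoflow; only the shape of tasks_data follows the description above.

```python
class SketchEngine:
    """Hypothetical base class: fixes the order, leaves the steps open."""

    def _allocate(self, tasks_data):  # hypothetical hook
        raise NotImplementedError

    def _start(self, allocated):  # hypothetical hook
        raise NotImplementedError

    def allocate_and_run_tasks(self, tasks_data):
        # The template: subclasses never override this method, only the hooks.
        allocated = self._allocate(tasks_data)
        self._start(allocated)


class RecordingEngine(SketchEngine):
    """Toy environment that only records what it was asked to do."""

    def __init__(self):
        self.calls = []
        self.started = []

    def _allocate(self, tasks_data):
        self.calls.append('allocate')
        return list(tasks_data)

    def _start(self, allocated):
        self.calls.append('start')
        self.started = allocated


engine = RecordingEngine()
# Tuples of (node, node_index, parent_index, has_children); strings stand in
# for real nodes here.
engine.allocate_and_run_tasks([('reader', 0, None, True),
                               ('writer', 1, 0, False)])
```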

join_task_processes()

Blocking method. It is supposed to make the calling process sleep until all task processes have finished processing.

signal_flow_termination()

Signals the execution environment that the flow needs to stop. When this signal is received, all producer tasks will pick it up and pass it down the flow until it reaches every task in the graph and every task stops working.

class videoflow.core.engine.Messenger

Bases: object

Utility class that tasks use to receive input and write output. The videoflow.core.task.Task class knows which graph nodes it receives computation outputs from, and which graph nodes depend on its computation, but is oblivious of how to communicate with them. The messenger, which is tightly coupled with the execution environment being used, knows how to do this for the task.

check_for_termination() → bool

Returns true if a flow termination signal has been received. Used by videoflow.core.task.ProducerTask.

passthrough_message()

Used when the task has received a message that might be needed by another task below it, but when the task itself does not produce output needed by tasks below it (e.g. videoflow.core.task.ConsumerTask).

Depending on the kind of environment, this method might drop the message if the receiving container (usually a queue) is full.

passthrough_termination_message()

Same as passthrough_message, but this method will never drop the message regardless of environment, which means that sometimes this method might block until it can deliver the message.

publish_message(message, metadata=None)

Publishes output message to a place where the child task will receive it. Depending on the kind of environment, this method might drop the message if the receiving container (usually a queue) is full.

publish_termination_message(message, metadata=None)

Similar to publish_message, but this method will never drop the message regardless of environment, which means that sometimes this method might block until it can deliver the message.
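
The difference between the two publishing modes can be sketched with a bounded queue from the standard library. The helper names publish and publish_termination and the 'STOP' marker are hypothetical; how a real messenger is wired depends on the execution environment.

```python
import queue


def publish(channel, message):
    """Best-effort delivery: returns False if the message was dropped."""
    try:
        channel.put_nowait(message)
        return True
    except queue.Full:
        return False


def publish_termination(channel, message):
    """Guaranteed delivery: blocks until the channel has room."""
    channel.put(message, block=True)


channel = queue.Queue(maxsize=1)
first = publish(channel, 'frame-1')    # fits in the empty channel
second = publish(channel, 'frame-2')   # channel is full, so it is dropped
channel.get()                          # the child task frees a slot
publish_termination(channel, 'STOP')   # hypothetical termination marker
```

Dropping is acceptable for ordinary messages in a realtime flow, but losing the termination message would leave child tasks running forever, which is why that call is allowed to block.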

receive_message()

This method blocks. It waits until a message has been received.

  • Returns:
    • message: the message received from parent task in topological sort.

receive_message_and_metadata()

This method blocks. It waits until a message has been received.

  • Returns:
    • message: the message received from parent task in topological sort.

    • metadata: the metadata received from parent task in topological sort.

receive_metadata()

This method blocks. It waits until a message (with its corresponding metadata) has been received.

  • Returns:
    • metadata: the metadata received from parent task in topological sort.

videoflow.core.flow module

class videoflow.core.flow.Flow(producers, consumers, flow_type='realtime')

Bases: object

Represents a linear flow of data from one task to another. Note that a flow is created from a directed acyclic graph of producer, processor and consumer nodes, but the flow itself is linear, because it is an optimized topological sort of the directed acyclic graph.

  • Arguments:
    • producers: a list of producer nodes of type videoflow.core.node.ProducerNode.

    • consumers: a list of consumer nodes of type videoflow.core.node.ConsumerNode.

    • flow_type: one of ‘realtime’ or ‘batch’

join()

Blocking method. Will make the process that calls this method block until the flow finishes running naturally.

run()

Simple documentation: It starts the flow.

More complex documentation:

  1. It creates a topological sort of the nodes in the computation graph, and wraps each node around a videoflow.core.task.Task

  2. It passes the tasks to the environment, which allocates them and creates the channels that will be used for communication between tasks. Tasks themselves do not know where these channels are, but the environment assigns a messenger to each task that knows how to communicate in those channels.

stop()

Blocking method. Stops the flow. Makes the execution environment send a flow termination signal.

videoflow.core.graph module

class videoflow.core.graph.GraphEngine(producers, consumers)

Bases: object

topological_sort()
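
This reference does not say which algorithm GraphEngine uses. As one common option, the sketch below implements Kahn's algorithm on a plain dictionary; the function signature and the node names are hypothetical.

```python
from collections import deque


def topological_sort(children):
    """children maps each node name to the nodes that depend on it."""
    indegree = {node: 0 for node in children}
    for kids in children.values():
        for kid in kids:
            indegree[kid] = indegree.get(kid, 0) + 1

    # Start from the nodes without parents (the producers).
    ready = deque(node for node, degree in indegree.items() if degree == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for kid in children.get(node, []):
            indegree[kid] -= 1
            if indegree[kid] == 0:
                ready.append(kid)

    if len(order) != len(indegree):
        raise ValueError('graph has a cycle')
    return order


graph = {
    'reader': ['detector', 'writer'],
    'detector': ['tracker'],
    'tracker': ['writer'],
    'writer': [],
}
order = topological_sort(graph)
```

Every node appears after all of its parents, which is what allows a task to refer to its parent by a lower position in the sort.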

videoflow.core.node module

class videoflow.core.node.ConsumerNode(metadata=False, **kwargs)

Bases: videoflow.core.node.Leaf

  • Arguments:
    • metadata (boolean): By default is False. If True, instead of receiving output of parent nodes, receives metadata produced by parent nodes.

consume(item)

Method definition that needs to be implemented by subclasses.

  • Arguments:
    • item: the item being received as input (or consumed).

metadata
class videoflow.core.node.FunctionProcessorNode(processor_function, nb_proc: int = 1, device='cpu', **kwargs)

Bases: videoflow.core.node.ProcessorNode

process(inp)

Method definition that needs to be implemented by subclasses.

  • Arguments:
    • inp: object or list of objects being received for processing from parent nodes.

  • Returns:
    • the output being consumed by child nodes.

class videoflow.core.node.Leaf(*args, **kwargs)

Bases: videoflow.core.node.Node

Node with no children.

class videoflow.core.node.ModuleNode(entry_node: videoflow.core.node.Node, exit_node: videoflow.core.node.Node, *args, **kwargs)

Bases: videoflow.core.node.Node

Module node that wraps a subgraph of computation. Each node of the Module must be a ProcessorNode or a ModuleNode itself. For simplicity, a module node has exactly one node as entry point, and exactly one node as exit point. If for some reason a ModuleNode has flag one_process set to True:

  • Then any module within the subgraph must also be of that type, or an exception will be thrown.

  • No process inside the module can be allocated to a gpu, or an exception will be thrown

  • Arguments:
    • entry_node (Node): The node that sits at the top of the subgraph

    • exit_node (Node): The node that sits at the bottom of the subgraph

  • Raises:
    • ValueError if:
      • There is at least one node in the sequence that is not instance of ProcessorNode or of ModuleNode

      • There is a cycle in the subgraph

      • The exit_node is not reachable from the entry_node

      • The flag one_process is set to True, and any of the following conditions is true:
        • There is a ModuleNode within the subgraph that does not have that flag set to true too.

        • There is at least one node in the sequence that has device_type GPU

nodes
class videoflow.core.node.Node(name=None)

Bases: object

Represents a computational node in the graph. It is also a callable object. It can be called with the list of parents on which it depends.

add_child(child)

Adds child to the set of children that depend on it.

children

Returns a set of the child nodes

close()

This method is called by the task runner after it has finished all consuming, processing or producing because an end signal was received. Should be used to close any resources that were opened by the open() method, such as files, tensorflow sessions, etc.

id

The id of the node. In this case the id of the node is produced by calling id(self).

open()

This method is called by the task runner before doing any consuming, processing or producing. Should be used to open any resources that will be needed during the life of the task, such as opening files, tensorflow sessions, etc.

parents

Returns a list with the parent nodes

remove_child(child)
class videoflow.core.node.OneTaskProcessorNode(*args, **kwargs)

Bases: videoflow.core.node.ProcessorNode

Used for processes that keep internal state so they are not easily parallelizable. The main use of this class is for processes that can only run one task, such as trackers and aggregators.

class videoflow.core.node.ProcessorNode(nb_tasks: int = 1, device_type='cpu', **kwargs)

Bases: videoflow.core.node.Node

change_device(device_type)
device_type

Returns the preferred device type to use to run the processor’s code

nb_tasks

Returns the number of tasks to allocate to this processor

process(inp: any) → any

Method definition that needs to be implemented by subclasses.

  • Arguments:
    • inp: object or list of objects being received for processing from parent nodes.

  • Returns:
    • the output being consumed by child nodes.

class videoflow.core.node.ProducerNode(*args, **kwargs)

Bases: videoflow.core.node.Node

The producer node does not receive input, and produces output. Each time the next() method is called, it produces a new output.

It would have been more natural to implement the ProducerNode as a generator, but generators cannot be pickled, and hence you cannot easily work with generators in a multiprocessing setting.

next() → any

Returns next produced element.

Raises StopIteration after the last element has been produced and a call to self.next happens.
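
The next() contract and the pickling remark can be illustrated with a stand-alone class. CountingProducer and is_picklable are hypothetical names, and the class deliberately does not subclass videoflow's ProducerNode so that the sketch runs without the library.

```python
import pickle


class CountingProducer:
    """Hypothetical producer that yields 0, 1, ... up to limit - 1."""

    def __init__(self, limit):
        self._limit = limit
        self._current = 0

    def next(self):
        if self._current >= self._limit:
            # Same contract as described above: signal exhaustion.
            raise StopIteration()
        value = self._current
        self._current += 1
        return value


def is_picklable(obj):
    """Returns True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


producer = CountingProducer(limit=2)
produced = [producer.next(), producer.next()]

# A generator expression cannot be pickled, which is the limitation that
# motivates the class-based design.
generator_ok = is_picklable(i for i in range(2))
```

An instance of a module-level class that holds only plain attributes, such as the two integers here, can be sent to another process because pickle only has to serialize its attribute dictionary.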

class videoflow.core.node.TaskModuleNode(entry_node: videoflow.core.node.ProcessorNode, exit_node: videoflow.core.node.ProcessorNode, nb_tasks=1, **kwargs)

Bases: videoflow.core.node.ProcessorNode

Processor node that wraps a graph of processor nodes. This has the effect that instead of allocating one task per processor node in the graph, only one task process is allocated for the entire subgraph.

  • Arguments:
    • entry_node (Node): The node that sits at the top of the subgraph

    • exit_node (Node): The node that sits at the bottom of the subgraph

    • nb_tasks (int): The number of parallel tasks to allocate

  • Raises:
    • ValueError if:
      • There is at least one node in the subgraph that is not instance of ProcessorNode

      • nb_tasks parameter is greater than one and there is at least one node in the sequence that derives from OneTaskProcessorNode.

      • There is at least one node in the sequence that has device_type GPU

      • The subgraph has fewer than one node.

      • There is a node of type TaskModuleNode among the nodes of the subgraph.

process(*inp)

Method definition that needs to be implemented by subclasses.

  • Arguments:
    • inp: object or list of objects being received for processing from parent nodes.

  • Returns:
    • the output being consumed by child nodes.

videoflow.core.processor module

class videoflow.core.processor.Processor

Bases: object

process(item)

videoflow.core.task module

class videoflow.core.task.ConsumerTask(consumer: videoflow.core.node.ConsumerNode, messenger, task_id: int, is_last: bool, parent_task_id: int)

Bases: videoflow.core.task.NodeTask

It runs forever, blocking until it receives a message from parent nodes through the messenger. It consumes the message and does not publish anything back down the pipe.

If a consumer task has tasks after it in the topological sort, it does not mean that those tasks expect any input from the consumer task. It simply means that the consumer task is a passthrough of messages.

class videoflow.core.task.MultiprocessingOutputTask(processor: videoflow.core.node.ProcessorNode, task_queue: multiprocessing.context.BaseContext.Queue, accountingQueue: multiprocessing.context.BaseContext.Queue, output_queues: [multiprocessing.context.BaseContext.Queue], flow_type: str, is_last: bool)

Bases: videoflow.core.task.MultiprocessingTask

is_last

Returns True if the task is the last one in the topological sort, otherwise returns false.

run()

Starts the task in an infinite loop.

class videoflow.core.task.MultiprocessingProcessorTask(idx: int, processor: videoflow.core.node.ProcessorNode, lock: multiprocessing.context.BaseContext.Lock, receiveQueue: multiprocessing.context.BaseContext.Queue, accountingQueue: multiprocessing.context.BaseContext.Queue, outputQueue: multiprocessing.context.BaseContext.Queue)

Bases: videoflow.core.task.MultiprocessingTask

change_device(device_type: str)
device_type
run()

Starts the task in an infinite loop.

class videoflow.core.task.MultiprocessingReceiveTask(processor: videoflow.core.node.ProcessorNode, parent_task_queue: multiprocessing.context.BaseContext.Queue, receiveQueue: multiprocessing.context.BaseContext.Queue, flow_type: str)

Bases: videoflow.core.task.MultiprocessingTask

run()

Starts the task in an infinite loop.

class videoflow.core.task.MultiprocessingTask(processor: videoflow.core.node.ProcessorNode)

Bases: videoflow.core.task.Task

class videoflow.core.task.NodeTask(computation_node: videoflow.core.node.Node, messenger, tsort_id: int, is_last: bool, parent_tsort_id: int = None)

Bases: videoflow.core.task.Task

A NodeTask is a wrapper around a videoflow.core.node.Node that is able to interact with the execution environment through a messenger. Nodes receive input and/or produce output, but tasks are the ones that run in infinite loops, receiving inputs from the environment and passing them to the computation node, and taking outputs from the computation node and passing them to the environment.

  • Arguments:
    • computation_node

    • messenger (Messenger): the messenger that will communicate between nodes.

    • tsort_id (int): the position of the node in the topological sort

    • parent_tsort_id: the position of the parent node in the topological sort.

    • is_last: True if the task is the last one in the topological sort

computation_node

Returns the current computation node

id

Returns an integer as id.

is_last

Returns True if the task is the last one in the topological sort, otherwise returns false.

parent_id

Returns the id of the parent task. Id of parent task is lower than id of current task.

run()

Starts the task in an infinite loop. If this method is called and the set_messenger() method has not been called yet, an assertion error will happen.

class videoflow.core.task.ProcessorTask(processor: videoflow.core.node.ProcessorNode, messenger, task_id: int, is_last, parent_task_id: int)

Bases: videoflow.core.task.NodeTask

It runs forever, first blocking until it receives a message from parent nodes through the messenger. Then it passes it to the processor node and when it gets back the output it uses the messenger to publish it down the flow. If among the inputs it received from a parent it receives a termination message, it passes termination message down the flow and breaks from infinite loop.

change_device(device_type: str)
device_type
class videoflow.core.task.ProducerTask(producer: videoflow.core.node.ProducerNode, messenger, task_id: int, is_last=False)

Bases: videoflow.core.task.NodeTask

It runs forever, calling the next() method of the producer node. At each iteration it checks for a termination signal, and if one has been received it sends a termination message to its child task and breaks out of the infinite loop.
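
The loop described above can be sketched with an in-memory stand-in for the messenger. StubMessenger, Counter, run_producer and the 'STOP' marker are hypothetical; only the three messenger method names come from this reference. Treating an exhausted producer as a reason to send the termination message is an assumption.

```python
STOP = 'STOP'  # hypothetical termination marker


class StubMessenger:
    """Hypothetical messenger that signals termination after a few checks."""

    def __init__(self, stop_after):
        self._stop_after = stop_after
        self._checks = 0
        self.published = []

    def check_for_termination(self):
        self._checks += 1
        return self._checks > self._stop_after

    def publish_message(self, message, metadata=None):
        self.published.append(message)

    def publish_termination_message(self, message, metadata=None):
        self.published.append(message)


class Counter:
    """Hypothetical endless producer: 1, 2, 3, ..."""

    def __init__(self):
        self._n = 0

    def next(self):
        self._n += 1
        return self._n


def run_producer(producer, messenger):
    while True:
        if messenger.check_for_termination():
            messenger.publish_termination_message(STOP)
            break
        try:
            item = producer.next()
        except StopIteration:
            # Assumption: an exhausted producer also ends the flow.
            messenger.publish_termination_message(STOP)
            break
        messenger.publish_message(item)


messenger = StubMessenger(stop_after=3)
run_producer(Counter(), messenger)
```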

class videoflow.core.task.Task

Bases: object

run()

Starts the task in an infinite loop.