Writing your own components
One of the best features of Videoflow is that it can be easily extended. Videoflow has a good collection of components (producers, processors and consumers) that can help you quickly bootstrap a computer vision application. Sometimes the components provided are not enough because you need faster or more accurate models, or you need to implement a complex algorithm, or you are dealing with a non-standard data source.
This tutorial shows how to write your own components, and how to integrate them into a Videoflow application.
Writing producers
To write a custom producer, simply extend the class videoflow.core.ProducerNode. You must implement the next() method, and you may implement the open() and close() methods that videoflow.core.ProducerNode inherits from videoflow.core.Node.
The open() method is called by Videoflow’s execution engine before the producer task begins to run. Use it to acquire the resources the producer needs, for example to open files or create Tensorflow sessions.
Once the task begins to run, the task runner will continuously call the next() method of the producer. Each time the next() method is called, it should return the next produced output. To indicate that no more outputs will be produced and returned, the method must raise the StopIteration exception.
When the task finishes running because the producer has raised a StopIteration exception, the Videoflow execution engine calls the close() method. The method should release any resources that were acquired by the open() or next() methods.
Below is a sample implementation of videoflow.producers.VideofileReader:
import cv2
from ..core.node import ProducerNode

class VideofileReader(ProducerNode):
    '''
    Opens a video capture object and returns subsequent frames
    from the video each time ``next`` is called
    '''
    def __init__(self, video_file : str, nb_frames = -1):
        '''
        - Arguments:
            - video_file: path to video file
            - nb_frames: number of frames to process. -1 means all of them
        '''
        self._video_file = video_file
        self._video = None
        self._nb_frames = nb_frames
        self._frame_count = 0
        super(VideofileReader, self).__init__()

    def open(self):
        '''
        Opens the video stream
        '''
        if self._video is None:
            self._video = cv2.VideoCapture(self._video_file)

    def close(self):
        '''
        Releases the video stream object
        '''
        self._video.release()

    def next(self):
        '''
        - Returns:
            - frame: np.array of shape (h, w, 3)
        - Raises:
            - StopIteration: after it finishes reading the videofile \
                or when it reaches the specified number of frames to \
                process.
        '''
        if self._video.isOpened():
            success, frame = self._video.read()
            self._frame_count += 1
            if not success or self._frame_count == self._nb_frames:
                raise StopIteration()
            else:
                return frame
        else:
            raise StopIteration()
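The lifecycle described above (open(), repeated next() calls until StopIteration, then close()) can be tried without Videoflow installed. The following is only a minimal sketch under stated assumptions: ProducerNode here is a local stand-in for videoflow.core.ProducerNode, and CountdownProducer and drive_producer are hypothetical names made up for illustration. The real execution engine runs producers inside tasks and is more involved than this loop.

```python
class ProducerNode:
    """Local stand-in for videoflow.core.ProducerNode (assumption),
    so the sketch runs on its own."""

    def open(self):
        pass

    def close(self):
        pass

    def next(self):
        raise NotImplementedError


class CountdownProducer(ProducerNode):
    """Hypothetical producer that emits start, start - 1, ..., 1."""

    def __init__(self, start):
        self._start = start
        self._current = None
        super().__init__()

    def open(self):
        # Acquire resources here; this sketch only resets the counter.
        self._current = self._start

    def close(self):
        # Release whatever open() or next() acquired.
        self._current = None

    def next(self):
        if self._current <= 0:
            # Signals that no more outputs will be produced.
            raise StopIteration()
        value = self._current
        self._current -= 1
        return value


def drive_producer(producer):
    """Hypothetical driver that mimics the open/next/close order."""
    outputs = []
    producer.open()
    try:
        while True:
            try:
                outputs.append(producer.next())
            except StopIteration:
                break
    finally:
        # close() runs even if next() fails with an unexpected error.
        producer.close()
    return outputs


print(drive_producer(CountdownProducer(3)))  # [3, 2, 1]
```

Keeping all resource acquisition in open() rather than in __init__ means that constructing the node stays cheap, and the resources only exist while the task is actually running.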
Writing processors
Processors are the nodes that perform computations, transformations or filtering of data. In general, processors receive data as input and return data as output.
To write your own custom processor, create a class that extends videoflow.core.ProcessorNode. The class must implement the process() method, and may implement the change_device() method and the open() and close() methods that videoflow.core.ProcessorNode inherits from videoflow.core.Node.
See the Writing producers section above for an explanation of how to implement the open() and close() methods.
Implementing the process method
The number of parameters of the process method is equal to the number of parents of the processor in the computation graph. For example, consider a processor that takes as input the outputs of two parents, and returns 0 if the smaller value came from the first parent and 1 if it came from the second one. It can be implemented as follows:
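from videoflow.core.node import ProcessorNode

class ComparisonProcessor(ProcessorNode):
    def process(self, inp1, inp2):
        # inp1 comes from the first parent, inp2 from the second.
        # Return the index of the parent that produced the smaller value
        # (ties go to the second parent).
        if inp1 < inp2:
            return 0
        return 1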
Notice that the order in which the inputs are received is important. At flow definition time, be sure to pass the parents to the __call__ method in the same order that the process method expects its inputs.
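To see why the order matters, consider a processor whose operation is not symmetric. The sketch below is a simplified model, not Videoflow's implementation: ProcessorNode is a local stand-in that only records the parents passed to __call__, and Constant, SubtractProcessor and evaluate are hypothetical names introduced for illustration.

```python
class ProcessorNode:
    """Local stand-in (assumption): records parents in the order
    they are passed to __call__."""

    def __init__(self):
        self.parents = ()

    def __call__(self, *parents):
        self.parents = parents
        return self

    def process(self, *inputs):
        raise NotImplementedError


class Constant(ProcessorNode):
    """Hypothetical parentless node that always outputs the same value."""

    def __init__(self, value):
        self._value = value
        super().__init__()

    def process(self):
        return self._value


class SubtractProcessor(ProcessorNode):
    def process(self, minuend, subtrahend):
        return minuend - subtrahend


def evaluate(node):
    """Hypothetical helper: passes each parent's output to process(),
    positionally and in the order the parents were declared."""
    return node.process(*(evaluate(parent) for parent in node.parents))


ten, three = Constant(10), Constant(3)
print(evaluate(SubtractProcessor()(ten, three)))  # 7
print(evaluate(SubtractProcessor()(three, ten)))  # -7
```

Swapping the two parents at flow definition time silently changes the result, which is why the parent order must match the parameter order of process.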
Using the GPU and the change_device method
When a processor node is instantiated, the caller can pass a device_type parameter to indicate whether it prefers the process method to run on the cpu or the gpu. As the writer of a processor, you are responsible for writing code that reads this parameter and acts accordingly. For an example, see videoflow.processors.vision.detectors.TensorflowObjectDetector.
The Videoflow execution engine keeps track of the number of gpus in the system, and of the number of processors in the flow that were instantiated with device_type set to gpu (regardless of whether the processor actually uses that parameter to allocate itself to a gpu). Tasks are allocated in topological-sort order of the computation graph; note that an acyclic graph can have more than one valid topological ordering. At task allocation time, if there are no gpus left, the execution engine will call the change_device method of the processor to change the device_type to cpu.
If you want the processor to either run on a gpu or make the flow fail, override the change_device() method and raise a ValueError exception from it. The exception makes the allocation fail and, as a consequence, makes the entire flow execution fail.
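The fallback and the forced-gpu behaviour can be illustrated with a small simulation. This is a sketch under assumptions, not the engine's actual code: ProcessorNode is a local stand-in, the change_device(device_type) signature and the device_type attribute are assumed for illustration, and GpuOnlyProcessor and allocate are hypothetical names.

```python
class ProcessorNode:
    """Local stand-in for videoflow.core.ProcessorNode (assumption)."""

    def __init__(self, device_type="cpu"):
        self.device_type = device_type

    def change_device(self, device_type):
        # Default behaviour in this sketch: accept the new device.
        self.device_type = device_type


class GpuOnlyProcessor(ProcessorNode):
    """Hypothetical processor that refuses to run anywhere but on a gpu."""

    def change_device(self, device_type):
        if device_type != "gpu":
            raise ValueError("GpuOnlyProcessor cannot run on " + device_type)
        self.device_type = device_type


def allocate(nodes, nb_gpus):
    """Hypothetical allocation pass: nodes are visited in the given order
    and gpus are handed out until none are left."""
    for node in nodes:
        if node.device_type == "gpu":
            if nb_gpus > 0:
                nb_gpus -= 1
            else:
                # No gpu left: ask the node to fall back to the cpu.
                node.change_device("cpu")


flexible = [ProcessorNode("gpu"), ProcessorNode("gpu")]
allocate(flexible, nb_gpus=1)
print([node.device_type for node in flexible])  # ['gpu', 'cpu']

try:
    allocate([ProcessorNode("gpu"), GpuOnlyProcessor("gpu")], nb_gpus=1)
except ValueError as err:
    print("allocation failed:", err)
```

Because the order in which nodes receive gpus follows one of possibly several topological orderings, a processor that can tolerate the cpu should keep the default fallback; raising ValueError is best reserved for nodes that genuinely cannot run without a gpu.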
When to extend OneTaskProcessorNode
Videoflow supports the parallelization of a processor in multiple processes. That functionality is very useful when the processor is or may become a bottleneck of the flow.
However, certain processor nodes should not be parallelized. This usually happens when the processor node keeps internal state, because each parallel copy would hold its own separate state. If that is the case, the processor should subclass videoflow.core.node.OneTaskProcessorNode. For more examples, see the subclasses of videoflow.processors.vision.trackers.BoundingBoxTracker. A simple example is given below:
from videoflow.core.node import OneTaskProcessorNode

class MinAggregator(OneTaskProcessorNode):
    def __init__(self):
        # Running minimum of every input seen so far.
        self._min = float("inf")
        super(MinAggregator, self).__init__()

    def process(self, inp):
        if inp < self._min:
            self._min = inp
        return self._min
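The effect of parallelizing a stateful processor can be reproduced without Videoflow. In the sketch below, RunningMin is a plain class with the same logic as MinAggregator, and the alternating dispatch of items between two copies is an assumed policy chosen only for illustration; it is not a claim about how Videoflow distributes work.

```python
class RunningMin:
    """Same logic as MinAggregator, written as a plain class so the
    sketch runs without Videoflow."""

    def __init__(self):
        self._min = float("inf")

    def process(self, inp):
        if inp < self._min:
            self._min = inp
        return self._min


stream = [5, 3, 4, 1, 2, 6]

# One task sees every item, so its state is the true running minimum.
single = RunningMin()
print([single.process(x) for x in stream])  # [5, 3, 3, 1, 1, 1]

# Two independent copies, with items alternating between them
# (assumed dispatch policy): each copy only knows half of the stream.
copies = [RunningMin(), RunningMin()]
print([copies[i % 2].process(x) for i, x in enumerate(stream)])  # [5, 3, 4, 1, 2, 1]
```

The second output is no longer a running minimum of the whole stream, which is the kind of error that restricting the node to a single task avoids.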
Writing consumers
Consumers are the sinks of the flow. They are leaves in the computation graph: they do not produce output, so they are not parents to any node. A common use of consumers is to publish results to destinations external to the flow, such as the file system, the command line, or a remote endpoint.
To write your own custom consumer, subclass videoflow.core.ConsumerNode. The child class must implement the consume() method, and may implement the open() and close() methods that videoflow.core.ConsumerNode inherits from videoflow.core.Node.
The consume method receives as parameters as many items as the number of parents the consumer node receives input from. Notice that the order in which the inputs are received is important. At flow definition time, be sure to pass the parents to the __call__ method of the consumer in the same order that the consume method expects them.
See below an example of a simple consumer that writes its input to the command line:
from videoflow.core.node import ConsumerNode

class CommandlineConsumer(ConsumerNode):
    def consume(self, item):
        # Print each received item to standard output.
        print(item)
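A consumer that holds a resource would also implement open() and close(). The following is a minimal sketch, assuming the same open/consume/close lifecycle described for producers: ConsumerNode is a local stand-in for videoflow.core.ConsumerNode so that the example runs on its own, and LineFileConsumer is a hypothetical name.

```python
import os
import tempfile


class ConsumerNode:
    """Local stand-in for videoflow.core.ConsumerNode (assumption)."""

    def open(self):
        pass

    def close(self):
        pass

    def consume(self, *items):
        raise NotImplementedError


class LineFileConsumer(ConsumerNode):
    """Hypothetical consumer that writes each item to a text file,
    one item per line."""

    def __init__(self, path):
        self._path = path
        self._file = None
        super().__init__()

    def open(self):
        # Inside a method, open() refers to the builtin, not to this method.
        self._file = open(self._path, "w")

    def close(self):
        if self._file is not None:
            self._file.close()
            self._file = None

    def consume(self, item):
        self._file.write(str(item) + "\n")


# Drive the consumer by hand, in the order the engine is described to use.
path = os.path.join(tempfile.mkdtemp(), "results.txt")
consumer = LineFileConsumer(path)
consumer.open()
for item in (1, 2, 3):
    consumer.consume(item)
consumer.close()

with open(path) as f:
    print(f.read().splitlines())  # ['1', '2', '3']
```

Opening the file in open() rather than in __init__ keeps the file handle out of the node until the task starts, and close() guarantees the buffered lines are flushed to disk.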