Writing your own components
===========================

One of the best features of **Videoflow** is that it can be easily extended. Videoflow ships with a collection of components (producers, processors and consumers) that help you quickly bootstrap a computer vision application. Sometimes the provided components are not enough: you may need faster or more accurate models, you may need to implement a complex algorithm, or you may be dealing with a non-standard data source. This tutorial shows how to write your own components and how to integrate them into a **Videoflow** application.

Writing producers
-----------------

To write a custom producer, extend the class ``videoflow.core.ProducerNode``. You must implement the ``next()`` method, and you may implement the ``open()`` and ``close()`` methods that ``videoflow.core.ProducerNode`` inherits from ``videoflow.core.Node``.

The ``open()`` method is called by **Videoflow**'s execution engine before the producer task begins to run. Use it to acquire resources, for example to open files or to create Tensorflow sessions.

Once the task begins to run, the task runner continuously calls the ``next()`` method of the producer. Each call should return the next produced output. To indicate that no more outputs will be produced, the method must raise a ``StopIteration`` exception.

When the task finishes running because the producer has raised ``StopIteration``, the execution engine calls the ``close()`` method. The method should release any resources that were acquired by the ``open()`` or ``next()`` methods.

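The lifecycle described above can be tried out without Videoflow installed. The following is a minimal sketch under stated assumptions, not Videoflow's actual engine: ``StandInProducerNode`` and ``run_producer`` are hypothetical stand-ins that only mimic the ``open()`` / ``next()`` / ``close()`` contract, and ``CountdownProducer`` is an illustrative producer invented for this example.

```python
class StandInProducerNode:
    """Hypothetical stand-in for the producer base class (not Videoflow code)."""

    def open(self):
        pass

    def close(self):
        pass

    def next(self):
        raise NotImplementedError


class CountdownProducer(StandInProducerNode):
    """Illustrative producer: emits start, start - 1, ..., 1 and then stops."""

    def __init__(self, start):
        self._start = start
        self._current = None
        super().__init__()

    def open(self):
        # Acquire or initialize resources here, before the first next() call.
        self._current = self._start

    def next(self):
        # Signal the end of the stream by raising StopIteration.
        if self._current is None or self._current <= 0:
            raise StopIteration()
        value = self._current
        self._current -= 1
        return value

    def close(self):
        # Release whatever open() or next() acquired.
        self._current = None


def run_producer(producer):
    """Assumed driver loop: open, call next() until StopIteration, then close."""
    outputs = []
    producer.open()
    try:
        while True:
            try:
                outputs.append(producer.next())
            except StopIteration:
                break
    finally:
        producer.close()
    return outputs


produced = run_producer(CountdownProducer(3))
print(produced)
```

Putting the cleanup in ``close()`` rather than in ``next()`` keeps the producer correct even if the driver stops early for a reason other than ``StopIteration``.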
See below a sample implementation of ``videoflow.producers.VideofileReader``::

    import cv2
    from videoflow.core.node import ProducerNode

    class VideofileReader(ProducerNode):
        '''
        Opens a video capture object and returns subsequent frames
        from the video each time ``next`` is called.
        '''
        def __init__(self, video_file : str, nb_frames = -1):
            '''
            - Arguments:
                - video_file: path to video file
                - nb_frames: number of frames to process. -1 means all of them
            '''
            self._video_file = video_file
            self._video = None
            self._nb_frames = nb_frames
            self._frame_count = 0
            super(VideofileReader, self).__init__()

        def open(self):
            '''
            Opens the video stream
            '''
            if self._video is None:
                self._video = cv2.VideoCapture(self._video_file)

        def close(self):
            '''
            Releases the video stream object
            '''
            if self._video is not None:
                self._video.release()

        def next(self):
            '''
            - Returns:
                - frame: np.array of shape (h, w, 3)

            - Raises:
                - StopIteration: after it finishes reading the videofile
                  or when it reaches the specified number of frames to process.
            '''
            # Stop if the stream is closed or nb_frames frames were already returned.
            limit_reached = self._nb_frames >= 0 and self._frame_count >= self._nb_frames
            if not self._video.isOpened() or limit_reached:
                raise StopIteration()
            success, frame = self._video.read()
            if not success:
                raise StopIteration()
            self._frame_count += 1
            return frame

Writing processors
------------------

Processors are the nodes that perform computations, transformations or filtering of data. In general, processors receive data as input and return data as output.

To write your own custom processor, create a class that extends ``videoflow.core.ProcessorNode``. The class must implement the ``process()`` method, and may implement the ``change_device()`` method and the ``open()`` and ``close()`` methods that ``videoflow.core.ProcessorNode`` inherits from ``videoflow.core.Node``. See the **Writing producers** section above for an explanation of how to implement the ``open()`` and ``close()`` methods.


Implementing the ``process`` method
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The number of parameters of the ``process`` method is equal to the number of parents of the **processor** in the computation graph. For example, consider a processor that takes as input the outputs of two parents, and returns ``0`` if the minimum came from the first parent and ``1`` if it came from the second one. To implement it, simply do::

    from videoflow.core.node import ProcessorNode

    class ComparisonProcessor(ProcessorNode):
        def process(self, inp1, inp2):
            # Ties are attributed to the first parent.
            if inp1 <= inp2:
                return 0
            return 1

Notice that the order in which the inputs are received is important. At flow definition time, be sure to pass the parents to the ``__call__`` method in the same order that the ``process`` method expects its inputs.

Using the GPU and the ``change_device`` method
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

When a processor node is instantiated, the instantiator can pass a ``device_type`` parameter to indicate whether the ``process`` method should preferably run on the ``cpu`` or on the ``gpu``. As the writer of a processor, you are responsible for writing code that reads this parameter and acts accordingly. For an example, see ``videoflow.processors.vision.detectors.TensorflowObjectDetector``.

The **Videoflow** execution engine keeps track of the number of gpus in the system, and of the number of processors in the flow that were instantiated with ``device_type`` set to ``gpu`` (regardless of whether the processor actually uses that parameter to allocate itself to a gpu). Tasks are allocated in topological-sort order of the computation graph (note that an acyclic graph can have more than one valid topological ordering). If no gpus are left when a processor that requested one is allocated, the execution engine calls the ``change_device`` method of the **processor** to change its ``device_type`` to ``cpu``.
If you want to force the processor to run on a gpu, and the flow to fail when none is available, override the ``change_device()`` method and raise a ``ValueError`` exception. The allocation then fails and, as a consequence, so does the entire flow execution.

When to extend ``OneTaskProcessorNode``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

**Videoflow** supports the parallelization of a **processor** across multiple processes. That functionality is very useful when the **processor** is or may become a bottleneck of the flow. However, certain **processor** nodes should not be parallelized, usually because the node keeps an internal state. In that case, the **processor** should subclass ``videoflow.core.node.OneTaskProcessorNode``. For more examples, see the subclasses of ``videoflow.processors.vision.trackers.BoundingBoxTracker``. A simple example is given below::

    from videoflow.core.node import OneTaskProcessorNode

    class MinAggregator(OneTaskProcessorNode):
        def __init__(self):
            # Running minimum of every input seen so far.
            self._min = float("inf")
            super(MinAggregator, self).__init__()

        def process(self, inp):
            if inp < self._min:
                self._min = inp
            return self._min

Writing consumers
-----------------

Consumers are the sinks of the flow. They are leaves in the computation graph: they produce no output, and so they are not parents of any node. A common use of consumers is to publish results to destinations external to the flow, such as the file system, the command line, or a remote endpoint.

To write your own custom consumer, subclass ``videoflow.core.ConsumerNode``. The child class must implement the ``consume()`` method, and may implement the ``open()`` and ``close()`` methods that ``videoflow.core.ConsumerNode`` inherits from ``videoflow.core.Node``.

The ``consume`` method receives as many parameters as the number of parents the consumer node receives input from. Notice that the order in which the inputs are received is important.
At flow definition time, be sure to pass the parents to the ``__call__`` method of the **consumer** in the same order that the ``consume`` method expects them.

See below an example of a simple consumer that writes its input to the command line::

    from videoflow.core.node import ConsumerNode

    class CommandlineConsumer(ConsumerNode):
        def consume(self, item):
            print(item)
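A consumer with two parents that also manages a resource combines the points made in this section. The following is a minimal sketch that runs without Videoflow: ``StandInConsumerNode`` and ``run_consumer`` are hypothetical stand-ins for the base class and the execution engine, and ``PairFileWriter`` with its ``path`` parameter is an illustrative name invented for this example.

```python
import os
import tempfile


class StandInConsumerNode:
    """Hypothetical stand-in for the consumer base class (not Videoflow code)."""

    def open(self):
        pass

    def close(self):
        pass

    def consume(self, *items):
        raise NotImplementedError


class PairFileWriter(StandInConsumerNode):
    """Illustrative consumer with two parents: writes '<label>: <value>' lines."""

    def __init__(self, path):
        self._path = path
        self._file = None
        super().__init__()

    def open(self):
        # Acquire the file handle before the first consume() call.
        self._file = open(self._path, "w")

    def consume(self, label, value):
        # Parameter order must match the order in which the parents are passed.
        self._file.write(f"{label}: {value}\n")

    def close(self):
        # Release the handle acquired in open().
        if self._file is not None:
            self._file.close()
            self._file = None


def run_consumer(consumer, rows):
    """Assumed driver loop: open, feed one tuple of parent outputs per call, close."""
    consumer.open()
    try:
        for row in rows:
            consumer.consume(*row)
    finally:
        consumer.close()


with tempfile.TemporaryDirectory() as tmp_dir:
    out_path = os.path.join(tmp_dir, "results.txt")
    run_consumer(PairFileWriter(out_path), [("frame_0", 3), ("frame_1", 5)])
    with open(out_path) as results_file:
        written = results_file.read()

print(written)
```

Swapping the two values in each row would silently swap labels and values in the output, which is why the parent order at flow definition time matters.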