Source code for vuba.imio

from tqdm import tqdm
import cv2
import numpy as np
import glob
import math
import copy
from more_itertools import prepend
from natsort import natsorted, ns
import warnings

from vuba import ops


[docs]def take_first(it): """ Retrieve the first value from an iterable object. Parameters ---------- it : iterable Iterable object to retrieve the first value from. Returns ------- first : number or string or ndarray or None First value from the iterable object. Will be None if ``StopIteration`` is raised. """ it = iter(it) try: return next(it) except StopIteration: return None
[docs]def fourcc_to_string(fourcc): """ Convert fourcc integer code into codec string. Parameters ---------- fourcc : int Fourcc integer code. Returns ------- codec : str Codec string corresponding to fourcc code. """ char1 = str(chr(fourcc & 255)) char2 = str(chr((fourcc >> 8) & 255)) char3 = str(chr((fourcc >> 16) & 255)) char4 = str(chr((fourcc >> 24) & 255)) return char1 + char2 + char3 + char4
[docs]def open_video(video): """ Convenience function for creating a ``Video`` instance. Parameters ---------- video : str or cv2.VideoCapture or list. Full filename or VideoCapture object to a video (e.g. AVI or MP4), or a glob string or series of filenames to a series of individual images. Returns ------- video : Video Instance of ``Video``. release : bool True if footage was not an instance to ``Video``, else False. """ release = False if not isinstance(video, Video): video = Video(video) release = True return video, release
[docs]class Frames: """ Container for frames that is used in ``Video.read``. Parameters ---------- reader : callable Frame reader method supplied from ``Video`` class. start : int Index of first frame. stop : int Index of last frame. step : int Step size. low_memory : bool Whether to import frames into memory. grayscale : bool Whether to convert frames to grayscale. Returns ------- frames : Frames Container for frames. Notes ----- Frames are imported into memory at initiation if low_memory=False. See Also -------- Video """
[docs] def __init__(self, reader, start, stop, step, low_memory, grayscale): self._reader = reader self.idxs = (start, stop, step) self.grayscale = grayscale self.in_mem = False self._roi = None if not low_memory: self.import_to_ndarray()
[docs] def import_to_ndarray(self) -> "Frames": """ Import the declared frames into a contiguous numpy array. """ gen_fr = self._reader(*self.idxs, self.grayscale) first = take_first(gen_fr) self.ndarray = np.ascontiguousarray( np.empty((len(self), *first.shape), dtype="uint8") ) gen_fr = prepend(first, gen_fr) print("Importing frames into memory...") pg = tqdm(len(self)) for i, frame in enumerate(gen_fr): self.ndarray[i, :] = frame[:] pg.update(1) pg.close() self.in_mem = True
[docs] def __len__(self) -> "Frames": """ Retrieve the length of the provided frames without having to iterate across them. Returns ------- length : int Length of the provided frames. """ (start, stop, step) = self.idxs return math.floor((stop - start) / step)
[docs] def __iter__(self) -> "Frames": """ Retrieve the frames declared at initiation. Returns ------- frames : generator Generator that supplies frames. """ if self.in_mem: for frame in self.ndarray: yield frame else: for frame in self._reader(*self.idxs, self.grayscale): yield frame
[docs]class Video: """ Wrapper around various image readers that provides a simple API to achieve the same functions regardless of format. Parameters ---------- footage : str or ``cv2.VideoCapture`` or list. Full filename or VideoCapture object to a video (e.g. AVI or MP4), or a glob string or series of filenames to a series of individual images. Note that series of all individual filenames will be sorted prior to being read if a glob string is supplied. Returns ------- video : Video Video handler for footage supplied. See Also -------- open_video Frames Writer Examples -------- For example usage of this handler please see example scripts located at ``examples/reading``. """
[docs] def __init__(self, footage): def open_videocv(footage): self.video = footage self.video_release = False if not isinstance(self.video, cv2.VideoCapture): self.video = cv2.VideoCapture(self.video) self.video_release = True self.filenames = None if isinstance(footage, list): self.filenames = footage self.video = self.video_release = None else: try: # Somewhat hacky but provides consistent behaviour if "*" in footage: files = glob.glob(footage) self.filenames = natsorted(files, alg=ns.IGNORECASE) self.video = self.video_release = None else: open_videocv(footage) except TypeError: open_videocv(footage) except: raise self._grab_info()
[docs] def __len__(self) -> "Video": """ Retrieve the length of the provided footage without having to iterate across it. Returns ------- length : int Length of the provided footage. """ if self.video: return int(self.video.get(cv2.CAP_PROP_FRAME_COUNT)) else: return len(self.filenames)
def _grab_info(self): """ Grab summary footage information. This method is called on initiation of ``Video``. """ if self.video: self.fps = round(self.video.get(cv2.CAP_PROP_FPS)) width = self.video.get(cv2.CAP_PROP_FRAME_WIDTH) height = self.video.get(cv2.CAP_PROP_FRAME_HEIGHT) self.resolution = (self.width, self.height) = tuple( map(round, (width, height)) ) self.fourcc_code = int(self.video.get(cv2.CAP_PROP_FOURCC)) self.codec = fourcc_to_string(self.fourcc_code) else: first_frame = self.read(0) width = first_frame.shape[1] height = first_frame.shape[0] self.resolution = (self.width, self.height) = tuple( map(round, (width, height)) ) self.fps = self.codec = self.fourcc_code = None
[docs] def read(self, *args, **kwargs) -> "Video": """ Read single or multiple frames from the provided footage. Note that reading multiple frames follows slice behaviour, whereby there is a 'start', 'stop' and 'step'. However steps will only give a performance uplift for reading images stored as individual files (e.g. multiple pngs). If you are reading from videos, the read time will be the same as if you were reading all the frames between 'start' and 'stop'. This is because it is faster to pass on frames that don't match the step size, than it is to repeatedly locate the correct frames by index in the video file. Parameters ---------- * For single frames: index : int Index of frame in footage to read. grayscale : bool Whether to convert frame to grayscale. * For multiple frames: start : int Index of frame to start reading from. stop : int Index of frame to stop reading at. step : int Step size. low_memory : bool Whether to import frames into RAM, default is True i.e. not to. grayscale Whether to convert frames to grayscale. Returns ------- For single frames: frame : ndarray Frame at the given index. For multiple frames: frames : Frames Container that will either supply the frames from an ndarray or from a generator. Note that this container contains both a len and iter method. For the latter, a new generator is created upon calling the method if the frames have not been imported into memory. This is to maintain implementation parity with the in-memory container. Raises ------ ValueError If the supplied indices are not within the available range of the selected footage, or if there is a missing required argument. Examples -------- For example usage of this method please see example scripts located at ``examples/reading``. """ try: frame = self._read_single(*args, **kwargs) return frame except TypeError: args = self._prep_args_kwargs(*args, **kwargs) frames = Frames(self._read_multi, *args) return frames except: raise
def _prep_args_kwargs( self, start=None, stop=None, step=1, low_memory=True, grayscale=False ): """ Prep any args and kwargs supplied from Video.read(). """ def check_indexes(index): if index is not None: if index < 0 or index > len(self): raise IndexError( "Indices out of available range for provided footage." ) else: raise ValueError(f"Missing required argument.") check_indexes(start) check_indexes(stop) return (start, stop, step, low_memory, grayscale) def _read_single(self, index, grayscale=False): """ Read a single frame at a given location in the requested footage. """ if self.video: self.video.set(1, index) success, frame = self.video.read() else: frame = cv2.imread(self.filenames[index]) if grayscale: frame = ops.gray(frame) return frame def _read_multi(self, start, stop, step, grayscale): """ Read multiple frames from the requested footage at the given indices. """ if self.video: self.video.set(1, start) if step: if step > 1: # Floor is used here to stop out-of-range indices being created number = math.floor((stop - start) / step) frames_to_yield = [round((step * n) + start) for n in range(number)] steps = True else: steps = False for fr in range(start, stop): success, frame = self.video.read() if not success: break if grayscale: frame = ops.gray(frame) if steps: if fr in frames_to_yield: yield frame else: pass else: yield frame else: for fn in self.filenames[slice(start, stop, step)]: frame = cv2.imread(fn) if grayscale: frame = ops.gray(frame) yield frame
[docs] def close(self) -> "Video": """ Close/teardown attached video handlers. """ if self.video: self.video.release() elif self.filenames: self.filenames = None
[docs]class Writer: """ Create an encoder for exporting frames at a given output. Regardless of input format, this encoder can encode footage to both individual images and to video files. For encoding to individual images, a sequence of filenames needs to be supplied that will match the length of the number of frames that need to be encoded. Conversely, for encoding to a video file, a filename to the video file needs to be provided. Parameters ---------- output : str or list Output to export frames. footage : Video Instance of Video to create writer based on. If this is not supplied then you must supply all encoder specific arguments. fps : float Framerate to export footage at, default is the fps of the video supplied. Note that this argument must be supplied if working with individual images. resolution : tuple Width and height to export footage at (both must be supplied as integers). Default is the resolution of the footage supplied. codec : str Codec to encode footage with. Default is to encode with MJPG codec or that of the Video instance supplied (if any). grayscale : bool Whether to export footage as grayscale. Default is False. Returns ------- encoder : Writer Encoder to export frames to a given output. Raises ------ ValueError If a frame-rate is not supplied when encoding to a movie using individual images. See Also -------- Video Examples -------- For example usage of this handler please see example scripts located at ``examples/writing``. """
[docs] def __init__( self, output, footage=None, fps=None, resolution=None, codec=None, grayscale=False, ): self.footage_release = None if footage: footage, self.footage_release = open_video(footage) self.is_color = True if grayscale: self.is_color = False self.resolution = resolution if not resolution: self.resolution = footage.resolution self.encoder_release = False if isinstance(output, str): if not codec: if footage: if footage.video: codec = footage.codec else: codec = "MJPG" else: codec = "MJPG" fourcc = cv2.VideoWriter_fourcc(*codec) if not fps: if footage: if footage.fps: fps = footage.fps else: raise ValueError( "When working with individual images you must supply a frame-rate to export footage at." ) else: raise ValueError( "Integer must be supplied to fps when an instance of vuba.Video is not supplied." ) self.encoder = cv2.VideoWriter( output, fourcc, fps, self.resolution, self.is_color ) self.encoder_release = True elif isinstance(output, list): self.encoder = iter(output) self.fps = self.fourcc = None
[docs] def write(self, frame) -> "Writer": """ Write a frame using the declared encoder. Parameters ---------- frame : ndarray Frame to export. Warns ----- UserWarning If the supplied frame is not of the correct resolution or colour space, and to notify the user that the frame has been correctly converted so encoding is successful. Notes ----- For encoding to video, note that if the frames supplied for encoding are not at the same resolution as that set for the encoder upon initiation, the frames will be resized accordingly. Also note that frames will be converted to the colour space set at initiation if they are not the same. These features are built-in so that a complete movie is always created, rather than an empty container which is not of much use for debugging a failed component of a script. Examples -------- For example usage of this method please see example scripts located at ``examples/writing``. """ if isinstance(self.encoder, cv2.VideoWriter): size = (frame.shape[1], frame.shape[0]) if size != self.resolution: frame = cv2.resize(frame, self.resolution) warnings.warn( f"Frame has been resized to ensure successful encoding. Supplied frame vs encoder resolution: {size} , {self.resolution}", UserWarning, ) channels = len(frame.shape) if channels > 2 and not self.is_color: frame = ops.gray(frame) warnings.warn( f"Frame has been grayscaled to ensure successful encoding. Supplied frame vs encoder format: {channels} , {2, self.is_color}", UserWarning, ) elif channels == 2 and self.is_color: frame = ops.bgr(frame) warnings.warn( f"Frame has been converted to BGR format to ensure successful encoding. Supplied frame vs encoder format: {channels} , {3, self.is_color}", UserWarning, ) self.encoder.write(frame) else: cv2.imwrite(next(self.encoder), frame)
[docs] def close(self) -> "Writer": """ Close/teardown declared encoders. """ if self.encoder_release: self.encoder.release() if self.footage_release: self.footage.close()