@MahmoudAshraf97
Created December 15, 2022 23:43
import torch
from detectron2.config import get_cfg
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
import numpy as np
import cv2
from datetime import datetime
import skvideo.io
import json
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from pytorchvideo.transforms import (
    ShortSideScale,
    UniformTemporalSubsample,
)
from pytorchvideo.models.hub.vision_transformers import mvit_base_32x3
from pytorchvideo.models.hub.slowfast import slowfast_r50
from typing import Union
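# Note: the class below expects two auxiliary files at the paths it references:
#   - "models/kinetics_classnames.json"  (Kinetics-400 id -> class name map)
#   - "label_map.txt"                    (one class label per line)
# The visualization branch of inference() also writes to a "video/" directory,
# which must already exist on disk.
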
class RecognizerModel:
    """
    Implement action detection based on an action recognition model.
    """

    def __init__(self, model_name: str = 'slowfast', person_bbox_threshold: float = 0.2, device: str = 'cpu'):
        """
        @param model_name: model to use, either 'slowfast' or 'mvit'
        @param person_bbox_threshold: minimum confidence threshold for person bounding boxes
        @param device: device to use, either 'cpu' or 'cuda'
        """
        # load the pretrained model and configure the corresponding transforms in self._transform
        if "slowfast" in model_name:
            self._model = slowfast_r50(pretrained=True)
            self._create_slowfast_transform()
        elif "mvit" in model_name:
            self._model = mvit_base_32x3(pretrained=True)
            self._create_mvit_transform()
        else:
            raise ValueError(f'Invalid model name {model_name}')
        # set the model to evaluation mode and move it to the desired device
        self._model = self._model.to(device).eval()
        # load the label map from its json file
        self._load_label_map()
        self._device = device
        self._person_bbox_threshold = person_bbox_threshold
        self._model_name = model_name
        # load the detectron2 person detector and store it in self._person_predictor
        self._load_detectron2()
        # store the activation applied to the raw logits at inference time
        self._post_act = torch.nn.Softmax(dim=1)
        self._video_data = None
        self._preds = None
        self._clip_no = -1
        self._timestamp = datetime.now()
        with open('label_map.txt') as f:
            self._classes = [line.rstrip() for line in f]
    def _load_label_map(self, path: str = "models/kinetics_classnames.json"):
        """
        load the kinetics-400 label map
        @param path: path to the json file containing the kinetics-400 labels
        @return: None
        """
        with open(path, "r") as f:  # load json file
            kinetics_classnames = json.load(f)
        # Create an id to label name mapping
        kinetics_id_to_classname = {}
        for k, v in kinetics_classnames.items():
            kinetics_id_to_classname[v] = str(k).replace('"', "")
        self._kinetics_id_to_classname = kinetics_id_to_classname
        important_actions = ["eating", "tasting food"]  # define eating actions
        # Store a list of IDs for all eating classes (substring match, e.g. every "eating ..." class)
        self._important_actions = [k for k, v in kinetics_id_to_classname.items()
                                   if any(action in v for action in important_actions)]
    def _load_detectron2(self):
        """
        load the detectron2 person detector model
        @return: None
        """
        cfg = get_cfg()
        # cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))
        cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_X_101_32x8d_FPN_3x.yaml"))
        cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = self._person_bbox_threshold  # set threshold for this model
        # cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml")
        cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-Detection/faster_rcnn_X_101_32x8d_FPN_3x.yaml")
        cfg.MODEL.DEVICE = self._device
        self._person_predictor = DefaultPredictor(cfg)
    def _get_person_bboxes(self) -> np.ndarray:
        """
        generate bounding boxes for people in self._video_data using self._person_predictor
        @return: predicted_boxes np.ndarray [[x_1, y_1, x_2, y_2], [x_1, y_1, x_2, y_2], ...]
        """
        # the key frame used to predict bboxes is the frame at the middle of the clip
        key_frame = self._video_data[:, self._video_data.shape[1] // 2, :, :]
        key_frame = key_frame.permute(1, 2, 0)
        predictions = self._person_predictor(key_frame.cpu().detach().numpy())['instances'].to('cpu')
        boxes = predictions.pred_boxes if predictions.has("pred_boxes") else None
        scores = predictions.scores if predictions.has("scores") else None
        classes = np.array(predictions.pred_classes.tolist() if predictions.has("pred_classes") else None)
        # keep only boxes of COCO class 0 (person) whose score exceeds the confidence threshold
        predicted_boxes = boxes[np.logical_and(classes == 0, scores > self._person_bbox_threshold)].tensor.cpu()
        predicted_boxes = predicted_boxes.round().numpy().astype(int)
        return predicted_boxes
    def _create_slowfast_transform(self):
        """
        create the slowfast video transform and store it in self._transform
        @return: None
        """
        class PackPathway(torch.nn.Module):
            """
            Transform for converting video frames into the list of tensors expected by SlowFast.
            """
            def __init__(self):
                super().__init__()

            def forward(self, frames: torch.Tensor):
                fast_pathway = frames
                # Perform temporal sampling from the fast pathway to build the slow pathway.
                slow_pathway = torch.index_select(
                    frames,
                    1,
                    torch.linspace(
                        0, frames.shape[1] - 1, frames.shape[1] // 4  # 4 = slowfast alpha
                    ).long(),
                )
                frame_list = [slow_pathway, fast_pathway]
                return frame_list

        side_size = 256
        mean = [0.45, 0.45, 0.45]
        std = [0.225, 0.225, 0.225]
        crop_size = 256
        num_frames = 32
        self._transform = Compose(
            [
                UniformTemporalSubsample(num_frames),
                Lambda(lambda x: x / 255.0),
                NormalizeVideo(mean, std),
                ShortSideScale(
                    size=side_size
                ),
                CenterCropVideo(crop_size),
                PackPathway()
            ]
        )
    def _create_mvit_transform(self):
        """
        create the mvit video transform and store it in self._transform
        @return: None
        """
        side_size = 256
        mean = [0.45, 0.45, 0.45]
        std = [0.225, 0.225, 0.225]
        crop_size = 224
        num_frames = 32
        self._transform = Compose(
            [
                UniformTemporalSubsample(num_frames),
                Lambda(lambda x: x / 255.0),
                NormalizeVideo(mean, std),
                ShortSideScale(
                    size=side_size
                ),
                CenterCropVideo(crop_size),
            ]
        )
    def _load_video(self, video_path: str):
        """
        load a video from video_path into a pytorch tensor
        @param video_path: path of the video to load
        @return: torch.Tensor [C, T, H, W] where T is the number of frames
        """
        video_data = skvideo.io.vread(video_path)  # load video to np.ndarray [T, H, W, C]
        video_data = np.einsum('klij->jkli', video_data)  # reorder ndarray dimensions to [C, T, H, W] for pytorch
        return torch.from_numpy(video_data)  # convert ndarray to pytorch tensor
    def _crop_person(self, person_bbox: np.ndarray, output_video=False,
                     video_name: str = "person.mp4"):
        """
        Crop a person given by person_bbox from self._video_data
        @param person_bbox: 1x4 numpy ndarray for the person bbox in the form [x1, y1, x2, y2]
        @param output_video: whether to write the cropped video to disk for visualization purposes
        @param video_name: name of the output video, unused if output_video is False
        @return: cropped_video torch.Tensor [3, num_frames, height, width]
        """
        x1, y1, x2, y2 = person_bbox[0], person_bbox[1], person_bbox[2], person_bbox[3]
        person_data = self._video_data[:, :, y1:y2, x1:x2]  # slice the person from the video tensor
        if output_video:
            person_data_np = person_data.numpy()
            # save the cropped video, converting back to [T, H, W, C]
            skvideo.io.vwrite(video_name, np.einsum('klij->lijk', person_data_np))
        return person_data
    def _get_top_k(self, k=5):
        """
        Get the top k classes from self._preds
        @param k: number of top classes to return
        @return: dict of the top k classes in descending order of confidence {"action1": conf1, "action2": conf2, ...}
        """
        top_scores, top_classes = torch.topk(self._preds, k=k)
        top_scores = top_scores.tolist()[0]
        top_classes = list(map(self._kinetics_id_to_classname.get, top_classes[0].tolist()))
        return dict(zip(top_classes, top_scores))
    def _process_preds(self):
        """
        process self._preds by pairing each confidence with its class of interest from self._classes
        @return: dict of class-confidence pairs sorted by descending confidence,
                 e.g. {"eating": conf1, "drinking": conf2, "smoking": conf3}
        """
        predictions = dict(zip(self._classes, self._preds[0].tolist()))
        # sort the dict by confidence before returning it
        return {k: v for k, v in sorted(predictions.items(), key=lambda item: item[1], reverse=True)}
    def _draw_bboxes(self, video_name: str, predicted_boxes: np.ndarray, labels, box_color=(0, 114, 110),
                     text_background_color=(0, 114, 110), font_color=(255, 255, 255)):
        """
        Draw bounding boxes around the detected persons in the video, then write this video to disk
        @param video_name: name of the video to save
        @param predicted_boxes: predicted_boxes np.ndarray [[x_1, y_1, x_2, y_2], [x_1, y_1, x_2, y_2], ...]
        @param labels: list of strings containing the label for each bounding box
        @return: None
        """
        def get_optimal_font_scale(text, width):
            # return the largest font scale (up to 5.9) whose rendered text fits within width
            for scale in reversed(range(0, 60, 1)):
                text_size = cv2.getTextSize(text, fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=scale / 10, thickness=1)
                new_width = text_size[0][0]
                if new_width <= width:
                    return scale / 10
            return 1

        # convert the video tensor to a contiguous [T, H, W, C] ndarray so cv2 can draw on its frames
        self._video_data = np.ascontiguousarray(np.einsum('klij->lijk', self._video_data.numpy()))
        for frame_idx in range(self._video_data.shape[0]):  # loop over all frames and draw the bboxes in each one
            img = self._video_data[frame_idx, :, :, :]
            for i, box in enumerate(predicted_boxes):
                x1, y1, x2, y2 = box[0], box[1], box[2], box[3]
                img = cv2.rectangle(img, (x1, y1), (x2, y2), box_color)
                img = cv2.rectangle(img, (x1, y1), (x2, y1 - 20), text_background_color, -1)
                scale = get_optimal_font_scale(labels[i], x2 - x1)
                img = cv2.putText(img, labels[i], (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, scale, font_color, 1)
            self._video_data[frame_idx, :, :, :] = img
        skvideo.io.vwrite(video_name, self._video_data)
    def inference(self, video: Union[str, np.ndarray], visualize=False):
        """
        Perform action detection inference on a video
        @param video: either a filename or a np.ndarray of shape [T, H, W, C] where T is the number of frames
        @param visualize: if True, an output video containing the action detection visualization is written to disk
        @return: list of dictionaries where each dictionary contains action-confidence pairs for one detected person
        """
        if isinstance(video, str):
            self._video_data = self._load_video(video)  # load video to tensor
        else:
            self._video_data = np.einsum('klij->jkli', video)  # reorder ndarray dimensions to [C, T, H, W] for pytorch
            self._video_data = torch.from_numpy(self._video_data)  # convert ndarray to pytorch tensor
        predicted_boxes = self._get_person_bboxes()  # get bboxes of the persons in the video
        self._clip_no += 1
        if len(predicted_boxes) == 0:  # if no persons are detected in the video, skip it
            return None
        actions = []
        for person_id, person_bbox in enumerate(predicted_boxes):  # for each detected person
            # crop the person from the video
            person_data = self._crop_person(person_bbox, output_video=False)
            # transform the video data before feeding it into the model
            person_data = self._transform(person_data)
            if "slowfast" in self._model_name:
                # move the video to the device before feeding it into the model
                person_data = [i.to(self._device)[None, ...] for i in person_data]
                preds = self._model(person_data)  # perform inference
            else:
                # move the video to the device before feeding it into the model
                person_data = person_data.to(self._device)
                preds = self._model(person_data[None, ...])  # perform inference
            self._preds = self._post_act(preds)  # apply softmax to the predictions
            actions.append(self._process_preds())  # process the predictions and append the new results to the list
            # actions.append(self._get_top_k())  # get top k predictions and append the new results to the list
        if visualize:  # if visualization is enabled
            labels = []  # create labels from the top-1 action and confidence of each person
            for p in actions:
                labels.append(f"{list(p.keys())[0]} {round(100 * list(p.values())[0])}")
            # write an output video with bboxes and labels
            self._draw_bboxes('video/' + str(self._clip_no) + '.mp4', predicted_boxes, labels)
        return actions
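
# ---------------------------------------------------------------------------
# Minimal usage sketch: how the class above can be driven end to end.
# 'clip.mp4' is a hypothetical input path (not part of the gist), and the
# auxiliary files noted after the imports must be present.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # build the recognizer with the SlowFast backbone on CPU (use device='cuda' if available)
    recognizer = RecognizerModel(model_name='slowfast', person_bbox_threshold=0.2, device='cpu')
    # run action detection on a short clip; returns one action->confidence dict per detected person
    results = recognizer.inference('clip.mp4', visualize=False)
    if results is None:
        print('No persons detected in the clip')
    else:
        for person_id, action_confidences in enumerate(results):
            # the dict is sorted by descending confidence, so the first item is the top prediction
            top_action, top_conf = next(iter(action_confidences.items()))
            print(f'person {person_id}: {top_action} ({top_conf:.2f})')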