YOLOV5_CustomDetect
import cv2
import logging
import math
import time
from copy import deepcopy  # used by Model.__init__ (parse_model input)
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
import torchvision

# 'logger' is referenced throughout this file but was never defined in the gist
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def autopad(k, p=None):  # kernel, padding
    # 'Same'-style padding when none is given; required by Conv below but missing from the gist
    if p is None:
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]
    return p


def DWConv(c1, c2, k=1, s=1, act=True):
    # Depthwise convolution
    return Conv(c1, c2, k, s, g=math.gcd(c1, c2), act=act)


class Conv(nn.Module):
    # Standard convolution: Conv2d -> BatchNorm2d -> activation
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        super(Conv, self).__init__()
        self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g, bias=False)
        self.bn = nn.BatchNorm2d(c2)
        self.act = nn.SiLU() if act is True else (act if isinstance(act, nn.Module) else nn.Identity())

    def forward(self, x):
        return self.act(self.bn(self.conv(x)))

    def fuseforward(self, x):  # used after fuse(): BatchNorm folded into the conv weights
        return self.act(self.conv(x))
def time_synchronized():
    # PyTorch-accurate time: wait for pending CUDA kernels before reading the clock
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    return time.time()
def xywh2xyxy(x):
    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y
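# Example (sketch, not part of the original gist): a box centred at (50, 50) with
# width 10 and height 10 becomes the corner form (45, 45, 55, 55):
# xywh2xyxy(torch.tensor([[50., 50., 10., 10.]]))  # -> tensor([[45., 45., 55., 55.]])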
def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False,
                        labels=(), max_det=300):
    """Runs Non-Maximum Suppression (NMS) on inference results

    Returns:
         list of detections, one (n,6) tensor per image [xyxy, conf, cls]
    """
    nc = prediction.shape[2] - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    # Checks
    assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
    assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'

    # Settings
    min_wh, max_wh = 2, 4096  # (pixels) minimum and maximum box width and height
    max_nms = 30000  # maximum number of boxes into torchvision.ops.nms()
    time_limit = 10.0  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    t = time.time()
    output = [torch.zeros((0, 6), device=prediction.device)] * prediction.shape[0]
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        # x[((x[..., 2:4] < min_wh) | (x[..., 2:4] > max_wh)).any(1), 4] = 0  # width-height
        x = x[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            l = labels[xi]
            v = torch.zeros((len(l), nc + 5), device=x.device)
            v[:, :4] = l[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(l)), l[:, 0].long() + 5] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain, process next image
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box (center x, center y, width, height) to (x1, y1, x2, y2)
        box = xywh2xyxy(x[:, :4])

        # Detections matrix nx6 (xyxy, conf, cls)
        if multi_label:
            i, j = (x[:, 5:] > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, j + 5, None], j[:, None].float()), 1)
        else:  # best class only
            conf, j = x[:, 5:].max(1, keepdim=True)
            x = torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]

        # Apply finite constraint
        # if not torch.isfinite(x).all():
        #     x = x[torch.isfinite(x).all(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        elif n > max_nms:  # excess boxes
            x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        if i.shape[0] > max_det:  # limit detections
            i = i[:max_det]
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            # the gist called an undefined box_iou(); torchvision provides the same NxM IoU matrix
            iou = torchvision.ops.box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if (time.time() - t) > time_limit:
            print(f'WARNING: NMS time limit {time_limit}s exceeded')
            break  # time limit exceeded

    return output
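# Shape sketch (assuming a standard 3-anchor, stride-8/16/32 YOLOv5 head): a single
# 640x640 image produces a raw prediction of shape (1, 25200, nc + 5), since
# (80^2 + 40^2 + 20^2) * 3 = 25200. non_max_suppression() reduces this to a list with
# one (n, 6) tensor of [x1, y1, x2, y2, conf, cls] rows per image, with n <= max_det.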
def make_divisible(x, divisor):
    # Returns x rounded up to the nearest multiple of divisor
    return math.ceil(x / divisor) * divisor


def check_img_size(img_size, s=32):
    # Verify img_size is a multiple of stride s
    new_size = make_divisible(img_size, int(s))  # ceil gs-multiple
    if new_size != img_size:
        print('WARNING: --img-size %g must be multiple of max stride %g, updating to %g' % (img_size, s, new_size))
    return new_size
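# Quick sanity check (sketch): make_divisible(100, 32) returns 128, and
# check_img_size(641, s=32) warns and returns 672, the next multiple of 32.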
class Ensemble(nn.ModuleList):
    # Ensemble of models
    def __init__(self):
        super(Ensemble, self).__init__()

    def forward(self, x, augment=False):
        y = []
        for module in self:
            y.append(module(x, augment)[0])
        # y = torch.stack(y).max(0)[0]  # max ensemble
        # y = torch.stack(y).mean(0)  # mean ensemble
        y = torch.cat(y, 1)  # nms ensemble
        return y, None  # inference, train output
class Detect(nn.Module):
    stride = None  # strides computed during build
    onnx_dynamic = False  # ONNX export parameter

    def __init__(self, nc=80, anchors=(), ch=(), inplace=True):  # detection layer
        super(Detect, self).__init__()
        self.nc = nc  # number of classes
        self.no = nc + 5  # number of outputs per anchor
        self.nl = len(anchors)  # number of detection layers
        self.na = len(anchors[0]) // 2  # number of anchors
        self.grid = [torch.zeros(1)] * self.nl  # init grid
        a = torch.tensor(anchors).float().view(self.nl, -1, 2)
        self.register_buffer('anchors', a)  # shape(nl,na,2)
        self.register_buffer('anchor_grid', a.clone().view(self.nl, 1, -1, 1, 1, 2))  # shape(nl,1,na,1,1,2)
        self.m = nn.ModuleList(nn.Conv2d(x, self.no * self.na, 1) for x in ch)  # output conv
        self.inplace = inplace  # use in-place ops (e.g. slice assignment)

    def forward(self, x):
        # x = x.copy()  # for profiling
        z = []  # inference output
        for i in range(self.nl):
            x[i] = self.m[i](x[i])  # conv
            bs, _, ny, nx = x[i].shape  # x(bs,255,20,20) to x(bs,3,20,20,85)
            x[i] = x[i].view(bs, self.na, self.no, ny, nx).permute(0, 1, 3, 4, 2).contiguous()

            if not self.training:  # inference
                if self.grid[i].shape[2:4] != x[i].shape[2:4] or self.onnx_dynamic:
                    self.grid[i] = self._make_grid(nx, ny).to(x[i].device)

                y = x[i].sigmoid()
                if self.inplace:
                    y[..., 0:2] = (y[..., 0:2] * 2. - 0.5 + self.grid[i]) * self.stride[i]  # xy
                    y[..., 2:4] = (y[..., 2:4] * 2) ** 2 * self.anchor_grid[i]  # wh
                else:  # for YOLOv5 on AWS Inferentia https://github.com/ultralytics/yolov5/pull/2953
                    xy = (y[..., 0:2] * 2. - 0.5 + self.grid[i]) * self.stride[i]  # xy
                    wh = (y[..., 2:4] * 2) ** 2 * self.anchor_grid[i].view(1, self.na, 1, 1, 2)  # wh
                    y = torch.cat((xy, wh, y[..., 4:]), -1)
                z.append(y.view(bs, -1, self.no))

        return x if self.training else (torch.cat(z, 1), x)

    @staticmethod
    def _make_grid(nx=20, ny=20):
        yv, xv = torch.meshgrid([torch.arange(ny), torch.arange(nx)])
        return torch.stack((xv, yv), 2).view((1, 1, ny, nx, 2)).float()
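# Worked example of the decode in Detect.forward (hypothetical numbers): at stride 8,
# in grid cell (3, 4), a raw sigmoid output of 0.75 for x gives
# x = (0.75 * 2 - 0.5 + 3) * 8 = 32 pixels. The wh term (2 * sigmoid)^2 * anchor means
# a predicted box can be at most 4x its anchor's width or height.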
class Model(nn.Module):
    # NOTE: this class references helpers that are not defined in this file
    # (parse_model, check_anchor_order, initialize_weights, scale_img, fuse_conv_and_bn,
    # NMS, AutoShape, copy_attr, model_info, thop). They come from the ultralytics/yolov5
    # repo (models/ and utils/) and are assumed importable wherever this class is used.
    def __init__(self, cfg='yolov5s.yaml', ch=3, nc=None, anchors=None):  # model, input channels, number of classes
        super(Model, self).__init__()
        if isinstance(cfg, dict):
            self.yaml = cfg  # model dict
        else:  # is *.yaml
            import yaml  # for torch hub
            self.yaml_file = Path(cfg).name
            with open(cfg) as f:
                self.yaml = yaml.safe_load(f)  # model dict

        # Define model
        ch = self.yaml['ch'] = self.yaml.get('ch', ch)  # input channels
        if nc and nc != self.yaml['nc']:
            logger.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")
            self.yaml['nc'] = nc  # override yaml value
        if anchors:
            logger.info(f'Overriding model.yaml anchors with anchors={anchors}')
            self.yaml['anchors'] = round(anchors)  # override yaml value
        self.model, self.save = parse_model(deepcopy(self.yaml), ch=[ch])  # model, savelist
        self.names = [str(i) for i in range(self.yaml['nc'])]  # default names
        self.inplace = self.yaml.get('inplace', True)
        # logger.info([x.shape for x in self.forward(torch.zeros(1, ch, 64, 64))])

        # Build strides, anchors
        m = self.model[-1]  # Detect()
        if isinstance(m, Detect):
            s = 256  # 2x min stride
            m.inplace = self.inplace
            m.stride = torch.tensor([s / x.shape[-2] for x in self.forward(torch.zeros(1, ch, s, s))])  # forward
            m.anchors /= m.stride.view(-1, 1, 1)
            check_anchor_order(m)
            self.stride = m.stride
            self._initialize_biases()  # only run once
            # logger.info('Strides: %s' % m.stride.tolist())

        # Init weights, biases
        initialize_weights(self)
        self.info()
        logger.info('')

    def forward(self, x, augment=False, profile=False):
        if augment:
            return self.forward_augment(x)  # augmented inference, None
        else:
            return self.forward_once(x, profile)  # single-scale inference, train

    def forward_augment(self, x):
        img_size = x.shape[-2:]  # height, width
        s = [1, 0.83, 0.67]  # scales
        f = [None, 3, None]  # flips (2-ud, 3-lr)
        y = []  # outputs
        for si, fi in zip(s, f):
            xi = scale_img(x.flip(fi) if fi else x, si, gs=int(self.stride.max()))
            yi = self.forward_once(xi)[0]  # forward
            # cv2.imwrite(f'img_{si}.jpg', 255 * xi[0].cpu().numpy().transpose((1, 2, 0))[:, :, ::-1])  # save
            yi = self._descale_pred(yi, fi, si, img_size)
            y.append(yi)
        return torch.cat(y, 1), None  # augmented inference, train

    def forward_once(self, x, profile=False):
        y, dt = [], []  # outputs
        for m in self.model:
            if m.f != -1:  # if not from previous layer
                x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]  # from earlier layers

            if profile:
                o = thop.profile(m, inputs=(x,), verbose=False)[0] / 1E9 * 2 if thop else 0  # FLOPs
                t = time_synchronized()
                for _ in range(10):
                    _ = m(x)
                dt.append((time_synchronized() - t) * 100)
                if m == self.model[0]:
                    logger.info(f"{'time (ms)':>10s} {'GFLOPs':>10s} {'params':>10s} {'module'}")
                logger.info(f'{dt[-1]:10.2f} {o:10.2f} {m.np:10.0f} {m.type}')

            x = m(x)  # run
            y.append(x if m.i in self.save else None)  # save output

        if profile:
            logger.info('%.1fms total' % sum(dt))
        return x

    def _descale_pred(self, p, flips, scale, img_size):
        # de-scale predictions following augmented inference (inverse operation)
        if self.inplace:
            p[..., :4] /= scale  # de-scale
            if flips == 2:
                p[..., 1] = img_size[0] - p[..., 1]  # de-flip ud
            elif flips == 3:
                p[..., 0] = img_size[1] - p[..., 0]  # de-flip lr
        else:
            x, y, wh = p[..., 0:1] / scale, p[..., 1:2] / scale, p[..., 2:4] / scale  # de-scale
            if flips == 2:
                y = img_size[0] - y  # de-flip ud
            elif flips == 3:
                x = img_size[1] - x  # de-flip lr
            p = torch.cat((x, y, wh, p[..., 4:]), -1)
        return p

    def _initialize_biases(self, cf=None):  # initialize biases into Detect(), cf is class frequency
        # https://arxiv.org/abs/1708.02002 section 3.3
        # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1.
        m = self.model[-1]  # Detect() module
        for mi, s in zip(m.m, m.stride):  # from
            b = mi.bias.view(m.na, -1)  # conv.bias(255) to (3,85)
            b.data[:, 4] += math.log(8 / (640 / s) ** 2)  # obj (8 objects per 640 image)
            b.data[:, 5:] += math.log(0.6 / (m.nc - 0.99)) if cf is None else torch.log(cf / cf.sum())  # cls
            mi.bias = torch.nn.Parameter(b.view(-1), requires_grad=True)

    def _print_biases(self):
        m = self.model[-1]  # Detect() module
        for mi in m.m:  # from
            b = mi.bias.detach().view(m.na, -1).T  # conv.bias(255) to (3,85)
            logger.info(
                ('%6g Conv2d.bias:' + '%10.3g' * 6) % (mi.weight.shape[1], *b[:5].mean(1).tolist(), b[5:].mean()))

    # def _print_weights(self):
    #     for m in self.model.modules():
    #         if type(m) is Bottleneck:
    #             logger.info('%10.3g' % (m.w.detach().sigmoid() * 2))  # shortcut weights

    def fuse(self):  # fuse model Conv2d() + BatchNorm2d() layers
        logger.info('Fusing layers... ')
        for m in self.model.modules():
            if type(m) is Conv and hasattr(m, 'bn'):
                m.conv = fuse_conv_and_bn(m.conv, m.bn)  # update conv
                delattr(m, 'bn')  # remove batchnorm
                m.forward = m.fuseforward  # update forward
        self.info()
        return self

    def nms(self, mode=True):  # add or remove NMS module
        present = type(self.model[-1]) is NMS  # last layer is NMS
        if mode and not present:
            logger.info('Adding NMS... ')
            m = NMS()  # module
            m.f = -1  # from
            m.i = self.model[-1].i + 1  # index
            self.model.add_module(name='%s' % m.i, module=m)  # add
            self.eval()
        elif not mode and present:
            logger.info('Removing NMS... ')
            self.model = self.model[:-1]  # remove
        return self

    def autoshape(self):  # add AutoShape module
        logger.info('Adding AutoShape... ')
        m = AutoShape(self)  # wrap model
        copy_attr(m, self, include=('yaml', 'nc', 'hyp', 'names', 'stride'), exclude=())  # copy attributes
        return m

    def info(self, verbose=False, img_size=640):  # print model information
        model_info(self, verbose, img_size)
def attempt_load(weights, map_location=None, inplace=True):
    # Loads an ensemble of models weights=[a,b,c] or a single model weights=[a] or weights=a
    model = Ensemble()
    for w in weights if isinstance(weights, list) else [weights]:
        ckpt = torch.load(Path(str(w).strip().replace("'", '')), map_location=map_location)  # load
        model.append(ckpt['ema' if ckpt.get('ema') else 'model'].float().fuse().eval())  # FP32 model

    # Compatibility updates
    for m in model.modules():
        if type(m) in [nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU, Detect, Model]:
            m.inplace = inplace  # pytorch 1.7.0 compatibility
        elif type(m) is Conv:
            m._non_persistent_buffers_set = set()  # pytorch 1.6.0 compatibility

    if len(model) == 1:
        return model[-1]  # return model
    else:
        print(f'Ensemble created with {weights}\n')
        for k in ['names']:
            setattr(model, k, getattr(model[-1], k))
        model.stride = model[torch.argmax(torch.tensor([m.stride.max() for m in model])).int()].stride  # max stride
        return model  # return ensemble
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    # Resize and pad image while meeting stride-multiple constraints
    shape = img.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better test mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return img, ratio, (dw, dh)
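# Example (sketch): a 720x1280 frame with new_shape=640 and auto=True scales by r=0.5
# to 360x640, then pads the height up to the next stride multiple (360 + 24 = 384),
# giving a 384x640 letterboxed image rather than a fully padded 640x640 one.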
weights = "/content/yolov5/last.pt"  # "/content/yolov5/runs/train/exp/weights/last.pt"
source = "/content/DatasetYolo_6/images/val/10.png"
device = None
imgsz = 640

model = attempt_load(weights, map_location=device)  # load FP32 model
stride = int(model.stride.max())  # model stride
imgsz = check_img_size(imgsz, s=stride)  # check image size
names = model.module.names if hasattr(model, 'module') else model.names

conf_thres = 0.25  # minimum class confidence to keep a detection
iou_thres = 0.45  # IoU threshold for suppressing overlapping boxes in NMS
classes = None
agnostic_nms = False
max_det = 1000

img0 = cv2.imread(source)

# Padded resize
img = letterbox(img0, imgsz, stride=stride)[0]

# Convert
img = img[:, :, ::-1].transpose(2, 0, 1)  # HWC BGR to CHW RGB
img = np.ascontiguousarray(img)
img = torch.from_numpy(img).to(device)
# img = img.half() if half else img.float()  # uint8 to fp16/32
img = img.float()
img /= 255.0  # 0 - 255 to 0.0 - 1.0
if img.ndimension() == 3:
    img = img.unsqueeze(0)  # add batch dimension

# Inference
t1 = time_synchronized()
pred = model(img, augment=False)[0]

# Apply NMS
pred = non_max_suppression(pred, conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det)
t2 = time_synchronized()
print(pred)
print("time: ", round(t2 - t1, 3), " s")