# YOLOv5 🚀 by Ultralytics, GPL-3.0 license
"""
Common modules
"""
'''===============================================Part I: Imports==================================================='''
'''======================1. Standard and third-party libraries====================='''
import json  # conversion between JSON and Python data
import math  # math functions
import platform  # operating-system information
import warnings  # warn about changes in language or library features
from copy import copy  # copy utilities (shallow and deep copy)
from pathlib import Path  # Path turns str paths into Path objects that are easier to manipulate

import cv2  # OpenCV
import numpy as np  # numpy array operations
import pandas as pd  # pandas DataFrame operations
import requests  # Python HTTP client library
import torch  # PyTorch deep learning framework
import torch.nn as nn  # modular interface designed for neural networks
from PIL import Image  # basic image operations
from torch.cuda import amp  # automatic mixed-precision training

'''===================2. Project-local modules============================'''
from utils.datasets import exif_transpose, letterbox  # dataset-loading helpers
from utils.general import (LOGGER, check_requirements, check_suffix, colorstr, increment_path, make_divisible,
                           non_max_suppression, scale_coords, xywh2xyxy, xyxy2xywh)  # common utility functions
from utils.plots import Annotator, colors, save_one_box  # Annotator draws boxes and labels on images
from utils.torch_utils import time_sync  # PyTorch-related utility functions
'''===============================================Part II: Basic components==================================================='''
'''===========1.autopad: compute the padding required for a given kernel size================'''
# Auto-pad for 'same' convolution or 'same' pooling
# Given the kernel size, compute how much padding is needed so the output tensor keeps the input spatial shape
def autopad(k, p=None):  # kernel, padding
    # Pad to 'same'
    # if p is None, derive the padding from the kernel size
    if p is None:
        # int kernel -> k // 2; otherwise pad each dimension with x // 2
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]  # auto-pad
    return p
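# Illustrative sketch (not part of the upstream file): expected autopad values, assuming the
# function above.
# >>> autopad(3)        # -> 1, so a 3x3 / stride-1 conv keeps the spatial size
# >>> autopad(5)        # -> 2
# >>> autopad((3, 5))   # -> [1, 2], per-dimension padding for a rectangular kernel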
'''===========2.Conv: standard convolution block, Conv + BN + activation================'''
class Conv(nn.Module):
    # Standard convolution
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        """Used by Focus, Bottleneck, BottleneckCSP, C3, SPP, DWConv, TransformerBlock, etc.
        Standard convolution: conv + BN + act
        :params c1: input channels
        :params c2: output channels
        :params k: kernel size
        :params s: stride
        :params p: padding, usually None so that autopad computes the required padding
        :params g: number of groups; =1 is an ordinary convolution, >1 is a grouped/depth-wise convolution
        :params act: activation; True uses SiLU()/Swish, False disables the activation,
                     and an nn.Module instance is used as the activation directly
        """
        super().__init__()
        # convolution layer
        self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g, bias=False)
        # batch-normalization layer
        self.bn = nn.BatchNorm2d(c2)
        # activation function
        self.act = nn.SiLU() if act is True else (act if isinstance(act, nn.Module) else nn.Identity())

    def forward(self, x):
        # forward order: conv -> bn -> act
        return self.act(self.bn(self.conv(x)))

    def forward_fuse(self, x):
        # fused forward: only conv and act (BN folded into the conv weights)
        return self.act(self.conv(x))
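# Illustrative sketch (not part of the upstream file): a quick shape check for Conv, assuming
# the class above. A 3x3 / stride-2 Conv with autopad halves the spatial resolution.
# >>> m = Conv(3, 16, k=3, s=2)
# >>> m(torch.zeros(1, 3, 64, 64)).shape
# torch.Size([1, 16, 32, 32])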
'''===========3.DWConv: depth-wise convolution================'''
class DWConv(Conv):
    # Depth-wise convolution class
    def __init__(self, c1, c2, k=1, s=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        super().__init__(c1, c2, k, s, g=math.gcd(c1, c2), act=act)
'''===========4.Bottleneck: standard bottleneck, 1x1 conv + 3x3 conv + residual connection================'''
class Bottleneck(nn.Module):
    # Standard bottleneck
    def __init__(self, c1, c2, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, shortcut, groups, expansion
        """Used by BottleneckCSP and by parse_model in yolo.py
        Standard bottleneck: Conv + Conv + shortcut
        :params c1: input channels of the first convolution
        :params c2: output channels of the second convolution
        :params shortcut: bool, whether to add the shortcut connection, default True
        :params g: groups of the 3x3 convolution; =1 ordinary convolution, >1 grouped/depth-wise convolution
        :params e: expansion ratio; e*c2 is the output channels of the first convolution
                   and the input channels of the second convolution
        """
        super().__init__()
        c_ = int(c2 * e)  # hidden channels
        # 1x1 convolution
        self.cv1 = Conv(c1, c_, 1, 1)
        # 3x3 convolution
        self.cv2 = Conv(c_, c2, 3, 1, g=g)
        # if shortcut is True and the channel counts match, the input is added to the output
        self.add = shortcut and c1 == c2

    def forward(self, x):
        return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x))
'''===========5.BottleneckCSP: a stack of Bottleneck modules wrapped in a CSP structure================'''
class BottleneckCSP(nn.Module):
    # CSP Bottleneck https://github.com/WongKinYiu/CrossStagePartialNetworks
    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, number, shortcut, groups, expansion
        """Used by the C3 module and by parse_model in yolo.py
        CSP Bottleneck https://github.com/WongKinYiu/CrossStagePartialNetworks
        :params c1: input channels of the whole BottleneckCSP
        :params c2: output channels of the whole BottleneckCSP
        :params n: number of Bottleneck modules
        :params shortcut: bool, whether the Bottlenecks use a shortcut, default True
        :params g: groups of the 3x3 convolution inside Bottleneck; =1 ordinary convolution, >1 grouped/depth-wise convolution
        :params e: expansion ratio; c2*e is the channel count of all intermediate layers
        c_: number of hidden channels of the CSP structure, determined by the expansion ratio e
        """
        super().__init__()
        c_ = int(c2 * e)  # hidden channels
        # stack of four 1x1 convolution layers
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = nn.Conv2d(c1, c_, 1, 1, bias=False)
        self.cv3 = nn.Conv2d(c_, c_, 1, 1, bias=False)
        self.cv4 = Conv(2 * c_, c2, 1, 1)
        # batch-normalization layer
        self.bn = nn.BatchNorm2d(2 * c_)  # applied to cat(cv2, cv3)
        # activation function
        self.act = nn.SiLU()
        # m: n stacked Bottleneck modules
        # the * operator unpacks the generator into individual arguments
        self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))

    def forward(self, x):
        # y1 is the upper branch of the CSP block: cv1, then the n Bottlenecks, then cv3
        # input x -> Conv -> n Bottlenecks -> Conv2d -> y1
        y1 = self.cv3(self.m(self.cv1(x)))
        # y2 is the lower branch of the CSP block: a single conv applied directly to the input
        # input x -> Conv2d -> y2
        y2 = self.cv2(x)
        # concatenate y1 and y2, then BN, activation, and the final cv4
        # (y1, y2), dim=1 merges along dimension 1, i.e. the channel dimension
        return self.cv4(self.act(self.bn(torch.cat((y1, y2), dim=1))))
'''===========6.C3: similar to BottleneckCSP, but with one fewer Conv module================'''
# ===6.1 C3=== #
class C3(nn.Module):
    # CSP Bottleneck with 3 convolutions
    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, number, shortcut, groups, expansion
        """Used by the C3TR module and by parse_model in yolo.py
        CSP Bottleneck with 3 convolutions
        :params c1: input channels of the whole C3
        :params c2: output channels of the whole C3
        :params n: number of Bottleneck modules
        :params shortcut: bool, whether the Bottlenecks use a shortcut, default True
        :params g: groups of the 3x3 convolution inside Bottleneck; =1 ordinary convolution, >1 grouped/depth-wise convolution
        :params e: expansion ratio; c2*e is the channel count of all intermediate layers
        """
        super().__init__()
        c_ = int(c2 * e)  # hidden channels
        # stack of three 1x1 convolution layers, one fewer than BottleneckCSP
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c1, c_, 1, 1)
        self.cv3 = Conv(2 * c_, c2, 1)  # act=FReLU(c2)
        self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
        # self.m = nn.Sequential(*[CrossConv(c_, c_, 3, 1, g, 1.0, shortcut) for _ in range(n)])

    def forward(self, x):
        # concatenate the Bottleneck branch (cv1 -> m) with the cv2 branch, then apply cv3
        return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), dim=1))
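# Illustrative sketch (not part of the upstream file): C3 keeps the spatial size and maps c1 -> c2
# channels, assuming the class above.
# >>> m = C3(64, 128, n=2)
# >>> m(torch.zeros(1, 64, 80, 80)).shape
# torch.Size([1, 128, 80, 80])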
# ===6.2 C3SPP(C3): inherits from C3, with the n Bottlenecks replaced by one SPP=== #
class C3SPP(C3):
    # C3 module with SPP()
    def __init__(self, c1, c2, k=(5, 9, 13), n=1, shortcut=True, g=1, e=0.5):
        super().__init__(c1, c2, n, shortcut, g, e)
        c_ = int(c2 * e)
        self.m = SPP(c_, c_, k)

# ===6.3 C3Ghost(C3): inherits from C3, with the Bottlenecks replaced by GhostBottlenecks=== #
class C3Ghost(C3):
    # C3 module with GhostBottleneck()
    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
        super().__init__(c1, c2, n, shortcut, g, e)
        c_ = int(c2 * e)  # hidden channels
        self.m = nn.Sequential(*(GhostBottleneck(c_, c_) for _ in range(n)))
'''===========7.SPP/SPPF: spatial pyramid pooling modules================'''
# Used at the end of the backbone to fuse multi-scale features.
# ===7.1 SPP: spatial pyramid pooling=== #
class SPP(nn.Module):
    # Spatial Pyramid Pooling (SPP) layer https://arxiv.org/abs/1406.4729
    def __init__(self, c1, c2, k=(5, 9, 13)):
        """Used by parse_model in yolo.py
        Spatial pyramid pooling layer used in YOLOv3-SPP
        :params c1: input channels of the SPP module
        :params c2: output channels of the SPP module
        :params k: kernel sizes of the three max-pooling layers, default (5, 9, 13)
        """
        super().__init__()
        c_ = c1 // 2  # hidden channels
        # 1x1 convolution
        self.cv1 = Conv(c1, c_, 1, 1)
        # +1 because there are len(k) + 1 inputs to concatenate
        self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1)
        # m: one max-pooling layer per kernel size in k, collected in an nn.ModuleList
        self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k])

    def forward(self, x):
        # apply cv1 first
        x = self.cv1(x)
        # ignore the warning raised by this op
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')  # suppress torch 1.9.0 max_pool2d() warning
            # max-pool x with every m, concatenate the pooled maps with the un-pooled x, then apply cv2
            return self.cv2(torch.cat([x] + [m(x) for m in self.m], 1))
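# Illustrative sketch (not part of the upstream file): SPP keeps the spatial size and maps c1 -> c2
# channels by concatenating the three pooled maps with the un-pooled branch, assuming the class above.
# >>> m = SPP(256, 512, k=(5, 9, 13))
# >>> m(torch.zeros(1, 256, 20, 20)).shape
# torch.Size([1, 512, 20, 20])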
# ===7.2 SPPF: fast spatial pyramid pooling=== #
class SPPF(nn.Module):
    # Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher
    def __init__(self, c1, c2, k=5):  # equivalent to SPP(k=(5, 9, 13))
        super().__init__()
        c_ = c1 // 2  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c_ * 4, c2, 1, 1)
        self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)

    def forward(self, x):
        x = self.cv1(x)
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')  # suppress torch 1.9.0 max_pool2d() warning
            # apply the same max-pooling three times in series and concatenate all intermediate results
            y1 = self.m(x)
            y2 = self.m(y1)
            return self.cv2(torch.cat([x, y1, y2, self.m(y2)], 1))
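# Illustrative sketch (not part of the upstream file): why SPPF(k=5) is equivalent to SPP(k=(5, 9, 13)).
# Two chained stride-1 5x5 max-pools cover the same window as one 9x9 max-pool (and three cover 13x13),
# so the serial pooling reproduces the SPP pyramid with less computation.
# >>> m5, m9 = nn.MaxPool2d(5, 1, 2), nn.MaxPool2d(9, 1, 4)
# >>> x = torch.rand(1, 8, 20, 20)
# >>> torch.equal(m5(m5(x)), m9(x))
# True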
'''===========8.Focus: fold width (w) and height (h) information into the channel (c) space================'''
class Focus(nn.Module):
    # Focus wh information into c-space
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        """Called from parse_model in yolo.py
        Idea: periodically sample pixels from the high-resolution image and rebuild them in a lower-resolution image,
        i.e. stack the four neighbouring positions of the image. This folds w/h information into the channel space,
        enlarges the receptive field of every point and reduces the loss of original information;
        the module is mainly designed to reduce computation and speed up the model.
        Focus wh information into c-space
        First take 4 slices, then concat, then apply Conv
        slice:  (b,c1,w,h) -> 4 slices, each (b,c1,w/2,h/2)
        concat(dim=1): 4 slices (b,c1,w/2,h/2) -> (b,4c1,w/2,h/2)
        conv:   (b,4c1,w/2,h/2) -> (b,c2,w/2,h/2)
        :params c1: input channels (each slice keeps c1 channels)
        :params c2: final output channels of Focus
        :params k: kernel of the final convolution
        :params s: stride of the final convolution
        :params p: padding of the final convolution
        :params g: groups of the final convolution; =1 ordinary convolution, >1 grouped/depth-wise convolution
        :params act: activation; default True uses SiLU()/Swish, False disables the activation
        """
        super().__init__()
        # convolution applied after the concat (the final convolution)
        self.conv = Conv(c1 * 4, c2, k, s, p, g, act)
        # self.contract = Contract(gain=2)

    def forward(self, x):  # x(b,c,w,h) -> y(b,4c,w/2,h/2)
        # slice first, then concatenate, then apply the conv
        return self.conv(torch.cat([x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]], 1))
        # return self.conv(self.contract(x))
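# Illustrative sketch (not part of the upstream file): Focus halves the spatial size and moves the 2x2
# pixel neighbourhood into the channel dimension, assuming the class above.
# >>> m = Focus(3, 32, k=3)
# >>> m(torch.zeros(1, 3, 640, 640)).shape   # slice -> (1,12,320,320) -> conv -> (1,32,320,320)
# torch.Size([1, 32, 320, 320])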
# The following modules, Contract, Expand and Concat, manipulate the shape of the input features
'''===========9.Contract: resize the tensor by folding width and height into the channels================'''
class Contract(nn.Module):
    # Contract width-height into channels, i.e. x(1,64,80,80) to x(1,256,40,40)
    """Used by parse_model in yolo.py, not used often
    Changes the shape of the input features: shrinks the w and h dimensions and moves that data into the channel dimension
    Contract width-height into channels, i.e. x(1,64,80,80) to x(1,256,40,40)
    """
    def __init__(self, gain=2):
        super().__init__()
        self.gain = gain

    def forward(self, x):
        b, c, h, w = x.size()  # assert (h / s == 0) and (W / s == 0), 'Indivisible gain'
        s = self.gain
        # .view: reshape the tensor
        x = x.view(b, c, h // s, s, w // s, s)  # x(1,64,40,2,40,2)
        # .permute: reorder the tensor dimensions
        x = x.permute(0, 3, 5, 1, 2, 4).contiguous()  # x(1,2,2,64,40,40)
        return x.view(b, c * s * s, h // s, w // s)  # x(1,256,40,40)
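# Illustrative sketch (not part of the upstream file): Contract(gain=2) trades spatial resolution for
# channels, assuming the class above.
# >>> Contract(gain=2)(torch.zeros(1, 64, 80, 80)).shape
# torch.Size([1, 256, 40, 40])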
'''===========10.Expand: expansion module, spreads channels back into width and height================'''
class Expand(nn.Module):
    # Expand channels into width-height, i.e. x(1,64,80,80) to x(1,16,160,160)
    def __init__(self, gain=2):
        super().__init__()
        self.gain = gain

    def forward(self, x):
        b, c, h, w = x.size()  # assert C / s ** 2 == 0, 'Indivisible gain'
        s = self.gain
        x = x.view(b, s, s, c // s ** 2, h, w)  # x(1,2,2,16,80,80)
        x = x.permute(0, 3, 4, 1, 5, 2).contiguous()  # x(1,16,80,2,80,2)
        return x.view(b, c // s ** 2, h * s, w * s)  # x(1,16,160,160)
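# Illustrative sketch (not part of the upstream file): Expand(gain=2) is the inverse trade-off of
# Contract, assuming the class above.
# >>> Expand(gain=2)(torch.zeros(1, 64, 80, 80)).shape
# torch.Size([1, 16, 160, 160])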
'''===========11.Concat: custom concat module; dimension selects the axis along which tensors are concatenated================'''
# wraps tensor concatenation as a module so it can appear in the model definition
class Concat(nn.Module):
    # Concatenate a list of tensors along dimension
    def __init__(self, dimension=1):
        super().__init__()
        self.d = dimension

    def forward(self, x):
        return torch.cat(x, self.d)
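# Illustrative sketch (not part of the upstream file): Concat takes a list of tensors and joins them
# along the configured dimension, assuming the class above.
# >>> cat = Concat(dimension=1)
# >>> cat([torch.zeros(1, 64, 40, 40), torch.zeros(1, 128, 40, 40)]).shape
# torch.Size([1, 192, 40, 40])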
'''===============================================Part III: Attention modules==================================================='''
'''===========1.TransformerLayer: a single transformer encoder layer (LayerNorm removed)================'''
class TransformerLayer(nn.Module):
    # Transformer layer https://arxiv.org/abs/2010.11929 (LayerNorm layers removed for better performance)
    """
    Transformer layer https://arxiv.org/abs/2010.11929 (LayerNorm layers removed for better performance)
    This is essentially a single Encoder block from the original paper
    (only the two Norm layers are removed; the rest matches the original encoder)
    """
    def __init__(self, c, num_heads):
        super().__init__()
        self.q = nn.Linear(c, c, bias=False)
        self.k = nn.Linear(c, c, bias=False)
        self.v = nn.Linear(c, c, bias=False)
        # inputs: query, key, value
        # outputs: [0] attn_output, the self-attention output at every position, same shape as the query
        #          [1] attn_output_weights, one attention weight between every pair of positions
        self.ma = nn.MultiheadAttention(embed_dim=c, num_heads=num_heads)
        self.fc1 = nn.Linear(c, c, bias=False)
        self.fc2 = nn.Linear(c, c, bias=False)

    def forward(self, x):
        # multi-head attention + residual (LayerNorm removed for better performance)
        x = self.ma(self.q(x), self.k(x), self.v(x))[0] + x
        # feed-forward network + residual (LayerNorm removed for better performance)
        x = self.fc2(self.fc1(x)) + x
        return x
'''===========2.TransformerBlock: Vision-Transformer style block built from TransformerLayer================'''
class TransformerBlock(nn.Module):
    # Vision Transformer https://arxiv.org/abs/2010.11929
    def __init__(self, c1, c2, num_heads, num_layers):
        super().__init__()
        self.conv = None
        if c1 != c2:
            self.conv = Conv(c1, c2)
        self.linear = nn.Linear(c2, c2)  # learnable position embedding
        self.tr = nn.Sequential(*(TransformerLayer(c2, num_heads) for _ in range(num_layers)))
        self.c2 = c2

    def forward(self, x):
        if self.conv is not None:
            x = self.conv(x)
        b, _, w, h = x.shape
        # flatten the spatial dims and move them to the sequence axis: (b,c2,w,h) -> (w*h, b, c2)
        p = x.flatten(2).permute(2, 0, 1)
        # add the learnable position embedding, run the transformer layers, then restore (b,c2,w,h)
        return self.tr(p + self.linear(p)).permute(1, 2, 0).reshape(b, self.c2, w, h)
'''===============================================Part IV: Ghost modules==================================================='''
'''===========1.GhostConv: Ghost convolution, a lightweight convolution module================'''
class GhostConv(nn.Module):
    # Ghost Convolution https://github.com/huawei-noah/ghostnet
    def __init__(self, c1, c2, k=1, s=1, g=1, act=True):  # ch_in, ch_out, kernel, stride, groups
        super().__init__()
        c_ = c2 // 2  # hidden channels
        # first step: a small convolution producing half the channels, roughly half the computation
        self.cv1 = Conv(c1, c_, k, s, None, g, act)
        # second step: cheap operations, a 3x3 or 5x5 convolution (5x5 here) applied per feature map (depth-wise convolution)
        self.cv2 = Conv(c_, c_, 5, 1, None, c_, act)

    def forward(self, x):
        y = self.cv1(x)
        return torch.cat([y, self.cv2(y)], 1)
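# Illustrative sketch (not part of the upstream file): GhostConv produces half the output channels with
# a normal conv and the other half with a cheap depth-wise conv, assuming the class above.
# >>> m = GhostConv(64, 128)
# >>> m(torch.zeros(1, 64, 40, 40)).shape   # cv1 -> 64 ch, cv2 -> 64 ch, concat -> 128 ch
# torch.Size([1, 128, 40, 40])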
'''===========2.GhostBottleneck: Ghost bottleneck layer================'''
class GhostBottleneck(nn.Module):
    # Ghost Bottleneck https://github.com/huawei-noah/ghostnet
    def __init__(self, c1, c2, k=3, s=1):  # ch_in, ch_out, kernel, stride
        super().__init__()
        c_ = c2 // 2
        self.conv = nn.Sequential(GhostConv(c1, c_, 1, 1),  # pw
                                  DWConv(c_, c_, k, s, act=False) if s == 2 else nn.Identity(),  # dw
                                  GhostConv(c_, c2, 1, 1, act=False))  # pw-linear
        # note: when s == 2 the shortcut is not a plain Identity but a DWConv + Conv before the residual addition
        self.shortcut = nn.Sequential(DWConv(c1, c1, k, s, act=False),
                                      Conv(c1, c2, 1, 1, act=False)) if s == 2 else nn.Identity()

    def forward(self, x):
        return self.conv(x) + self.shortcut(x)
'''===============================================Part V: Model extension modules==================================================='''
'''===========1.C3TR(C3): inherits from C3, with the n Bottlenecks replaced by one TransformerBlock================'''
class C3TR(C3):
    """
    Adapted from the C3 structure above, replacing the original Bottlenecks with a TransformerBlock module
    """
    # C3 module with TransformerBlock()
    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
        """Called from the C3TR module and from parse_model in yolo.py
        :params c1: input channels of the whole C3
        :params c2: output channels of the whole C3
        :params n: number of sub-modules [Bottleneck/CrossConv]
        :params shortcut: bool, whether the sub-modules [Bottleneck/CrossConv] use a shortcut, default True
        :params g: groups of the 3x3 convolution in the sub-modules; =1 ordinary convolution, >1 grouped/depth-wise convolution
        :params e: expansion ratio; e*c2 is the channel count of all intermediate layers
        """
        super().__init__(c1, c2, n, shortcut, g, e)
        c_ = int(c2 * e)
        self.m = TransformerBlock(c_, c_, 4, n)
'''===========2.DetectMultiBackend: unified wrapper for inference on multiple backends================'''
class DetectMultiBackend(nn.Module):
    # YOLOv5 MultiBackend class for python inference on various backends
    def __init__(self, weights='yolov5s.pt', device=None, dnn=True):
        # Usage:
        #   PyTorch:              weights = *.pt
        #   TorchScript:                    *.torchscript.pt
        #   CoreML:                         *.mlmodel
        #   TensorFlow:                     *_saved_model
        #   TensorFlow:                     *.pb
        #   TensorFlow Lite:                *.tflite
        #   ONNX Runtime:                   *.onnx
        #   OpenCV DNN:                     *.onnx with dnn=True
        super().__init__()
        # if weights is a list, take the first entry as the path to load
        w = str(weights[0] if isinstance(weights, list) else weights)
        suffix, suffixes = Path(w).suffix.lower(), ['.pt', '.onnx', '.tflite', '.pb', '', '.mlmodel']
        check_suffix(w, suffixes)  # check weights have acceptable suffix
        pt, onnx, tflite, pb, saved_model, coreml = (suffix == x for x in suffixes)  # backend booleans
        jit = pt and 'torchscript' in w.lower()
        stride, names = 64, [f'class{i}' for i in range(1000)]  # assign defaults

        if jit:  # TorchScript
            LOGGER.info(f'Loading {w} for TorchScript inference...')
            extra_files = {'config.txt': ''}  # model metadata
            model = torch.jit.load(w, _extra_files=extra_files)
            if extra_files['config.txt']:
                d = json.loads(extra_files['config.txt'])  # extra_files dict
                stride, names = int(d['stride']), d['names']
        elif pt:  # PyTorch
            from models.experimental import attempt_load  # scoped to avoid circular import
            model = torch.jit.load(w) if 'torchscript' in w else attempt_load(weights, map_location=device)
            stride = int(model.stride.max())  # model stride
            names = model.module.names if hasattr(model, 'module') else model.names  # get class names
        elif coreml:  # CoreML *.mlmodel
            import coremltools as ct
            model = ct.models.MLModel(w)
        elif dnn:  # ONNX OpenCV DNN
            LOGGER.info(f'Loading {w} for ONNX OpenCV DNN inference...')
            check_requirements(('opencv-python>=4.5.4',))
            net = cv2.dnn.readNetFromONNX(w)
        elif onnx:  # ONNX Runtime
            LOGGER.info(f'Loading {w} for ONNX Runtime inference...')
            check_requirements(('onnx', 'onnxruntime-gpu' if torch.has_cuda else 'onnxruntime'))
            import onnxruntime
            session = onnxruntime.InferenceSession(w, None)
        else:  # TensorFlow model (TFLite, pb, saved_model)
            import tensorflow as tf
            if pb:  # https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt
                def wrap_frozen_graph(gd, inputs, outputs):
                    x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), [])  # wrapped
                    return x.prune(tf.nest.map_structure(x.graph.as_graph_element, inputs),
                                   tf.nest.map_structure(x.graph.as_graph_element, outputs))

                LOGGER.info(f'Loading {w} for TensorFlow *.pb inference...')
                graph_def = tf.Graph().as_graph_def()
                graph_def.ParseFromString(open(w, 'rb').read())
                frozen_func = wrap_frozen_graph(gd=graph_def, inputs="x:0", outputs="Identity:0")
            elif saved_model:
                LOGGER.info(f'Loading {w} for TensorFlow saved_model inference...')
                model = tf.keras.models.load_model(w)
            elif tflite:  # https://www.tensorflow.org/lite/guide/python#install_tensorflow_lite_for_python
                if 'edgetpu' in w.lower():  # Edge TPU
                    LOGGER.info(f'Loading {w} for TensorFlow Edge TPU inference...')
                    import tflite_runtime.interpreter as tfli
                    delegate = {'Linux': 'libedgetpu.so.1',  # install https://coral.ai/software/#edgetpu-runtime
                                'Darwin': 'libedgetpu.1.dylib',
                                'Windows': 'edgetpu.dll'}[platform.system()]
                    interpreter = tfli.Interpreter(model_path=w, experimental_delegates=[tfli.load_delegate(delegate)])
                else:
                    LOGGER.info(f'Loading {w} for TensorFlow Lite inference...')
                    interpreter = tf.lite.Interpreter(model_path=w)  # load TFLite model
                interpreter.allocate_tensors()  # allocate
                input_details = interpreter.get_input_details()  # inputs
                output_details = interpreter.get_output_details()  # outputs
        self.__dict__.update(locals())  # assign all variables to self
    def forward(self, im, augment=False, visualize=False, val=False):
        # YOLOv5 MultiBackend inference
        b, ch, h, w = im.shape  # batch, channel, height, width
        if self.pt:  # PyTorch
            y = self.model(im) if self.jit else self.model(im, augment=augment, visualize=visualize)
            return y if val else y[0]
        elif self.coreml:  # CoreML *.mlmodel
            im = im.permute(0, 2, 3, 1).cpu().numpy()  # torch BCHW to numpy BHWC shape(1,320,192,3)
            im = Image.fromarray((im[0] * 255).astype('uint8'))
            # im = im.resize((192, 320), Image.ANTIALIAS)
            y = self.model.predict({'image': im})  # coordinates are xywh normalized
            box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]])  # xyxy pixels
            conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float)
            y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
        elif self.onnx:  # ONNX
            im = im.cpu().numpy()  # torch to numpy
            if self.dnn:  # ONNX OpenCV DNN
                self.net.setInput(im)
                y = self.net.forward()
            else:  # ONNX Runtime
                y = self.session.run([self.session.get_outputs()[0].name], {self.session.get_inputs()[0].name: im})[0]
        else:  # TensorFlow model (TFLite, pb, saved_model)
            im = im.permute(0, 2, 3, 1).cpu().numpy()  # torch BCHW to numpy BHWC shape(1,320,192,3)
            if self.pb:
                y = self.frozen_func(x=self.tf.constant(im)).numpy()
            elif self.saved_model:
                y = self.model(im, training=False).numpy()
            elif self.tflite:
                input, output = self.input_details[0], self.output_details[0]
                int8 = input['dtype'] == np.uint8  # is TFLite quantized uint8 model
                if int8:
                    scale, zero_point = input['quantization']
                    im = (im / scale + zero_point).astype(np.uint8)  # de-scale
                self.interpreter.set_tensor(input['index'], im)
                self.interpreter.invoke()
                y = self.interpreter.get_tensor(output['index'])
                if int8:
                    scale, zero_point = output['quantization']
                    y = (y.astype(np.float32) - zero_point) * scale  # re-scale
            y[..., 0] *= w  # x
            y[..., 1] *= h  # y
            y[..., 2] *= w  # w
            y[..., 3] *= h  # h
            y = torch.tensor(y)
        return (y, []) if val else y
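# Illustrative sketch (not part of the upstream file): loading a PyTorch *.pt checkpoint through the
# multi-backend wrapper; 'yolov5s.pt' below is only a placeholder path.
# >>> model = DetectMultiBackend('yolov5s.pt', device=torch.device('cpu'))
# >>> pred = model(torch.zeros(1, 3, 640, 640))  # raw predictions before NMS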
'''===========3.AutoShape: wraps the model to accept arbitrary input sources/shapes; rarely used in the training pipeline================'''
class AutoShape(nn.Module):
    # YOLOv5 input-robust model wrapper for passing cv2/np/PIL/torch inputs. Includes preprocessing, inference and NMS
    conf = 0.25  # NMS confidence threshold
    iou = 0.45  # NMS IoU threshold
    classes = None  # (optional list) filter by class, i.e. = [0, 15, 16] for COCO persons, cats and dogs
    multi_label = False  # NMS multiple labels per box
    max_det = 1000  # maximum number of detections per image

    def __init__(self, model):
        super().__init__()
        self.model = model.eval()

    def autoshape(self):
        LOGGER.info('AutoShape already enabled, skipping... ')  # model already converted to model.autoshape()
        return self

    def _apply(self, fn):
        # Apply to(), cpu(), cuda(), half() to model tensors that are not parameters or registered buffers
        self = super()._apply(fn)
        m = self.model.model[-1]  # Detect()
        m.stride = fn(m.stride)
        m.grid = list(map(fn, m.grid))
        if isinstance(m.anchor_grid, list):
            m.anchor_grid = list(map(fn, m.anchor_grid))
        return self

    @torch.no_grad()
    def forward(self, imgs, size=640, augment=False, profile=False):
        # Inference from various sources. For height=640, width=1280, RGB images example inputs are:
        #   file:       imgs = 'data/images/zidane.jpg'  # str or PosixPath
        #   URI:             = 'https://ultralytics.com/images/zidane.jpg'
        #   OpenCV:          = cv2.imread('image.jpg')[:,:,::-1]  # HWC BGR to RGB x(640,1280,3)
        #   PIL:             = Image.open('image.jpg') or ImageGrab.grab()  # HWC x(640,1280,3)
        #   numpy:           = np.zeros((640,1280,3))  # HWC
        #   torch:           = torch.zeros(16,3,320,640)  # BCHW (scaled to size=640, 0-1 values)
        #   multiple:        = [Image.open('image1.jpg'), Image.open('image2.jpg'), ...]  # list of images
        t = [time_sync()]
        p = next(self.model.parameters())  # for device and type
        if isinstance(imgs, torch.Tensor):  # torch
            with amp.autocast(enabled=p.device.type != 'cpu'):
                return self.model(imgs.to(p.device).type_as(p), augment, profile)  # inference

        # Pre-process
        n, imgs = (len(imgs), imgs) if isinstance(imgs, list) else (1, [imgs])  # number of images, list of images
        shape0, shape1, files = [], [], []  # image and inference shapes, filenames
        for i, im in enumerate(imgs):
            f = f'image{i}'  # filename
            if isinstance(im, (str, Path)):  # filename or uri
                im, f = Image.open(requests.get(im, stream=True).raw if str(im).startswith('http') else im), im
                im = np.asarray(exif_transpose(im))
            elif isinstance(im, Image.Image):  # PIL Image
                im, f = np.asarray(exif_transpose(im)), getattr(im, 'filename', f) or f
            files.append(Path(f).with_suffix('.jpg').name)
            if im.shape[0] < 5:  # image in CHW
                im = im.transpose((1, 2, 0))  # reverse dataloader .transpose(2, 0, 1)
            im = im[..., :3] if im.ndim == 3 else np.tile(im[..., None], 3)  # enforce 3ch input
            s = im.shape[:2]  # HWC
            shape0.append(s)  # image shape
            g = (size / max(s))  # gain
            shape1.append([y * g for y in s])
            imgs[i] = im if im.data.contiguous else np.ascontiguousarray(im)  # update
        shape1 = [make_divisible(x, int(self.stride.max())) for x in np.stack(shape1, 0).max(0)]  # inference shape
        x = [letterbox(im, new_shape=shape1, auto=False)[0] for im in imgs]  # pad
        x = np.stack(x, 0) if n > 1 else x[0][None]  # stack
        x = np.ascontiguousarray(x.transpose((0, 3, 1, 2)))  # BHWC to BCHW
        x = torch.from_numpy(x).to(p.device).type_as(p) / 255  # uint8 to fp16/32
        t.append(time_sync())

        with amp.autocast(enabled=p.device.type != 'cpu'):
            # Inference
            y = self.model(x, augment, profile)[0]  # forward
            t.append(time_sync())

            # Post-process
            y = non_max_suppression(y, self.conf, iou_thres=self.iou, classes=self.classes,
                                    multi_label=self.multi_label, max_det=self.max_det)  # NMS
            for i in range(n):
                scale_coords(shape1, y[i][:, :4], shape0[i])
            t.append(time_sync())
            return Detections(imgs, y, files, t, self.names, x.shape)
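# Illustrative sketch (not part of the upstream file): AutoShape is the wrapper used by torch.hub models,
# so arbitrary image sources can be passed straight to the model; the URL below is only a placeholder.
# >>> model = torch.hub.load('ultralytics/yolov5', 'yolov5s')       # returns an AutoShape-wrapped model
# >>> results = model('https://ultralytics.com/images/zidane.jpg')  # str/PIL/np/torch inputs all accepted
# >>> results.print()                                               # results is a Detections object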
'''===========4.Detections: post-processing container for the inference results================'''
class Detections:
    # YOLOv5 detections class for inference results
    """Used at the end of AutoShape.forward
    detections class for YOLOv5 inference results
    """
    def __init__(self, imgs, pred, files, times=None, names=None, shape=None):
        super().__init__()
        d = pred[0].device  # device
        gn = [torch.tensor([*(im.shape[i] for i in [1, 0, 1, 0]), 1, 1], device=d) for im in imgs]  # normalizations
        # imgs: the original images
        self.imgs = imgs  # list of images as numpy arrays
        # pred: the predictions (xyxy, conf, cls)
        self.pred = pred  # list of tensors pred[0] = (xyxy, conf, cls)
        # names: class names
        self.names = names  # class names
        # files: image filenames
        self.files = files  # image filenames
        # xyxy: top-left + bottom-right format
        self.xyxy = pred  # xyxy pixels
        # xywh: centre point + width/height format
        self.xywh = [xyxy2xywh(x) for x in pred]  # xywh pixels
        # xyxyn: normalized xyxy
        self.xyxyn = [x / g for x, g in zip(self.xyxy, gn)]  # xyxy normalized
        # xywhn: normalized xywh
        self.xywhn = [x / g for x, g in zip(self.xywh, gn)]  # xywh normalized
        self.n = len(self.pred)  # number of images (batch size)
        self.t = tuple((times[i + 1] - times[i]) * 1000 / self.n for i in range(3))  # timestamps (ms)
        self.s = shape  # inference BCHW shape

    def display(self, pprint=False, show=False, save=False, crop=False, render=False, save_dir=Path('')):
        crops = []
        for i, (im, pred) in enumerate(zip(self.imgs, self.pred)):
            s = f'image {i + 1}/{len(self.pred)}: {im.shape[0]}x{im.shape[1]} '  # string
            if pred.shape[0]:
                for c in pred[:, -1].unique():
                    n = (pred[:, -1] == c).sum()  # detections per class
                    s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, "  # add to string
                if show or save or render or crop:
                    annotator = Annotator(im, example=str(self.names))
                    for *box, conf, cls in reversed(pred):  # xyxy, confidence, class
                        label = f'{self.names[int(cls)]} {conf:.2f}'
                        if crop:
                            file = save_dir / 'crops' / self.names[int(cls)] / self.files[i] if save else None
                            crops.append({'box': box, 'conf': conf, 'cls': cls, 'label': label,
                                          'im': save_one_box(box, im, file=file, save=save)})
                        else:  # all others
                            annotator.box_label(box, label, color=colors(cls))
                    im = annotator.im
            else:
                s += '(no detections)'
            im = Image.fromarray(im.astype(np.uint8)) if isinstance(im, np.ndarray) else im  # from np
            if pprint:
                LOGGER.info(s.rstrip(', '))
            if show:
                im.show(self.files[i])  # show
            if save:
                f = self.files[i]
                im.save(save_dir / f)  # save
                if i == self.n - 1:
                    LOGGER.info(f"Saved {self.n} image{'s' * (self.n > 1)} to {colorstr('bold', save_dir)}")
            if render:
                self.imgs[i] = np.asarray(im)
        if crop:
            if save:
                LOGGER.info(f'Saved results to {save_dir}\n')
            return crops

    def print(self):
        self.display(pprint=True)  # print results
        LOGGER.info(f'Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {tuple(self.s)}' %
                    self.t)

    def show(self):
        self.display(show=True)  # show results

    def save(self, save_dir='runs/detect/exp'):
        save_dir = increment_path(save_dir, exist_ok=save_dir != 'runs/detect/exp', mkdir=True)  # increment save_dir
        self.display(save=True, save_dir=save_dir)  # save results

    def crop(self, save=True, save_dir='runs/detect/exp'):
        save_dir = increment_path(save_dir, exist_ok=save_dir != 'runs/detect/exp', mkdir=True) if save else None
        return self.display(crop=True, save=save, save_dir=save_dir)  # crop results

    def render(self):
        self.display(render=True)  # render results
        return self.imgs

    def pandas(self):
        # return detections as pandas DataFrames, i.e. print(results.pandas().xyxy[0])
        new = copy(self)  # return copy
        ca = 'xmin', 'ymin', 'xmax', 'ymax', 'confidence', 'class', 'name'  # xyxy columns
        cb = 'xcenter', 'ycenter', 'width', 'height', 'confidence', 'class', 'name'  # xywh columns
        for k, c in zip(['xyxy', 'xyxyn', 'xywh', 'xywhn'], [ca, ca, cb, cb]):
            a = [[x[:5] + [int(x[5]), self.names[int(x[5])]] for x in x.tolist()] for x in getattr(self, k)]  # update
            setattr(new, k, [pd.DataFrame(x, columns=c) for x in a])
        return new

    def tolist(self):
        # return a list of Detections objects, i.e. 'for result in results.tolist():'
        x = [Detections([self.imgs[i]], [self.pred[i]], self.names, self.s) for i in range(self.n)]
        for d in x:
            for k in ['imgs', 'pred', 'xyxy', 'xyxyn', 'xywh', 'xywhn']:
                setattr(d, k, getattr(d, k)[0])  # pop out of list
        return x

    def __len__(self):
        return self.n
'''===========5.Classify: second-stage classification module================'''
class Classify(nn.Module):
    # Classification head, i.e. x(b,c1,20,20) to x(b,c2)
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1):  # ch_in, ch_out, kernel, stride, padding, groups
        """
        This is a second-stage classification module. What does that mean? Take licence-plate recognition:
        the detector first finds the plate; to recognise the characters on the plate, a second classification
        stage is needed. Whenever the detector's outputs need to be classified further, this module can be used.
        It is written very simply here; for a more complex second stage, adapt it to your own task.
        Classification head, i.e. x(b,c1,20,20) to x(b,c2)
        """
        super().__init__()
        # adaptive average pooling
        self.aap = nn.AdaptiveAvgPool2d(1)  # to x(b,c1,1,1)
        self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g)  # to x(b,c2,1,1)
        # flatten
        self.flat = nn.Flatten()

    def forward(self, x):
        # adaptive average pooling first, then concatenate (if the input is a list)
        z = torch.cat([self.aap(y) for y in (x if isinstance(x, list) else [x])], 1)  # cat if list
        # flatten z
        return self.flat(self.conv(z))  # flatten to x(b,c2)
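# Illustrative sketch (not part of the upstream file): Classify pools each c1-channel map to 1x1, applies
# a 1x1 conv to get c2 channels and flattens, assuming the class above.
# >>> head = Classify(1280, 1000)
# >>> head(torch.zeros(4, 1280, 20, 20)).shape
# torch.Size([4, 1000])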