YOLOv5源码逐行超详细注释与解读(7)——网络结构(2)

时间:2025-04-02 09:18:40
  • # YOLOv5 ???? by Ultralytics, GPL-3.0 license
  • """
  • Common modules
  • """
  • '''===============================================一、导入包==================================================='''
  • '''======================1.导入安装好的python库====================='''
  • import json # 用于json和Python数据之间的相互转换
  • import math # 数学函数模块
  • import platform # 获取操作系统的信息
  • import warnings # 警告程序员关于语言或库功能的变化的方法
  • from copy import copy # 数据拷贝模块 分浅拷贝和深拷贝
  • from pathlib import Path # Path将str转换为Path对象 使字符串路径易于操作的模块
  • import cv2 # 调用OpenCV的cv库
  • import numpy as np # numpy数组操作模块
  • import pandas as pd # panda数组操作模块
  • import requests # Python的HTTP客户端库
  • import torch # pytorch深度学习框架
  • import as nn # 专门为神经网络设计的模块化接口
  • from PIL import Image # 图像基础操作模块
  • from import amp # 混合精度训练模块
  • '''===================2.加载自定义模块============================'''
  • from import exif_transpose, letterbox # 加载数据集的函数
  • from import (LOGGER, check_requirements, check_suffix, colorstr, increment_path, make_divisible,
  • non_max_suppression, scale_coords, xywh2xyxy, xyxy2xywh) # 定义了一些常用的工具函数
  • from import Annotator, colors, plot_one_box # 定义了Annotator类,可以在图像上绘制矩形框和标注信息
  • from utils.torch_utils import time_sync # 定义了一些与PyTorch有关的工具函数
  • '''===============================================二、基础组件==================================================='''
  • '''===========:根据输入的卷积核计算该卷积模块所需的pad值================'''
  • # 为same卷积或者same池化自动扩充
  • # 通过卷积核的大小来计算需要的padding为多少才能把tensor补成原来的形状
  • def autopad(k, p=None): # kernel, padding
  • # Pad to 'same'
  • # 如果p是none 则进行下一步
  • if p is None:
  • # 如果k是int 则进行k//2 若不是则进行x//2
  • p = k // 2 if isinstance(k, int) else [x // 2 for x in k] # auto-pad
  • return p
  • '''===========:标准卷积 由Conv + BN + activate组成================'''
  • class Conv():
  • # Standard convolution
  • # init初始化构造函数
  • def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True): # ch_in, ch_out, kernel, stride, padding, groups
  • """在Focus、Bottleneck、BottleneckCSP、C3、SPP、DWConv、TransformerBloc等模块中调用
  • Standard convolution conv+BN+act
  • :params c1: 输入的channel值
  • :params c2: 输出的channel值
  • :params k: 卷积的kernel_size
  • :params s: 卷积的stride
  • :params p: 卷积的padding 一般是None 可以通过autopad自行计算需要pad的padding数
  • :params g: 卷积的groups数 =1就是普通的卷积 >1就是深度可分离卷积
  • :params act: 激活函数类型 True就是SiLU()/Swish False就是不使用激活函数
  • 类型是就使用传进来的激活函数类型
  • """
  • super().__init__()
  • # 卷积层
  • = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g, bias=False)
  • # 归一化层
  • = nn.BatchNorm2d(c2)
  • # 激活函数
  • = () if act is True else (act if isinstance(act, ) else ())
  • # 正向计算,网络执行的顺序是根据forward函数来决定的
  • def forward(self, x):
  • # conv卷积 -> bn -> act激活
  • return (((x)))
  • # 正向融合计算
  • def forward_fuse(self, x):
  • # 这里只有卷积和激活
  • return ((x))
  • '''===========:深度可分离卷积================'''
  • class DWConv(Conv):
  • # Depth-wise convolution class
  • def __init__(self, c1, c2, k=1, s=1, act=True): # ch_in, ch_out, kernel, stride, padding, groups
  • super().__init__(c1, c2, k, s, g=(c1, c2), act=act)
  • '''===========:标准的瓶颈层 由1x1conv+3x3conv+残差块组成================'''
  • class Bottleneck():
  • # Standard bottleneck
  • def __init__(self, c1, c2, shortcut=True, g=1, e=0.5): # ch_in, ch_out, shortcut, groups, expansion
  • """在BottleneckCSP和的parse_model中调用
  • Standard bottleneck Conv+Conv+shortcut
  • :params c1: 第一个卷积的输入channel
  • :params c2: 第二个卷积的输出channel
  • :params shortcut: bool 是否有shortcut连接 默认是True
  • :params g: 卷积分组的个数 =1就是普通卷积 >1就是深度可分离卷积
  • :params e: expansion ratio e*c2就是第一个卷积的输出channel=第二个卷积的输入channel
  • """
  • super().__init__()
  • c_ = int(c2 * e) # hidden channels
  • # 1*1卷积层
  • self.cv1 = Conv(c1, c_, 1, 1)
  • # 3*3卷积层
  • self.cv2 = Conv(c_, c2, 3, 1, g=g)
  • # 如果shortcut为True就会将输入和输出相加之后再输出
  • = shortcut and c1 == c2
  • def forward(self, x):
  • return x + self.cv2(self.cv1(x)) if else self.cv2(self.cv1(x))
  • '''===========:瓶颈层 由几个Bottleneck模块的堆叠+CSP结构组成================'''
  • class BottleneckCSP():
  • # CSP Bottleneck /WongKinYiu/CrossStagePartialNetworks
  • def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion
  • """在C3模块和的parse_model模块调用
  • CSP Bottleneck /WongKinYiu/CrossStagePartialNetworks
  • :params c1: 整个BottleneckCSP的输入channel
  • :params c2: 整个BottleneckCSP的输出channel
  • :params n: 有n个Bottleneck
  • :params shortcut: bool Bottleneck中是否有shortcut,默认True
  • :params g: Bottleneck中的3x3卷积类型 =1普通卷积 >1深度可分离卷积
  • :params e: expansion ratio c2xe=中间其他所有层的卷积核个数/中间所有层的输入输出channel数
  • c_: bottleneckCSP 结构的中间层的通道数,由膨胀率e决定
  • """
  • super().__init__()
  • c_ = int(c2 * e) # hidden channels
  • # 4个1*1卷积层的堆叠
  • self.cv1 = Conv(c1, c_, 1, 1)
  • self.cv2 = nn.Conv2d(c1, c_, 1, 1, bias=False)
  • self.cv3 = nn.Conv2d(c_, c_, 1, 1, bias=False)
  • self.cv4 = Conv(2 * c_, c2, 1, 1)
  • # bn层
  • = nn.BatchNorm2d(2 * c_) # applied to cat(cv2, cv3)
  • # 激活函数
  • = ()
  • # m:叠加n次Bottleneck的操作
  • # 操作符*可以把一个list拆开成一个个独立的元素
  • = (*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
  • def forward(self, x):
  • # y1相当于先做一次cv1操作然后进行m操作最后进行cv3操作,也就是BCSPn模块中的上面的分支操作
  • # 输入x ->Conv模块 ->n个bottleneck模块 ->Conv模块 ->y1
  • y1 = self.cv3((self.cv1(x)))
  • # y2就是进行cv2操作,也就是BCSPn模块中的下面的分支操作(直接逆行conv操作的分支, Conv--nXBottleneck--conv)
  • # 输入x -> Conv模块 -> 输出y2
  • y2 = self.cv2(x)
  • # 最后y1和y2做拼接, 接着进入bn层做归一化, 然后做act激活, 最后输出cv4
  • # 输入y1,y2->按照通道数融合 ->归一化 -> 激活函数 -> Conv输出 -> 输出
  • # (y1, y2), dim=1: 这里是指定在第一个维度上进行合并,即在channel维度上合并
  • return self.cv4(((((y1, y2), dim=1))))
  • '''===========6.C3:和BottleneckCSP模块类似,但是少了一个Conv模块================'''
  • # ===6.1 C3=== #
  • class C3():
  • # CSP Bottleneck with 3 convolutions
  • def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion
  • """在C3TR模块和的parse_model模块调用
  • CSP Bottleneck with 3 convolutions
  • :params c1: 整个BottleneckCSP的输入channel
  • :params c2: 整个BottleneckCSP的输出channel
  • :params n: 有n个Bottleneck
  • :params shortcut: bool Bottleneck中是否有shortcut,默认True
  • :params g: Bottleneck中的3x3卷积类型 =1普通卷积 >1深度可分离卷积
  • :params e: expansion ratio c2xe=中间其他所有层的卷积核个数/中间所有层的输入输出channel数
  • """
  • super().__init__()
  • c_ = int(c2 * e) # hidden channels
  • # 3个1*1卷积层的堆叠,比BottleneckCSP少一个
  • self.cv1 = Conv(c1, c_, 1, 1)
  • self.cv2 = Conv(c1, c_, 1, 1)
  • self.cv3 = Conv(2 * c_, c2, 1) # act=FReLU(c2)
  • = (*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
  • # = (*[CrossConv(c_, c_, 3, 1, g, 1.0, shortcut) for _ in range(n)])
  • def forward(self, x):
  • # 将第一个卷积层与第二个卷积层的结果拼接在一起
  • return self.cv3((((self.cv1(x)), self.cv2(x)), dim=1))
  • # ===6.2 C3SPP(C3):继承自 C3,n 个 Bottleneck 更换为 1 个 SPP=== #
  • class C3SPP(C3):
  • # C3 module with SPP()
  • def __init__(self, c1, c2, k=(5, 9, 13), n=1, shortcut=True, g=1, e=0.5):
  • super().__init__(c1, c2, n, shortcut, g, e)
  • c_ = int(c2 * e)
  • = SPP(c_, c_, k)
  • # ===6.3 C3Ghost(C3):继承自 C3,Bottleneck 更换为 GhostBottleneck=== #
  • class C3Ghost(C3):
  • # C3 module with GhostBottleneck()
  • def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
  • super().__init__(c1, c2, n, shortcut, g, e)
  • c_ = int(c2 * e) # hidden channels
  • = (*(GhostBottleneck(c_, c_) for _ in range(n)))
  • '''===========:空间金字塔池化模块================'''
  • # 用在骨干网络收尾阶段,用于融合多尺度特征。
  • # ===7.1 SPP:空间金字塔池化=== #
  • class SPP():
  • # Spatial Pyramid Pooling (SPP) layer /abs/1406.4729
  • def __init__(self, c1, c2, k=(5, 9, 13)):
  • """在的parse_model模块调用
  • 空间金字塔池化 Spatial pyramid pooling layer used in YOLOv3-SPP
  • :params c1: SPP模块的输入channel
  • :params c2: SPP模块的输出channel
  • :params k: 保存着三个maxpool的卷积核大小 默认是(5, 9, 13)
  • """
  • super().__init__()
  • c_ = c1 // 2 # hidden channels
  • # 1*1卷积
  • self.cv1 = Conv(c1, c_, 1, 1)
  • # 这里+1是因为有len(k)+1个输入
  • self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1)
  • # m先进行最大池化操作, 然后通过进行构造一个模块 在构造时对每一个k都要进行最大池化
  • = ([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k])
  • def forward(self, x):
  • # 先进行cv1的操作
  • x = self.cv1(x)
  • # 忽略了警告错误的输出
  • with warnings.catch_warnings():
  • ('ignore') # suppress torch 1.9.0 max_pool2d() warning
  • # 对每一个m进行最大池化 和没有做池化的每一个输入进行叠加 然后做拼接 最后做cv2操作
  • return self.cv2(([x] + [m(x) for m in ], 1))
  • # ===7.2 SPPF:快速版的空间金字塔池化=== #
  • class SPPF():
  • # Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher
  • def __init__(self, c1, c2, k=5): # equivalent to SPP(k=(5, 9, 13))
  • super().__init__()
  • c_ = c1 // 2 # hidden channels
  • self.cv1 = Conv(c1, c_, 1, 1)
  • self.cv2 = Conv(c_ * 4, c2, 1, 1)
  • = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)
  • def forward(self, x):
  • x = self.cv1(x)
  • with warnings.catch_warnings():
  • ('ignore') # suppress torch 1.9.0 max_pool2d() warning
  • y1 = (x)
  • y2 = (y1)
  • return self.cv2(([x, y1, y2, (y2)], 1))
  • '''===========:把宽度w和高度h的信息整合到c空间================'''
  • class Focus():
  • # Focus wh information into c-space
  • def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True): # ch_in, ch_out, kernel, stride, padding, groups
  • """在的parse_model函数中被调用
  • 理论:从高分辨率图像中,周期性的抽出像素点重构到低分辨率图像中,即将图像相邻的四个位置进行堆叠,
  • 聚焦wh维度信息到c通道空,提高每个点感受野,并减少原始信息的丢失,该模块的设计主要是减少计算量加快速度。
  • Focus wh information into c-space 把宽度w和高度h的信息整合到c空间中
  • 先做4个slice 再concat 最后再做Conv
  • slice后 (b,c1,w,h) -> 分成4个slice 每个slice(b,c1,w/2,h/2)
  • concat(dim=1)后 4个slice(b,c1,w/2,h/2)) -> (b,4c1,w/2,h/2)
  • conv后 (b,4c1,w/2,h/2) -> (b,c2,w/2,h/2)
  • :params c1: slice后的channel
  • :params c2: Focus最终输出的channel
  • :params k: 最后卷积的kernel
  • :params s: 最后卷积的stride
  • :params p: 最后卷积的padding
  • :params g: 最后卷积的分组情况 =1普通卷积 >1深度可分离卷积
  • :params act: bool激活函数类型 默认True:SiLU()/Swish False:不用激活函数
  • """
  • super().__init__()
  • # concat后的卷积(最后的卷积)
  • = Conv(c1 * 4, c2, k, s, p, g, act)
  • # = Contract(gain=2)
  • def forward(self, x): # x(b,c,w,h) -> y(b,4c,w/2,h/2)
  • # 先进行切分, 然后进行拼接, 最后再做conv操作
  • return (([x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]], 1))
  • # return ((x))
  • # 以下模块Contract,Expand,Concat是用来处理输入特征的shape的
  • '''===========:收缩模块:调整张量的大小,将宽高收缩到通道中。================'''
  • class Contract():
  • # Contract width-height into channels, . x(1,64,80,80) to x(1,256,40,40)
  • """用在的parse_model模块 用的不多
  • 改变输入特征的shape 将w和h维度(缩小)的数据收缩到channel维度上(放大)
  • Contract width-height into channels, . x(1,64,80,80) to x(1,256,40,40)
  • """
  • def __init__(self, gain=2):
  • super().__init__()
  • = gain
  • def forward(self, x):
  • b, c, h, w = () # assert (h / s == 0) and (W / s == 0), 'Indivisible gain'
  • s =
  • # permute: 改变tensor的维度顺序
  • x = (b, c, h // s, s, w // s, s) # x(1,64,40,2,40,2)
  • # .view: 改变tensor的维度
  • x = (0, 3, 5, 1, 2, 4).contiguous() # x(1,2,2,64,40,40)
  • return (b, c * s * s, h // s, w // s) # x(1,256,40,40)
  • '''===========:扩张模块,将特征图像素变大================'''
  • class Expand():
  • # Expand channels into width-height, . x(1,64,80,80) to x(1,16,160,160)
  • def __init__(self, gain=2):
  • super().__init__()
  • = gain
  • def forward(self, x):
  • b, c, h, w = () # assert C / s ** 2 == 0, 'Indivisible gain'
  • s =
  • x = (b, s, s, c // s ** 2, h, w) # x(1,2,2,16,80,80)
  • x = (0, 3, 4, 1, 5, 2).contiguous() # x(1,16,80,2,80,2)
  • return (b, c // s ** 2, h * s, w * s) # x(1,16,160,160)
  • '''===========:自定义concat模块,dimension就是维度值,说明沿着哪一个维度进行拼接================'''
  • # 作拼接的一个类
  • # 拼接函数,将两个tensor进行拼接
  • class Concat():
  • # Concatenate a list of tensors along dimension
  • def __init__(self, dimension=1):
  • super().__init__()
  • = dimension
  • def forward(self, x):
  • return (x, )
  • '''===============================================三、注意力模块==================================================='''
  • '''===========:================'''
  • class TransformerLayer():
  • # Transformer layer /abs/2010.11929 (LayerNorm layers removed for better performance)
  • """
  • Transformer layer /abs/2010.11929 (LayerNorm layers removed for better performance)
  • 这部分相当于原论文中的单个Encoder部分(只移除了两个Norm部分, 其他结构和原文中的Encoding一模一样)
  • """
  • def __init__(self, c, num_heads):
  • super().__init__()
  • = (c, c, bias=False)
  • = (c, c, bias=False)
  • = (c, c, bias=False)
  • # 输入: query、key、value
  • # 输出: 0 attn_output 即通过self-attention之后,从每一个词语位置输出来的attention 和输入的query它们形状一样的
  • # 1 attn_output_weights 即attention weights 每一个单词和任意另一个单词之间都会产生一个weight
  • = (embed_dim=c, num_heads=num_heads)
  • self.fc1 = (c, c, bias=False)
  • self.fc2 = (c, c, bias=False)
  • def forward(self, x):
  • # 多头注意力机制 + 残差(这里移除了LayerNorm for better performance)
  • x = ((x), (x), (x))[0] + x
  • # feed forward 前馈神经网络 + 残差(这里移除了LayerNorm for better performance)
  • x = self.fc2(self.fc1(x)) + x
  • return x
  • '''===========:================'''
  • class TransformerBlock():
  • # Vision Transformer /abs/2010.11929
  • def __init__(self, c1, c2, num_heads, num_layers):
  • super().__init__()
  • = None
  • if c1 != c2:
  • = Conv(c1, c2)
  • = (c2, c2) # learnable position embedding
  • = (*(TransformerLayer(c2, num_heads) for _ in range(num_layers)))
  • self.c2 = c2
  • def forward(self, x):
  • if is not None:
  • x = (x)
  • b, _, w, h =
  • p = (2).permute(2, 0, 1)
  • return (p + (p)).permute(1, 2, 0).reshape(b, self.c2, w, h)
  • '''===============================================四、幻象模块==================================================='''
  • '''===========:幻象卷积 轻量化网络卷积模块================'''
  • class GhostConv():
  • # Ghost Convolution /huawei-noah/ghostnet
  • def __init__(self, c1, c2, k=1, s=1, g=1, act=True): # ch_in, ch_out, kernel, stride, groups
  • super().__init__()
  • c_ = c2 // 2 # hidden channels
  • # 第一步卷积: 少量卷积, 一般是一半的计算量
  • self.cv1 = Conv(c1, c_, k, s, None, g, act)
  • # 第二步卷积: cheap operations 使用3x3或5x5的卷积, 并且是逐个特征图的进行卷积(Depth-wise convolutional
  • self.cv2 = Conv(c_, c_, 5, 1, None, c_, act)
  • def forward(self, x):
  • y = self.cv1(x)
  • return ([y, self.cv2(y)], 1)
  • '''===========:幻象瓶颈层 ================'''
  • class GhostBottleneck():
  • # Ghost Bottleneck /huawei-noah/ghostnet
  • def __init__(self, c1, c2, k=3, s=1): # ch_in, ch_out, kernel, stride
  • super().__init__()
  • c_ = c2 // 2
  • = (GhostConv(c1, c_, 1, 1), # pw
  • DWConv(c_, c_, k, s, act=False) if s == 2 else (), # dw
  • GhostConv(c_, c2, 1, 1, act=False)) # pw-linear
  • # 注意, 源码中并不是直接Identity连接, 而是先经过一个DWConv + Conv, 再进行shortcut连接的。
  • = (DWConv(c1, c1, k, s, act=False),
  • Conv(c1, c2, 1, 1, act=False)) if s == 2 else ()
  • def forward(self, x):
  • return (x) + (x)
  • '''===============================================五、模型扩展模块==================================================='''
  • '''===========1.C3TR(C3):继承自 C3,n 个 Bottleneck 更换为 1 个 TransformerBlock ================'''
  • class C3TR(C3):
  • """
  • 这部分是根据上面的C3结构改编而来的, 将原先的Bottleneck替换为调用TransformerBlock模块
  • """
  • # C3 module with TransformerBlock()
  • def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
  • ''' 在C3RT模块和的parse_model函数中被调用
  • :params c1: 整个C3的输入channel
  • :params c2: 整个C3的输出channel
  • :params n: 有n个子模块[Bottleneck/CrossConv]
  • :params shortcut: bool值,子模块[Bottlenec/CrossConv]中是否有shortcut,默认True
  • :params g: 子模块[Bottlenec/CrossConv]中的3x3卷积类型,=1普通卷积,>1深度可分离卷积
  • :params e: expansion ratio,e*c2=中间其它所有层的卷积核个数=中间所有层的的输入输出channel
  • '''
  • super().__init__(c1, c2, n, shortcut, g, e)
  • c_ = int(c2 * e)
  • = TransformerBlock(c_, c_, 4, n)
  • '''===========: ================'''
  • class DetectMultiBackend():
  • # YOLOv5 MultiBackend class for python inference on various backends
  • def __init__(self, weights='', device=None, dnn=True):
  • # Usage:
  • # PyTorch: weights = *.pt
  • # TorchScript: *.
  • # CoreML: *.mlmodel
  • # TensorFlow: *_saved_model
  • # TensorFlow: *.pb
  • # TensorFlow Lite: *.tflite
  • # ONNX Runtime: *.onnx
  • # OpenCV DNN: *.onnx with dnn=True
  • super().__init__()
  • # 判断weights是否为list,若是取出第一个值作为传入路径
  • w = str(weights[0] if isinstance(weights, list) else weights)
  • suffix, suffixes = Path(w).(), ['.pt', '.onnx', '.tflite', '.pb', '', '.mlmodel']
  • check_suffix(w, suffixes) # check weights have acceptable suffix
  • pt, onnx, tflite, pb, saved_model, coreml = (suffix == x for x in suffixes) # backend booleans
  • jit = pt and 'torchscript' in ()
  • stride, names = 64, [f'class{i}' for i in range(1000)] # assign defaults
  • if jit: # TorchScript
  • (f'Loading {w} for TorchScript inference...')
  • extra_files = {'': ''} # model metadata
  • model = (w, _extra_files=extra_files)
  • if extra_files['']:
  • d = (extra_files['']) # extra_files dict
  • stride, names = int(d['stride']), d['names']
  • elif pt: # PyTorch
  • from import attempt_load # scoped to avoid circular import
  • model = (w) if 'torchscript' in w else attempt_load(weights, map_location=device)
  • stride = int(.max()) # model stride
  • names = if hasattr(model, 'module') else # get class names
  • elif coreml: # CoreML *.mlmodel
  • import coremltools as ct
  • model = (w)
  • elif dnn: # ONNX OpenCV DNN
  • (f'Loading {w} for ONNX OpenCV DNN inference...')
  • check_requirements(('opencv-python>=4.5.4',))
  • net = (w)
  • elif onnx: # ONNX Runtime
  • (f'Loading {w} for ONNX Runtime inference...')
  • check_requirements(('onnx', 'onnxruntime-gpu' if torch.has_cuda else 'onnxruntime'))
  • import onnxruntime
  • session = (w, None)
  • else: # TensorFlow model (TFLite, pb, saved_model)
  • import tensorflow as tf
  • if pb: # /guide/migrate#a_graphpb_or_graphpbtxt
  • def wrap_frozen_graph(gd, inputs, outputs):
  • x = .v1.wrap_function(lambda: .v1.import_graph_def(gd, name=""), []) # wrapped
  • return (.map_structure(.as_graph_element, inputs),
  • .map_structure(.as_graph_element, outputs))
  • (f'Loading {w} for TensorFlow *.pb inference...')
  • graph_def = ().as_graph_def()
  • graph_def.ParseFromString(open(w, 'rb').read())
  • frozen_func = wrap_frozen_graph(gd=graph_def, inputs="x:0", outputs="Identity:0")
  • elif saved_model:
  • (f'Loading {w} for TensorFlow saved_model inference...')
  • model = .load_model(w)
  • elif tflite: # /lite/guide/python#install_tensorflow_lite_for_python
  • if 'edgetpu' in ():
  • (f'Loading {w} for TensorFlow Edge TPU inference...')
  • import tflite_runtime.interpreter as tfli
  • delegate = {'Linux': '.1', # install /software/#edgetpu-runtime
  • 'Darwin': 'libedgetpu.',
  • 'Windows': ''}[()]
  • interpreter = (model_path=w, experimental_delegates=[tfli.load_delegate(delegate)])
  • else:
  • (f'Loading {w} for TensorFlow Lite inference...')
  • interpreter = (model_path=w) # load TFLite model
  • interpreter.allocate_tensors() # allocate
  • input_details = interpreter.get_input_details() # inputs
  • output_details = interpreter.get_output_details() # outputs
  • self.__dict__.update(locals()) # assign all variables to self
  • def forward(self, im, augment=False, visualize=False, val=False):
  • # YOLOv5 MultiBackend inference
  • b, ch, h, w = # batch, channel, height, width
  • if : # PyTorch
  • y = (im) if else (im, augment=augment, visualize=visualize)
  • return y if val else y[0]
  • elif : # CoreML *.mlmodel
  • im = (0, 2, 3, 1).cpu().numpy() # torch BCHW to numpy BHWC shape(1,320,192,3)
  • im = ((im[0] * 255).astype('uint8'))
  • # im = ((192, 320), )
  • y = ({'image': im}) # coordinates are xywh normalized
  • box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]]) # xyxy pixels
  • conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float)
  • y = ((box, (-1, 1), (-1, 1)), 1)
  • elif : # ONNX
  • im = ().numpy() # torch to numpy
  • if : # ONNX OpenCV DNN
  • (im)
  • y = ()
  • else: # ONNX Runtime
  • y = ([.get_outputs()[0].name], {.get_inputs()[0].name: im})[0]
  • else: # TensorFlow model (TFLite, pb, saved_model)
  • im = (0, 2, 3, 1).cpu().numpy() # torch BCHW to numpy BHWC shape(1,320,192,3)
  • if :
  • y = self.frozen_func(x=(im)).numpy()
  • elif self.saved_model:
  • y = (im, training=False).numpy()
  • elif :
  • input, output = self.input_details[0], self.output_details[0]
  • int8 = input['dtype'] == np.uint8 # is TFLite quantized uint8 model
  • if int8:
  • scale, zero_point = input['quantization']
  • im = (im / scale + zero_point).astype(np.uint8) # de-scale
  • .set_tensor(input['index'], im)
  • ()
  • y = .get_tensor(output['index'])
  • if int8:
  • scale, zero_point = output['quantization']
  • y = ((np.float32) - zero_point) * scale # re-scale
  • y[..., 0] *= w # x
  • y[..., 1] *= h # y
  • y[..., 2] *= w # w
  • y[..., 3] *= h # h
  • y = (y)
  • return (y, []) if val else y
  • '''===========:自动调整shape,该类基本未用================'''
  • class AutoShape():
  • # YOLOv5 input-robust model wrapper for passing cv2/np/PIL/torch inputs. Includes preprocessing, inference and NMS
  • conf = 0.25 # NMS confidence threshold
  • iou = 0.45 # NMS IoU threshold
  • classes = None # (optional list) filter by class, . = [0, 15, 16] for COCO persons, cats and dogs
  • multi_label = False # NMS multiple labels per box
  • max_det = 1000 # maximum number of detections per image
  • def __init__(self, model):
  • super().__init__()
  • = model.eval()
  • def autoshape(self):
  • ('AutoShape already enabled, skipping... ') # model already converted to ()
  • return self
  • def _apply(self, fn):
  • # Apply to(), cpu(), cuda(), half() to model tensors that are not parameters or registered buffers
  • self = super()._apply(fn)
  • m = [-1] # Detect()
  • = fn()
  • = list(map(fn, ))
  • if isinstance(m.anchor_grid, list):
  • m.anchor_grid = list(map(fn, m.anchor_grid))
  • return self
  • @torch.no_grad()
  • def forward(self, imgs, size=640, augment=False, profile=False):
  • # Inference from various sources. For height=640, width=1280, RGB images example inputs are:
  • # file: imgs = 'data/images/' # str or PosixPath
  • # URI: = '/images/'
  • # OpenCV: = ('')[:,:,::-1] # HWC BGR to RGB x(640,1280,3)
  • # PIL: = ('') or () # HWC x(640,1280,3)
  • # numpy: = ((640,1280,3)) # HWC
  • # torch: = (16,3,320,640) # BCHW (scaled to size=640, 0-1 values)
  • # multiple: = [(''), (''), ...] # list of images
  • t = [time_sync()]
  • p = next(()) # for device and type
  • if isinstance(imgs, ): # torch
  • with (enabled=.type != 'cpu'):
  • return (().type_as(p), augment, profile) # inference
  • # Pre-process
  • n, imgs = (len(imgs), imgs) if isinstance(imgs, list) else (1, [imgs]) # number of images, list of images
  • shape0, shape1, files = [], [], [] # image and inference shapes, filenames
  • for i, im in enumerate(imgs):
  • f = f'image{i}' # filename
  • if isinstance(im, (str, Path)): # filename or uri
  • im, f = Image.open((im, stream=True).raw if str(im).startswith('http') else im), im
  • im = (exif_transpose(im))
  • elif isinstance(im, ): # PIL Image
  • im, f = (exif_transpose(im)), getattr(im, 'filename', f) or f
  • (Path(f).with_suffix('.jpg').name)
  • if [0] < 5: # image in CHW
  • im = ((1, 2, 0)) # reverse dataloader .transpose(2, 0, 1)
  • im = im[..., :3] if == 3 else (im[..., None], 3) # enforce 3ch input
  • s = [:2] # HWC
  • (s) # image shape
  • g = (size / max(s)) # gain
  • ([y * g for y in s])
  • imgs[i] = im if else (im) # update
  • shape1 = [make_divisible(x, int(.max())) for x in (shape1, 0).max(0)] # inference shape
  • x = [letterbox(im, new_shape=shape1, auto=False)[0] for im in imgs] # pad
  • x = (x, 0) if n > 1 else x[0][None] # stack
  • x = (((0, 3, 1, 2))) # BHWC to BCHW
  • x = torch.from_numpy(x).to().type_as(p) / 255 # uint8 to fp16/32
  • (time_sync())
  • with (enabled=.type != 'cpu'):
  • # Inference
  • y = (x, augment, profile)[0] # forward
  • (time_sync())
  • # Post-process
  • y = non_max_suppression(y, , iou_thres=, classes=,
  • multi_label=self.multi_label, max_det=self.max_det) # NMS
  • for i in range(n):
  • scale_coords(shape1, y[i][:, :4], shape0[i])
  • (time_sync())
  • return Detections(imgs, y, files, t, , )
  • '''===========:对推理结果进行处理================'''
  • class Detections:
  • # YOLOv5 detections class for inference results
  • """用在AutoShape函数结尾
  • detections class for YOLOv5 inference results
  • """
  • def __init__(self, imgs, pred, files, times=None, names=None, shape=None):
  • super().__init__()
  • d = pred[0].device # device
  • gn = [([*([i] for i in [1, 0, 1, 0]), 1, 1], device=d) for im in imgs] # normalizations
  • # imgs:原图
  • = imgs # list of images as numpy arrays
  • # pred:预测值(xyxy, conf, cls)
  • = pred # list of tensors pred[0] = (xyxy, conf, cls)
  • # names: 类名
  • = names # class names
  • # files: 图像文件名
  • = files # image filenames
  • # xyxy:左上角+右下角格式
  • = pred # xyxy pixels
  • # xywh:中心点+宽长格式
  • = [xyxy2xywh(x) for x in pred] # xywh pixels
  • # xyxyn:xyxy标准化
  • = [x / g for x, g in zip(, gn)] # xyxy normalized
  • # xywhn:xywhn标准化
  • = [x / g for x, g in zip(, gn)] # xywh normalized
  • = len() # number of images (batch size)
  • = tuple((times[i + 1] - times[i]) * 1000 / for i in range(3)) # timestamps (ms)
  • = shape # inference BCHW shape
  • def display(self, pprint=False, show=False, save=False, crop=False, render=False, save_dir=Path('')):
  • crops = []
  • for i, (im, pred) in enumerate(zip(, )):
  • s = f'image {i + 1}/{len()}: {[0]}x{[1]} ' # string
  • if [0]:
  • for c in pred[:, -1].unique():
  • n = (pred[:, -1] == c).sum() # detections per class
  • s += f"{n} {[int(c)]}{'s' * (n > 1)}, " # add to string
  • if show or save or render or crop:
  • annotator = Annotator(im, example=str())
  • for *box, conf, cls in reversed(pred): # xyxy, confidence, class
  • label = f'{[int(cls)]} {conf:.2f}'
  • if crop:
  • file = save_dir / 'crops' / [int(cls)] / [i] if save else None
  • ({'box': box, 'conf': conf, 'cls': cls, 'label': label,
  • 'im': save_one_box(box, im, file=file, save=save)})
  • else: # all others
  • annotator.box_label(box, label, color=colors(cls))
  • im =
  • else:
  • s += '(no detections)'
  • im = ((np.uint8)) if isinstance(im, ) else im # from np
  • if pprint:
  • ((', '))
  • if show:
  • ([i]) # show
  • if save:
  • f = [i]
  • (save_dir / f) # save
  • if i == - 1:
  • (f"Saved {} image{'s' * ( > 1)} to {colorstr('bold', save_dir)}")
  • if render:
  • [i] = (im)
  • if crop:
  • if save:
  • (f'Saved results to {save_dir}\n')
  • return crops
  • def print(self):
  • (pprint=True) # print results
  • (f'Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {tuple()}' %
  • )
  • def show(self):
  • (show=True) # show results
  • def save(self, save_dir='runs/detect/exp'):
  • save_dir = increment_path(save_dir, exist_ok=save_dir != 'runs/detect/exp', mkdir=True) # increment save_dir
  • (save=True, save_dir=save_dir) # save results
  • def crop(self, save=True, save_dir='runs/detect/exp'):
  • save_dir = increment_path(save_dir, exist_ok=save_dir != 'runs/detect/exp', mkdir=True) if save else None
  • return (crop=True, save=save, save_dir=save_dir) # crop results
  • def render(self):
  • (render=True) # render results
  • return
  • def pandas(self):
  • # return detections as pandas DataFrames, . print(().xyxy[0])
  • new = copy(self) # return copy
  • ca = 'xmin', 'ymin', 'xmax', 'ymax', 'confidence', 'class', 'name' # xyxy columns
  • cb = 'xcenter', 'ycenter', 'width', 'height', 'confidence', 'class', 'name' # xywh columns
  • for k, c in zip(['xyxy', 'xyxyn', 'xywh', 'xywhn'], [ca, ca, cb, cb]):
  • a = [[x[:5] + [int(x[5]), [int(x[5])]] for x in ()] for x in getattr(self, k)] # update
  • setattr(new, k, [(x, columns=c) for x in a])
  • return new
  • def tolist(self):
  • # return a list of Detections objects, . 'for result in ():'
  • x = [Detections([[i]], [[i]], , ) for i in range()]
  • for d in x:
  • for k in ['imgs', 'pred', 'xyxy', 'xyxyn', 'xywh', 'xywhn']:
  • setattr(d, k, getattr(d, k)[0]) # pop out of list
  • return x
  • def __len__(self):
  • return
  • '''===========:二级分类模块================'''
  • class Classify():
  • # Classification head, . x(b,c1,20,20) to x(b,c2)
  • def __init__(self, c1, c2, k=1, s=1, p=None, g=1): # ch_in, ch_out, kernel, stride, padding, groups
  • """
  • 这是一个二级分类模块, 什么是二级分类模块? 比如做车牌的识别, 先识别出车牌, 如果想对车牌上的字进行识别, 就需要二级分类进一步检测.
  • 如果对模型输出的分类再进行分类, 就可以用这个模块. 不过这里这个类写的比较简单, 若进行复杂的二级分类, 可以根据自己的实际任务可以改写, 这里代码不唯一.
  • Classification head, . x(b,c1,20,20) to x(b,c2)
  • 用于第二级分类 可以根据自己的任务自己改写,比较简单
  • 比如车牌识别 检测到车牌之后还需要检测车牌在哪里,如果检测到侧拍后还想对车牌上的字再做识别的话就要进行二级分类
  • """
  • super().__init__()
  • = nn.AdaptiveAvgPool2d(1) # to x(b,c1,1,1)
  • # 自适应平均池化操作
  • = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g) # to x(b,c2,1,1)
  • # 展平
  • = ()
  • def forward(self, x):
  • # 先自适应平均池化操作, 然后拼接
  • z = ([(y) for y in (x if isinstance(x, list) else [x])], 1) # cat if list
  • # 对z进行展平操作
  • return ((z)) # flatten to x(b,c2)