MobileNetV2是一种轻量级的深度神经网络模型,专为移动和边缘设备设计。它是MobileNetV1的改进版本,于2018年由Google推出。MobileNetV2基于一个高效的深度可分离卷积架构,它可以显著减少模型的大小和计算成本,同时保持较高的准确性。

MobileNetV2的主要特点包括:

  1. 深度可分离卷积(Depthwise Separable Convolution):MobileNetV2使用深度可分离卷积来代替传统的卷积层,这种结构可以将卷积操作分解为深度卷积和逐点卷积,从而大大减少参数数量和计算量。

  2. 线性瓶颈(Linear Bottlenecks):MobileNetV2引入了线性瓶颈层,这一层使用线性激活函数(即没有ReLU),以保持特征的完整性。

  3. 倒残差结构(Inverted Residuals):MobileNetV2采用了倒残差结构,即在每个残差块中,首先使用一个扩展层(通过1x1卷积)增加通道的数量,然后使用深度可分离卷积进行特征提取,最后再用一个1x1卷积减少通道数量。这样的结构可以提高网络的表达能力。

  4. 轻量级:由于其独特的网络结构和深度可分离卷积的使用,MobileNetV2在保持较高准确性的同时,模型尺寸和计算需求远小于传统卷积网络,适合在资源有限的设备上运行。

    from download import download
    
    # 下载data_en数据集
    url = "https://ascend-professional-construction-dataset.obs.cn-north-4.myhuaweicloud.com:443/MindStudio-pc/data_en.zip" 
    path = download(url, "./", kind="zip", replace=True)
    from download import download
    
    # 下载预训练权重文件
    url = "https://ascend-professional-construction-dataset.obs.cn-north-4.myhuaweicloud.com:443/ComputerVision/mobilenetV2-200_1067.zip" 
    path = download(url, "./", kind="zip", replace=True)
    import math
    import numpy as np
    import os
    import random
    
    from matplotlib import pyplot as plt
    from easydict import EasyDict
    from PIL import Image
    import numpy as np
    import mindspore.nn as nn
    from mindspore import ops as P
    from mindspore.ops import add
    from mindspore import Tensor
    import mindspore.common.dtype as mstype
    import mindspore.dataset as de
    import mindspore.dataset.vision as C
    import mindspore.dataset.transforms as C2
    import mindspore as ms
    from mindspore import set_context, nn, Tensor, load_checkpoint, save_checkpoint, export
    from mindspore.train import Model
    from mindspore.train import Callback, LossMonitor, ModelCheckpoint, CheckpointConfig
    
    os.environ['GLOG_v'] = '3' # Log level includes 3(ERROR), 2(WARNING), 1(INFO), 0(DEBUG).
    os.environ['GLOG_logtostderr'] = '0' # 0:输出到文件,1:输出到屏幕
    os.environ['GLOG_log_dir'] = '../../log' # 日志目录
    os.environ['GLOG_stderrthreshold'] = '2' # 输出到目录也输出到屏幕:3(ERROR), 2(WARNING), 1(INFO), 0(DEBUG).
    set_context(mode=ms.GRAPH_MODE, device_target="CPU", device_id=0) # 设置采用图模式执行,设备为Ascend#
    # 垃圾分类数据集标签,以及用于标签映射的字典。
    garbage_classes = {
        '干垃圾': ['贝壳', '打火机', '旧镜子', '扫把', '陶瓷碗', '牙刷', '一次性筷子', '脏污衣服'],
        '可回收物': ['报纸', '玻璃制品', '篮球', '塑料瓶', '硬纸板', '玻璃瓶', '金属制品', '帽子', '易拉罐', '纸张'],
        '湿垃圾': ['菜叶', '橙皮', '蛋壳', '香蕉皮'],
        '有害垃圾': ['电池', '药片胶囊', '荧光灯', '油漆桶']
    }
    
    class_cn = ['贝壳', '打火机', '旧镜子', '扫把', '陶瓷碗', '牙刷', '一次性筷子', '脏污衣服',
                '报纸', '玻璃制品', '篮球', '塑料瓶', '硬纸板', '玻璃瓶', '金属制品', '帽子', '易拉罐', '纸张',
                '菜叶', '橙皮', '蛋壳', '香蕉皮',
                '电池', '药片胶囊', '荧光灯', '油漆桶']
    class_en = ['Seashell', 'Lighter','Old Mirror', 'Broom','Ceramic Bowl', 'Toothbrush','Disposable Chopsticks','Dirty Cloth',
                'Newspaper', 'Glassware', 'Basketball', 'Plastic Bottle', 'Cardboard','Glass Bottle', 'Metalware', 'Hats', 'Cans', 'Paper',
                'Vegetable Leaf','Orange Peel', 'Eggshell','Banana Peel',
                'Battery', 'Tablet capsules','Fluorescent lamp', 'Paint bucket']
    
    index_en = {'Seashell': 0, 'Lighter': 1, 'Old Mirror': 2, 'Broom': 3, 'Ceramic Bowl': 4, 'Toothbrush': 5, 'Disposable Chopsticks': 6, 'Dirty Cloth': 7,
                'Newspaper': 8, 'Glassware': 9, 'Basketball': 10, 'Plastic Bottle': 11, 'Cardboard': 12, 'Glass Bottle': 13, 'Metalware': 14, 'Hats': 15, 'Cans': 16, 'Paper': 17,
                'Vegetable Leaf': 18, 'Orange Peel': 19, 'Eggshell': 20, 'Banana Peel': 21,
                'Battery': 22, 'Tablet capsules': 23, 'Fluorescent lamp': 24, 'Paint bucket': 25}
    
    # 训练超参
    config = EasyDict({
        "num_classes": 26,
        "image_height": 224,
        "image_width": 224,
        #"data_split": [0.9, 0.1],
        "backbone_out_channels":1280,
        "batch_size": 16,
        "eval_batch_size": 8,
        "epochs": 10,
        "lr_max": 0.05,
        "momentum": 0.9,
        "weight_decay": 1e-4,
        "save_ckpt_epochs": 1,
        "dataset_path": "./data_en",
        "class_index": index_en,
        "pretrained_ckpt": "./mobilenetV2-200_1067.ckpt" # mobilenetV2-200_1067.ckpt 
    })
    def create_dataset(dataset_path, config, training=True, buffer_size=1000):
        """
        create a train or eval dataset
    
        Args:
            dataset_path(string): the path of dataset.
            config(struct): the config of train and eval in diffirent platform.
    
        Returns:
            train_dataset, val_dataset
        """
        data_path = os.path.join(dataset_path, 'train' if training else 'test')
        ds = de.ImageFolderDataset(data_path, num_parallel_workers=4, class_indexing=config.class_index)
        resize_height = config.image_height
        resize_width = config.image_width
        
        normalize_op = C.Normalize(mean=[0.485*255, 0.456*255, 0.406*255], std=[0.229*255, 0.224*255, 0.225*255])
        change_swap_op = C.HWC2CHW()
        type_cast_op = C2.TypeCast(mstype.int32)
    
        if training:
            crop_decode_resize = C.RandomCropDecodeResize(resize_height, scale=(0.08, 1.0), ratio=(0.75, 1.333))
            horizontal_flip_op = C.RandomHorizontalFlip(prob=0.5)
            color_adjust = C.RandomColorAdjust(brightness=0.4, contrast=0.4, saturation=0.4)
        
            train_trans = [crop_decode_resize, horizontal_flip_op, color_adjust, normalize_op, change_swap_op]
            train_ds = ds.map(input_columns="image", operations=train_trans, num_parallel_workers=4)
            train_ds = train_ds.map(input_columns="label", operations=type_cast_op, num_parallel_workers=4)
            
            train_ds = train_ds.shuffle(buffer_size=buffer_size)
            ds = train_ds.batch(config.batch_size, drop_remainder=True)
        else:
            decode_op = C.Decode()
            resize_op = C.Resize((int(resize_width/0.875), int(resize_width/0.875)))
            center_crop = C.CenterCrop(resize_width)
            
            eval_trans = [decode_op, resize_op, center_crop, normalize_op, change_swap_op]
            eval_ds = ds.map(input_columns="image", operations=eval_trans, num_parallel_workers=4)
            eval_ds = eval_ds.map(input_columns="label", operations=type_cast_op, num_parallel_workers=4)
            ds = eval_ds.batch(config.eval_batch_size, drop_remainder=True)
    
        return ds
    ds = create_dataset(dataset_path=config.dataset_path, config=config, training=False)
    print(ds.get_dataset_size())
    data = ds.create_dict_iterator(output_numpy=True)._get_next()
    images = data['image']
    labels = data['label']
    
    for i in range(1, 5):
        plt.subplot(2, 2, i)
        plt.imshow(np.transpose(images[i], (1,2,0)))
        plt.title('label: %s' % class_en[labels[i]])
        plt.xticks([])
    plt.show()

    MobileNetV2模型搭建

    使用MindSpore定义MobileNetV2网络的各模块时需要继承mindspore.nn.Cell。Cell是所有神经网络(Conv2d等)的基类。

    神经网络的各层需要预先在__init__方法中定义,然后通过定义construct方法来完成神经网络的前向构造。原始模型激活函数为ReLU6,池化模块采用是全局平均池化层。

__all__ = ['MobileNetV2', 'MobileNetV2Backbone', 'MobileNetV2Head', 'mobilenet_v2']

def _make_divisible(v, divisor, min_value=None):
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

class GlobalAvgPooling(nn.Cell):
    """
    Global avg pooling definition.

    Args:

    Returns:
        Tensor, output tensor.

    Examples:
        >>> GlobalAvgPooling()
    """

    def __init__(self):
        super(GlobalAvgPooling, self).__init__()

    def construct(self, x):
        x = P.mean(x, (2, 3))
        return x

class ConvBNReLU(nn.Cell):
    """
    Convolution/Depthwise fused with Batchnorm and ReLU block definition.

    Args:
        in_planes (int): Input channel.
        out_planes (int): Output channel.
        kernel_size (int): Input kernel size.
        stride (int): Stride size for the first convolutional layer. Default: 1.
        groups (int): channel group. Convolution is 1 while Depthiwse is input channel. Default: 1.

    Returns:
        Tensor, output tensor.

    Examples:
        >>> ConvBNReLU(16, 256, kernel_size=1, stride=1, groups=1)
    """

    def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
        super(ConvBNReLU, self).__init__()
        padding = (kernel_size - 1) // 2
        in_channels = in_planes
        out_channels = out_planes
        if groups == 1:
            conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, pad_mode='pad', padding=padding)
        else:
            out_channels = in_planes
            conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, pad_mode='pad',
                             padding=padding, group=in_channels)

        layers = [conv, nn.BatchNorm2d(out_planes), nn.ReLU6()]
        self.features = nn.SequentialCell(layers)

    def construct(self, x):
        output = self.features(x)
        return output

class InvertedResidual(nn.Cell):
    """
    Mobilenetv2 residual block definition.

    Args:
        inp (int): Input channel.
        oup (int): Output channel.
        stride (int): Stride size for the first convolutional layer. Default: 1.
        expand_ratio (int): expand ration of input channel

    Returns:
        Tensor, output tensor.

    Examples:
        >>> ResidualBlock(3, 256, 1, 1)
    """

    def __init__(self, inp, oup, stride, expand_ratio):
        super(InvertedResidual, self).__init__()
        assert stride in [1, 2]

        hidden_dim = int(round(inp * expand_ratio))
        self.use_res_connect = stride == 1 and inp == oup

        layers = []
        if expand_ratio != 1:
            layers.append(ConvBNReLU(inp, hidden_dim, kernel_size=1))
        layers.extend([
            ConvBNReLU(hidden_dim, hidden_dim,
                       stride=stride, groups=hidden_dim),
            nn.Conv2d(hidden_dim, oup, kernel_size=1,
                      stride=1, has_bias=False),
            nn.BatchNorm2d(oup),
        ])
        self.conv = nn.SequentialCell(layers)
        self.cast = P.Cast()

    def construct(self, x):
        identity = x
        x = self.conv(x)
        if self.use_res_connect:
            return P.add(identity, x)
        return x

class MobileNetV2Backbone(nn.Cell):
    """
    MobileNetV2 architecture.

    Args:
        class_num (int): number of classes.
        width_mult (int): Channels multiplier for round to 8/16 and others. Default is 1.
        has_dropout (bool): Is dropout used. Default is false
        inverted_residual_setting (list): Inverted residual settings. Default is None
        round_nearest (list): Channel round to . Default is 8
    Returns:
        Tensor, output tensor.

    Examples:
        >>> MobileNetV2(num_classes=1000)
    """

    def __init__(self, width_mult=1., inverted_residual_setting=None, round_nearest=8,
                 input_channel=32, last_channel=1280):
        super(MobileNetV2Backbone, self).__init__()
        block = InvertedResidual
        # setting of inverted residual blocks
        self.cfgs = inverted_residual_setting
        if inverted_residual_setting is None:
            self.cfgs = [
                # t, c, n, s
                [1, 16, 1, 1],
                [6, 24, 2, 2],
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # building first layer
        input_channel = _make_divisible(input_channel * width_mult, round_nearest)
        self.out_channels = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
        features = [ConvBNReLU(3, input_channel, stride=2)]
        # building inverted residual blocks
        for t, c, n, s in self.cfgs:
            output_channel = _make_divisible(c * width_mult, round_nearest)
            for i in range(n):
                stride = s if i == 0 else 1
                features.append(block(input_channel, output_channel, stride, expand_ratio=t))
                input_channel = output_channel
        features.append(ConvBNReLU(input_channel, self.out_channels, kernel_size=1))
        self.features = nn.SequentialCell(features)
        self._initialize_weights()

    def construct(self, x):
        x = self.features(x)
        return x

    def _initialize_weights(self):
        """
        Initialize weights.

        Args:

        Returns:
            None.

        Examples:
            >>> _initialize_weights()
        """
        self.init_parameters_data()
        for _, m in self.cells_and_names():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.set_data(Tensor(np.random.normal(0, np.sqrt(2. / n),
                                                          m.weight.data.shape).astype("float32")))
                if m.bias is not None:
                    m.bias.set_data(
                        Tensor(np.zeros(m.bias.data.shape, dtype="float32")))
            elif isinstance(m, nn.BatchNorm2d):
                m.gamma.set_data(
                    Tensor(np.ones(m.gamma.data.shape, dtype="float32")))
                m.beta.set_data(
                    Tensor(np.zeros(m.beta.data.shape, dtype="float32")))

    @property
    def get_features(self):
        return self.features

class MobileNetV2Head(nn.Cell):
    """
    MobileNetV2 architecture.

    Args:
        class_num (int): Number of classes. Default is 1000.
        has_dropout (bool): Is dropout used. Default is false
    Returns:
        Tensor, output tensor.

    Examples:
        >>> MobileNetV2(num_classes=1000)
    """

    def __init__(self, input_channel=1280, num_classes=1000, has_dropout=False, activation="None"):
        super(MobileNetV2Head, self).__init__()
        # mobilenet head
        head = ([GlobalAvgPooling(), nn.Dense(input_channel, num_classes, has_bias=True)] if not has_dropout else
                [GlobalAvgPooling(), nn.Dropout(0.2), nn.Dense(input_channel, num_classes, has_bias=True)])
        self.head = nn.SequentialCell(head)
        self.need_activation = True
        if activation == "Sigmoid":
            self.activation = nn.Sigmoid()
        elif activation == "Softmax":
            self.activation = nn.Softmax()
        else:
            self.need_activation = False
        self._initialize_weights()

    def construct(self, x):
        x = self.head(x)
        if self.need_activation:
            x = self.activation(x)
        return x

    def _initialize_weights(self):
        """
        Initialize weights.

        Args:

        Returns:
            None.

        Examples:
            >>> _initialize_weights()
        """
        self.init_parameters_data()
        for _, m in self.cells_and_names():
            if isinstance(m, nn.Dense):
                m.weight.set_data(Tensor(np.random.normal(
                    0, 0.01, m.weight.data.shape).astype("float32")))
                if m.bias is not None:
                    m.bias.set_data(
                        Tensor(np.zeros(m.bias.data.shape, dtype="float32")))
    @property
    def get_head(self):
        return self.head

class MobileNetV2(nn.Cell):
    """
    MobileNetV2 architecture.

    Args:
        class_num (int): number of classes.
        width_mult (int): Channels multiplier for round to 8/16 and others. Default is 1.
        has_dropout (bool): Is dropout used. Default is false
        inverted_residual_setting (list): Inverted residual settings. Default is None
        round_nearest (list): Channel round to . Default is 8
    Returns:
        Tensor, output tensor.

    Examples:
        >>> MobileNetV2(backbone, head)
    """

    def __init__(self, num_classes=1000, width_mult=1., has_dropout=False, inverted_residual_setting=None, \
        round_nearest=8, input_channel=32, last_channel=1280):
        super(MobileNetV2, self).__init__()
        self.backbone = MobileNetV2Backbone(width_mult=width_mult, \
            inverted_residual_setting=inverted_residual_setting, \
            round_nearest=round_nearest, input_channel=input_channel, last_channel=last_channel).get_features
        self.head = MobileNetV2Head(input_channel=self.backbone.out_channel, num_classes=num_classes, \
            has_dropout=has_dropout).get_head

    def construct(self, x):
        x = self.backbone(x)
        x = self.head(x)
        return x

class MobileNetV2Combine(nn.Cell):
    """
    MobileNetV2Combine architecture.

    Args:
        backbone (Cell): the features extract layers.
        head (Cell):  the fully connected layers.
    Returns:
        Tensor, output tensor.

    Examples:
        >>> MobileNetV2(num_classes=1000)
    """

    def __init__(self, backbone, head):
        super(MobileNetV2Combine, self).__init__(auto_prefix=False)
        self.backbone = backbone
        self.head = head

    def construct(self, x):
        x = self.backbone(x)
        x = self.head(x)
        return x

def mobilenet_v2(backbone, head):
    return MobileNetV2Combine(backbone, head)

导出AIR/GEIR/ONNX模型文件

backbone = MobileNetV2Backbone(last_channel=config.backbone_out_channels)
head = MobileNetV2Head(input_channel=backbone.out_channels, num_classes=config.num_classes)
network = mobilenet_v2(backbone, head)
load_checkpoint(CKPT, network)

input = np.random.uniform(0.0, 1.0, size=[1, 3, 224, 224]).astype(np.float32)
# export(network, Tensor(input), file_name='mobilenetv2.air', file_format='AIR')
# export(network, Tensor(input), file_name='mobilenetv2.pb', file_format='GEIR')
export(network, Tensor(input), file_name='mobilenetv2.onnx', file_format='ONNX')

Logo

昇腾计算产业是基于昇腾系列(HUAWEI Ascend)处理器和基础软件构建的全栈 AI计算基础设施、行业应用及服务,https://devpress.csdn.net/organization/setting/general/146749包括昇腾系列处理器、系列硬件、CANN、AI计算框架、应用使能、开发工具链、管理运维工具、行业应用及服务等全产业链

更多推荐