PyTorch图像分割模型——segmentation_models_pytorch库的使用

一、概要

segmentation_models_pytorch是一个基于PyTorch的图像分割神经网络

这个新集合由俄罗斯的程序员小哥Pavel Yakubovskiy一手打造,对于图像分割而言简直就是神器般的存在。
github地址:https://github.com/qubvel/segmentation_models.pytorch

该库的主要功能有:

  1. 高级API(只需两行即可创建神经网络);
  2. 用于二分类和多类分割的9种模型架构(包括传奇的Unet)(Unet、Unet++、MAnet、Linknet、FPN、PSPNet、PAN、DeepLabV3、DeepLabV3+);
  3. 每种架构有113种可用的编码器;
  4. 所有编码器均具有预训练的权重,以实现更快更好的收敛。

这篇文章我主要介绍UNet++模型的训练和检测,UNet在图像分割中已经足够传奇,可以利用底层的特征(同分辨率级联)改善上采样的信息不足,对训练所要求的数据集并不是特别大,而且准确率也相当的高,UNet++又在UNet的基础上增加了剪枝的操作配合深监督,整合不同层次的特征以提升精度,且减少了图像分割所用的时间,只是在内存的要求上相应的增加了些。关于UNet和UNet++具体的解读可参考这两篇文章:
UNET详解和UNET++介绍(零基础)
[论文解读]UNet++解读 + 它是如何对UNet改进 + 作者的研究态度和方式

话不多说,我们开始基于segmentation_models_pytorch库进行UNet++模型训练。

二、环境配置

首先我们需创建一个虚拟环境,我这里采用的是python3.7版本。

conda create -n segmentation python==3.7

创建好后进入segmentation虚拟环境,并安装我们这里要用的图像分割神器segmentation-models-pytorch。

pip install segmentation-models-pytorch

安装segmentation-models-pytorch会一同安装上torch和torchvision,但是这时要注意了,这里安装进去的是CPU版的而且是最新版的pytorch,如果你确实打算用cpu来做的话那后面的也可以忽略,但是做目标检测和图像分割不用GPU简直没有灵魂啊,而且也为了方便我们同目标检测的神经网络结合,所以我们在segmentation-models-pytorch安装好后要先卸载掉torch和torchvision,然后再安装相应的cuda版本的torch和torchvision。我电脑采用的是cuda11.0,所以首先下载后对应cuda11.0的torch和torchvision,并进行安装。我这里采用的是下载好的离线安装包。

pip install torch-1.7.1+cu110-cp37-cp37m-win_amd64.whl
pip install torchvision-0.8.2+cu110-cp37-cp37m-win_amd64.whl

安装好后虚拟环境中的pytorch如下图所示。

同时,我们还需安装以下库:

pip install albumentations
pip install matplotlib
pip install imageio
pip install opencv-python

安装好后,虚拟环境中所含的库如下图所示。

此时,我们的前期工作就准备好了,接下来开始图像分割模型的训练和测试。

三、segmentation_models_pytorch库之UNet++模型训练

3.1 UNet++训练

首先我们看一下segmentation_models_pytorch库是怎么使用他所包含的9种模型的。

由于该库是基于PyTorch框架构建的,因此创建的细分模型只是一个PyTorch nn.Module,可以轻松地创建它。

import segmentation_models_pytorch as smp

model = smp.UnetPlusPlus(
    encoder_name="resnet34",        # 选择解码器, 例如 mobilenet_v2 或 efficientnet-b7
    encoder_weights="imagenet",     # 使用预先训练的权重imagenet进行解码器初始化
    in_channels=1,                  # 模型输入通道(1个用于灰度图像,3个用于RGB等)
    classes=3,                      # 模型输出通道(数据集所分的类别总数)
)

如果要使用UNet、FPN等模型,只要改成model = smp.Unet()或model = smp.FPN()即可。

所有模型均具有预训练的编码器,因此必须按照权重预训练的相同方法准备数据。

from segmentation_models_pytorch.encoders import get_preprocessing_fn

preprocess_input = get_preprocessing_fn('resnet18', pretrained='imagenet')

授人以鱼不如授人以渔,放出了9种模型还不算,俄罗斯大神还贴心地提供了如何使用CamVid数据集进行训练的示例(详情可见segmentation_models_pytorch在GitHub中的examples/cars segmentation (camvid).ipynb)。

CamVid数据集是计算机视觉领域常用的一个数据集,拥有汽车、建筑、人行道、云、树等12种不同的类别,通常用来进行街景分割。方便起见,我们从GitHub上拿来一套已经标注好的图像分割的数据集来进行训练和测试,下载地址:CamVid数据集

这里尤其要注意的是我们训练所用的数据集,不管是原图还是含有标注信息的图像,都必须是png格式,因为png图像可以做到无损压缩,能在保证最不失真的情况下尽可能压缩图像文件的大小。我们从CamVid数据集中所下载的数据也都是png格式的。

segmentation包含9种用于二分类和多类分割的模型架构,这里我们只采用UNet++,并使用了se_resnext50_32x4d这个预训练骨干模型。

训练UNet++模型的代码如下所示:

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import numpy as np
import cv2
import matplotlib.pyplot as plt
import albumentations as albu
import torch
import segmentation_models_pytorch as smp
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset


# ---------------------------------------------------------------
### 加载数据

class Dataset(BaseDataset):
    """CamVid数据集。进行图像读取,图像增强增强和图像预处理.

    Args:
        images_dir (str): 图像文件夹所在路径
        masks_dir (str): 图像分割的标签图像所在路径
        class_values (list): 用于图像分割的所有类别数
        augmentation (albumentations.Compose): 数据传输管道
        preprocessing (albumentations.Compose): 数据预处理
    """
	# CamVid数据集中用于图像分割的所有标签类别
    CLASSES = ['sky', 'building', 'pole', 'road', 'pavement',
               'tree', 'signsymbol', 'fence', 'car',
               'pedestrian', 'bicyclist', 'unlabelled']

    def __init__(
            self,
            images_dir,
            masks_dir,
            classes=None,
            augmentation=None,
            preprocessing=None,
    ):
        self.ids = os.listdir(images_dir)
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]

        # convert str names to class values on masks
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]

        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):

        # read data
        image = cv2.imread(self.images_fps[i])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(self.masks_fps[i], 0)

        # 从标签中提取特定的类别 (e.g. cars)
        masks = [(mask == v) for v in self.class_values]
        mask = np.stack(masks, axis=-1).astype('float')

        # 图像增强应用
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        # 图像预处理应用
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        return image, mask

    def __len__(self):
        return len(self.ids)

# ---------------------------------------------------------------
### 图像增强

def get_training_augmentation():
    train_transform = [

        albu.HorizontalFlip(p=0.5),

        albu.ShiftScaleRotate(scale_limit=0.5, rotate_limit=0, shift_limit=0.1, p=1, border_mode=0),

        albu.PadIfNeeded(min_height=320, min_width=320, always_apply=True, border_mode=0),
        albu.RandomCrop(height=320, width=320, always_apply=True),

        albu.IAAAdditiveGaussianNoise(p=0.2),
        albu.IAAPerspective(p=0.5),

        albu.OneOf(
            [
                albu.CLAHE(p=1),
                albu.RandomBrightness(p=1),
                albu.RandomGamma(p=1),
            ],
            p=0.9,
        ),

        albu.OneOf(
            [
                albu.IAASharpen(p=1),
                albu.Blur(blur_limit=3, p=1),
                albu.MotionBlur(blur_limit=3, p=1),
            ],
            p=0.9,
        ),

        albu.OneOf(
            [
                albu.RandomContrast(p=1),
                albu.HueSaturationValue(p=1),
            ],
            p=0.9,
        ),
    ]
    return albu.Compose(train_transform)


def get_validation_augmentation():
    """调整图像使得图片的分辨率长宽能被32整除"""
    test_transform = [
        albu.PadIfNeeded(384, 480)
    ]
    return albu.Compose(test_transform)


def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')


def get_preprocessing(preprocessing_fn):
    """进行图像预处理操作

    Args:
        preprocessing_fn (callbale): 数据规范化的函数
            (针对每种预训练的神经网络)
    Return:
        transform: albumentations.Compose
    """

    _transform = [
        albu.Lambda(image=preprocessing_fn),
        albu.Lambda(image=to_tensor, mask=to_tensor),
    ]
    return albu.Compose(_transform)


#$# 创建模型并训练
# ---------------------------------------------------------------
if __name__ == '__main__':
	
	# 数据集所在的目录
    DATA_DIR = './data/CamVid/'

    # 如果目录下不存在CamVid数据集,则克隆下载
    if not os.path.exists(DATA_DIR):
        print('Loading data...')
        os.system('git clone https://github.com/alexgkendall/SegNet-Tutorial ./data')
        print('Done!')

    # 训练集
    x_train_dir = os.path.join(DATA_DIR, 'train')
    y_train_dir = os.path.join(DATA_DIR, 'trainannot')

    # 验证集
    x_valid_dir = os.path.join(DATA_DIR, 'val')
    y_valid_dir = os.path.join(DATA_DIR, 'valannot')

    ENCODER = 'se_resnext50_32x4d'
    ENCODER_WEIGHTS = 'imagenet'
    CLASSES = ['car']
    ACTIVATION = 'sigmoid' # could be None for logits or 'softmax2d' for multiclass segmentation
    DEVICE = 'cuda'

    # 用预训练编码器建立分割模型
    # 使用FPN模型
    # model = smp.FPN(
    #     encoder_name=ENCODER,
    #     encoder_weights=ENCODER_WEIGHTS,
    #     classes=len(CLASSES),
    #     activation=ACTIVATION,
    # )
    # 使用unet++模型
    model = smp.UnetPlusPlus(
        encoder_name=ENCODER,
        encoder_weights=ENCODER_WEIGHTS,
        classes=len(CLASSES),
        activation=ACTIVATION,
    )

    preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)
	
	# 加载训练数据集
    train_dataset = Dataset(
        x_train_dir,
        y_train_dir,
        augmentation=get_training_augmentation(),
        preprocessing=get_preprocessing(preprocessing_fn),
        classes=CLASSES,
    )

	# 加载验证数据集
    valid_dataset = Dataset(
        x_valid_dir,
        y_valid_dir,
        augmentation=get_validation_augmentation(),
        preprocessing=get_preprocessing(preprocessing_fn),
        classes=CLASSES,
    )

	# 需根据显卡的性能进行设置,batch_size为每次迭代中一次训练的图片数,num_workers为训练时的工作进程数,如果显卡不太行或者显存空间不够,将batch_size调低并将num_workers调为0
    train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=0)
    valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=0)

    loss = smp.utils.losses.DiceLoss()
    metrics = [
        smp.utils.metrics.IoU(threshold=0.5),
    ]

    optimizer = torch.optim.Adam([
        dict(params=model.parameters(), lr=0.0001),
    ])

    # 创建一个简单的循环,用于迭代数据样本
    train_epoch = smp.utils.train.TrainEpoch(
        model,
        loss=loss,
        metrics=metrics,
        optimizer=optimizer,
        device=DEVICE,
        verbose=True,
    )

    valid_epoch = smp.utils.train.ValidEpoch(
        model,
        loss=loss,
        metrics=metrics,
        device=DEVICE,
        verbose=True,
    )

    # 进行40轮次迭代的模型训练
    max_score = 0

    for i in range(0, 40):

        print('\nEpoch: {}'.format(i))
        train_logs = train_epoch.run(train_loader)
        valid_logs = valid_epoch.run(valid_loader)
		
		# 每次迭代保存下训练最好的模型
        if max_score < valid_logs['iou_score']:
            max_score = valid_logs['iou_score']
            torch.save(model, './best_model.pth')
            print('Model saved!')

        if i == 25:
            optimizer.param_groups[0]['lr'] = 1e-5
            print('Decrease decoder learning rate to 1e-5!')

这里我们只训练 ‘car’ 的类别进行图像分割,设置好数据集后开始训练,训练场景如图所示。

因为数据集量并不大,40轮次的训练很快就结束了,此时在文件夹中出现一个 best_model.pth 的文件,即我们迭代40轮次训练后得到的最好的模型。

3.2 UNet++测试

训练好后我们开始测试UNet++所训练的最好的那个模型,测试代码如下:

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import numpy as np
import cv2
import matplotlib.pyplot as plt
import albumentations as albu
import torch
import segmentation_models_pytorch as smp
from torch.utils.data import Dataset as BaseDataset


# ---------------------------------------------------------------
### 加载数据

class Dataset(BaseDataset):
    """CamVid数据集。进行图像读取,图像增强增强和图像预处理.

    Args:
        images_dir (str): 图像文件夹所在路径
        masks_dir (str): 图像分割的标签图像所在路径
        class_values (list): 用于图像分割的所有类别数
        augmentation (albumentations.Compose): 数据传输管道
        preprocessing (albumentations.Compose): 数据预处理
    """
    # CamVid数据集中用于图像分割的所有标签类别
    CLASSES = ['sky', 'building', 'pole', 'road', 'pavement',
               'tree', 'signsymbol', 'fence', 'car',
               'pedestrian', 'bicyclist', 'unlabelled']

    def __init__(
            self,
            images_dir,
            masks_dir,
            classes=None,
            augmentation=None,
            preprocessing=None,
    ):
        self.ids = os.listdir(images_dir)
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]

        # convert str names to class values on masks
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]

        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):

        # read data
        image = cv2.imread(self.images_fps[i])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(self.masks_fps[i], 0)

        # 从标签中提取特定的类别 (e.g. cars)
        masks = [(mask == v) for v in self.class_values]
        mask = np.stack(masks, axis=-1).astype('float')

        # 图像增强应用
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        # 图像预处理应用
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        return image, mask

    def __len__(self):
        return len(self.ids)


# ---------------------------------------------------------------
### 图像增强

def get_validation_augmentation():
    """调整图像使得图片的分辨率长宽能被32整除"""
    test_transform = [
        albu.PadIfNeeded(384, 480)
    ]
    return albu.Compose(test_transform)


def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')


def get_preprocessing(preprocessing_fn):
    """进行图像预处理操作

    Args:
        preprocessing_fn (callbale): 数据规范化的函数
            (针对每种预训练的神经网络)
    Return:
        transform: albumentations.Compose
    """

    _transform = [
        albu.Lambda(image=preprocessing_fn),
        albu.Lambda(image=to_tensor, mask=to_tensor),
    ]
    return albu.Compose(_transform)


# 图像分割结果的可视化展示
def visualize(**images):
    """PLot images in one row."""
    n = len(images)
    plt.figure(figsize=(16, 5))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()


# ---------------------------------------------------------------
if __name__ == '__main__':

    DATA_DIR = './data/CamVid/'

    # 测试集
    x_test_dir = os.path.join(DATA_DIR, 'test')
    y_test_dir = os.path.join(DATA_DIR, 'testannot')

    ENCODER = 'se_resnext50_32x4d'
    ENCODER_WEIGHTS = 'imagenet'
    CLASSES = ['car']
    ACTIVATION = 'sigmoid' # could be None for logits or 'softmax2d' for multiclass segmentation
    DEVICE = 'cuda'

    preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)
    # ---------------------------------------------------------------
    #$# 测试训练出来的最佳模型

    # 加载最佳模型
    best_model = torch.load('./best_model.pth')

    # 创建测试数据集
    test_dataset = Dataset(
        x_test_dir,
        y_test_dir,
        augmentation=get_validation_augmentation(),
        preprocessing=get_preprocessing(preprocessing_fn),
        classes=CLASSES,
    )

    # ---------------------------------------------------------------
    #$# 图像分割结果可视化展示
    # 对没有进行图像处理转化的测试集进行图像可视化展示
    test_dataset_vis = Dataset(
        x_test_dir, y_test_dir,
        classes=CLASSES,
    )
	# 从测试集中随机挑选3张图片进行测试
    for i in range(3):
        n = np.random.choice(len(test_dataset))

        image_vis = test_dataset_vis[n][0].astype('uint8')
        image, gt_mask = test_dataset[n]

        gt_mask = gt_mask.squeeze()

        x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)
        pr_mask = best_model.predict(x_tensor)
        pr_mask = (pr_mask.squeeze().cpu().numpy().round())

        visualize(
            image=image_vis,
            ground_truth_mask=gt_mask,
            predicted_mask=pr_mask
        )

这里我们也只针对 ‘car’ 的类别进行图像分割测试,运行代码后测试场景如图所示。左边为原图,中间为标注的图像分割场景,右边为测试的图像分歌结果,可以看出测试的结果和标注的图像分割场景还是很相似的,说明训练结果还不错。

3.3 UNet++图像分割检测

通过测试后,我们可以使用我们训练好后UNet++模型,进行图像分割的实际操作。检测代码如下:

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import numpy as np
import cv2
import matplotlib.pyplot as plt
import albumentations as albu
import torch
import segmentation_models_pytorch as smp
from torch.utils.data import Dataset as BaseDataset
import imageio


# ---------------------------------------------------------------
### Dataloader

class Dataset(BaseDataset):
    """CamVid数据集。进行图像读取,图像增强增强和图像预处理.

    Args:
        images_dir (str): 图像文件夹所在路径
        masks_dir (str): 图像分割的标签图像所在路径
        class_values (list): 用于图像分割的所有类别数
        augmentation (albumentations.Compose): 数据传输管道
        preprocessing (albumentations.Compose): 数据预处理
    """
    # CamVid数据集中用于图像分割的所有标签类别
    CLASSES = ['sky', 'building', 'pole', 'road', 'pavement',
               'tree', 'signsymbol', 'fence', 'car',
               'pedestrian', 'bicyclist', 'unlabelled']

    def __init__(
            self,
            images_dir,
            # masks_dir,
            classes=None,
            augmentation=None,
            preprocessing=None,
    ):
        self.ids = os.listdir(images_dir)
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]

        # convert str names to class values on masks
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]

        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):

        # read data
        image = cv2.imread(self.images_fps[i])
        image = cv2.resize(image, (480, 384))   # 改变图片分辨率
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # 图像增强应用
        if self.augmentation:
            sample = self.augmentation(image=image)
            image = sample['image']

        # 图像预处理应用
        if self.preprocessing:
            sample = self.preprocessing(image=image)
            image = sample['image']

        return image

    def __len__(self):
        return len(self.ids)

# ---------------------------------------------------------------

def get_validation_augmentation():
    """调整图像使得图片的分辨率长宽能被32整除"""
    test_transform = [
        albu.PadIfNeeded(384, 480)
    ]
    return albu.Compose(test_transform)


def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')


def get_preprocessing(preprocessing_fn):
    """进行图像预处理操作

    Args:
        preprocessing_fn (callbale): 数据规范化的函数
            (针对每种预训练的神经网络)
    Return:
        transform: albumentations.Compose
    """

    _transform = [
        albu.Lambda(image=preprocessing_fn),
        albu.Lambda(image=to_tensor),
    ]
    return albu.Compose(_transform)


# 图像分割结果的可视化展示
def visualize(**images):
    """PLot images in one row."""
    n = len(images)
    plt.figure(figsize=(16, 5))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()


# ---------------------------------------------------------------
if __name__ == '__main__':

    DATA_DIR = './data/CamVid/'

    x_test_dir = os.path.join(DATA_DIR, 'abc')

    img_test = cv2.imread('data/CamVid/abc/car_test.jpg')
    height = img_test.shape[0]
    weight = img_test.shape[1]

    ENCODER = 'se_resnext50_32x4d'
    ENCODER_WEIGHTS = 'imagenet'
    CLASSES = ['car']
    ACTIVATION = 'sigmoid' # could be None for logits or 'softmax2d' for multiclass segmentation
    DEVICE = 'cuda'

    # 按照权重预训练的相同方法准备数据
    preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

    # 加载最佳模型
    best_model = torch.load('./best_model.pth')

    # 创建检测数据集
    predict_dataset = Dataset(
        x_test_dir,
        augmentation=get_validation_augmentation(),
        preprocessing=get_preprocessing(preprocessing_fn),
        classes=CLASSES,
    )

    # 对检测图像进行图像分割并进行图像可视化展示
    predict_dataset_vis = Dataset(
        x_test_dir,
        classes=CLASSES,
    )

    for i in range(len(predict_dataset)):
        # 原始图像image_vis
        image_vis = predict_dataset_vis[i].astype('uint8')
        image = predict_dataset[i]

        # 通过图像分割得到的0-1图像pr_mask
        x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)
        pr_mask = best_model.predict(x_tensor)
        pr_mask = (pr_mask.squeeze().cpu().numpy().round())
        print(pr_mask.shape)

        # 恢复图片原来的分辨率
        image_vis = cv2.resize(image_vis, (weight, height))
        pr_mask = cv2.resize(pr_mask, (weight, height))
        # 保存图像分割后的黑白结果图像
        imageio.imwrite('car_test_out.png', pr_mask)
        # 原始图像和图像分割结果的可视化展示
        visualize(
            image=image_vis,
            predicted_mask=pr_mask
        )

这里我采用了一张大桥上车辆的图片进行图像分割检测,检测结果如图所示:

这里我们可以看出只检测出了两辆车,说明我们更换检测场景后,检测的结果并不是很令人满意,但整体思路是没有问题的。如需应用在实际场景下,还需针对实际场景的数据集进行相应的训练。

好了,到底为止,使用segmentation_models_pytorch库进行UNet++图像分割模型训练的使用方法就到此结束,是不是很easy,那还不赶快去GitHub给这位俄罗斯大神点个星!

来源:姜饼饼

物联沃分享整理
物联沃-IOTWORD物联网 » PyTorch图像分割模型——segmentation_models_pytorch库的使用

发表评论