语义分割系列5-Pspnet(pytorch实现)

Pspnet全名Pyramid Scene Parsing Network,论文地址:Pyramid Scene Parsing Network

论文名就是《Pyramid Scene Parsing Network》。

该模型提出是为了解决场景分析问题。针对FCN网络在场景分析数据集上存在的问题,Pspnet提出一系列改进方案,以提升场景分析中对于相似颜色、形状的物体的检测精度。

图1 ADE20k场景分析

作者在ADE20K数据集上进行实验时,主要发现有如下3个问题:

  1. 错误匹配,FCN模型把水里的船预测成汽车,但是汽车是不会在水上的。因此,作者认为FCN缺乏收集上下文能力,导致了分类错误。
  2. 作者发现相似的标签会导致一些奇怪的错误,比如earth和field,mountain和hill,wall,house,building和skyscraper。FCN模型会出现混淆。
  3. 第三是小目标的丢失问题,像一些路灯、信号牌这种小物体,很难被FCN所发现。相反的,一些特别大的物体预测中,在感受野不够大的情况下,往往会丢失一部分信息,导致预测不连续。

为了解决这些问题,作者提出了Pyramid Pooling Module。

Pyramid Pooling Module

作者在文章中提出了Pyramid Pooling Module(池化金字塔结构)这一模块。

作者提到,在深层网络中,感受野的大小大致上体现了模型能获得的上下文新消息。尽管在理论上Resnet的感受野已经大于图像尺寸,但是实际上会小得多。这就导致了很多网络不能充分的将上下文信息结合起来,于是作者就提出了一种全局的先验方法-全局平均池化。

作者在PPM模块中并联了四个不同大小的全局池化层,将原始的feature map池化生成不同级别的特征图,经过卷积和上采样恢复到原始大小。这种操作聚合了多尺度的图像特征,生成了一个“hierarchical global prior”,融合了不同尺度和不同子区域之间的信息。最后,这个先验信息再和原始特征图进行相加,输入到最后的卷积模块完成预测。

图2 Pspnet

Pspnet的核心就是PPM模块。其网络架构十分简单,backbone为resnet网络,将原始图像下采样8倍成特征图,特征图输入到PPM模块,并与其输出相加,最后经过卷积和8倍双线性差值上采样得到结果(图2)。

论文复现

本文主要在CamVid数据集上进行复现,数据集可以在另一篇博客中找到CamVid数据集的创建和使用

Resnet

这里调用了pytorch官方写的ResNet50,替换最后两个layer为dialation模式,只采用8倍下采样。

from torchvision.models import resnet50, resnet101
from torchvision.models._utils import IntermediateLayerGetter
import torch
import torch.nn as nn

backbone=IntermediateLayerGetter(
            resnet50(pretrained=False, replace_stride_with_dilation=[False, True, True]),
            return_layers={'layer4': 'stage4'}
        )


x = torch.randn(3, 3, 224, 224).cpu()
result = backbone(x)
for k, v in result.items():
    print(k, v.shape)

pspnet

class PPM(nn.ModuleList):
    def __init__(self, pool_sizes, in_channels, out_channels):
        super(PPM, self).__init__()
        self.pool_sizes = pool_sizes
        self.in_channels = in_channels
        self.out_channels = out_channels
        
        for pool_size in pool_sizes:
            self.append(
                nn.Sequential(
                    nn.AdaptiveMaxPool2d(pool_size),
                    nn.Conv2d(self.in_channels, self.out_channels, kernel_size=1),
                )
            )
            
    def forward(self, x):
        out_puts = []
        for ppm in self:
            ppm_out = nn.functional.interpolate(ppm(x),size=(x.size(2), x.size(3)),mode='bilinear', align_corners=True)
            out_puts.append(ppm_out)
        return out_puts
 
    
class PSPHEAD(nn.Module):
    def __init__(self, in_channels, out_channels,pool_sizes = [1, 2, 3, 6],num_classes=31):
        super(PSPHEAD, self).__init__()
        self.pool_sizes = pool_sizes
        self.num_classes = num_classes
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.psp_modules = PPM(self.pool_sizes, self.in_channels, self.out_channels)
        self.final = nn.Sequential(
            nn.Conv2d(self.in_channels + len(self.pool_sizes)*self.out_channels, self.out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.out_channels),
            nn.ReLU(),
        )
        
    def forward(self, x):
        out = self.psp_modules(x)
        out.append(x)
        out = torch.cat(out, 1)
        out = self.final(out)
        return out
 
class Pspnet(nn.Module):
    def __init__(self, num_classes, pretrained_path=None):
        super(Pspnet, self).__init__()
        self.num_classes = num_classes
        self.backbone = IntermediateLayerGetter(
            resnet50(pretrained=False, replace_stride_with_dilation=[False, True, True]),
            return_layers={'layer4': 'stage4'}
        )

        self.decoder = PSPHEAD(in_channels=2048, out_channels=512, pool_sizes = [1, 2, 3, 6], num_classes=self.num_classes)
        self.cls_seg = nn.Sequential(
            nn.Conv2d(512, self.num_classes, kernel_size=3, padding=1),
        )
        
    def forward(self, x):
        _, _, h, w = x.size()
        feats = self.backbone(x) 
        x = self.decoder(feats["stage4"])
        x = nn.functional.interpolate(x, size=(h, w),mode='bilinear', align_corners=True)
        x = self.cls_seg(x)
        return x


if __name__ == "__main__":
    model = Pspnet(num_classes=33)
    model = model.cuda()
    a = torch.ones([2, 3, 224, 224])
    a = a.cuda()
    print(model(a).shape)

数据集构建

# 导入库
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
import os.path as osp
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

torch.manual_seed(17)
# 自定义数据集CamVidDataset
class CamVidDataset(torch.utils.data.Dataset):
    """CamVid Dataset. Read images, apply augmentation and preprocessing transformations.
    
    Args:
        images_dir (str): path to images folder
        masks_dir (str): path to segmentation masks folder
        class_values (list): values of classes to extract from segmentation mask
        augmentation (albumentations.Compose): data transfromation pipeline 
            (e.g. flip, scale, etc.)
        preprocessing (albumentations.Compose): data preprocessing 
            (e.g. noralization, shape manipulation, etc.)
    """
    
    def __init__(self, images_dir, masks_dir):
        self.transform = A.Compose([
            A.Resize(448, 448),
            A.HorizontalFlip(),
            A.VerticalFlip(),
            A.Normalize(),
            ToTensorV2(),
        ]) 
        self.ids = os.listdir(images_dir)
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]

    
    def __getitem__(self, i):
        # read data
        image = np.array(Image.open(self.images_fps[i]).convert('RGB'))
        mask = np.array( Image.open(self.masks_fps[i]).convert('RGB'))
        image = self.transform(image=image,mask=mask)
        
        return image['image'], image['mask'][:,:,0]
        
    def __len__(self):
        return len(self.ids)
    
    
# 设置数据集路径
DATA_DIR = r'dataset\camvid' # 根据自己的路径来设置
x_train_dir = os.path.join(DATA_DIR, 'train_images')
y_train_dir = os.path.join(DATA_DIR, 'train_labels')
x_valid_dir = os.path.join(DATA_DIR, 'valid_images')
y_valid_dir = os.path.join(DATA_DIR, 'valid_labels')
    
train_dataset = CamVidDataset(
    x_train_dir, 
    y_train_dir, 
)
val_dataset = CamVidDataset(
    x_valid_dir, 
    y_valid_dir, 
)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True,drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=True,drop_last=True)

模型训练

model = Pspnet(num_classes=33)
model = model.cuda()


from d2l import torch as d2l
from tqdm import tqdm
import pandas as pd
#损失函数选用多分类交叉熵损失函数
lossf = nn.CrossEntropyLoss()
#选用adam优化器来训练
optimizer = optim.SGD(model.parameters(),lr=0.1)
#训练50轮
epochs_num = 100
def train_ch13(net, train_iter, test_iter, loss, trainer, num_epochs,
               devices=d2l.try_all_gpus()):
    timer, num_batches = d2l.Timer(), len(train_iter)
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0, 1],
                            legend=['train loss', 'train acc', 'test acc'])
    net = nn.DataParallel(net, device_ids=devices).to(devices[0])
    
    loss_list = []
    train_acc_list = []
    test_acc_list = []
    epochs_list = []
    
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples,
        # no. of predictions
        metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_iter):
            timer.start()
            l, acc = d2l.train_batch_ch13(
                net, features, labels.long(), loss, trainer, devices)
            metric.add(l, acc, labels.shape[0], labels.numel())
            timer.stop()
            if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (metric[0] / metric[2], metric[1] / metric[3],
                              None))
        test_acc = d2l.evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
        print(f'loss {metric[0] / metric[2]:.3f}, train acc '
              f'{metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
        print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on '
              f'{str(devices)}')
        #---------保存训练数据---------------
        df = pd.DataFrame()
        loss_list.append(metric[0] / metric[2])
        train_acc_list.append(metric[1] / metric[3])
        test_acc_list.append(test_acc)
        epochs_list.append(epoch)
        df['epoch'] = epochs_list
        df['loss'] = loss_list
        df['train_acc'] = train_acc_list
        df['test_acc'] = test_acc_list
        df.to_excel("savefile/psp_voc2012.xlsx")
        #----------------保存模型-------------------
        if np.mod(epoch+1, 5) == 0:
            torch.save(model.state_dict(), f'checkpoints/psp_{epoch}.pth')
train_ch13(model, train_loader, val_loader, lossf, optimizer, epochs_num)

训练结果

来源:yumaomi

物联沃分享整理
物联沃-IOTWORD物联网 » 语义分割系列5-Pspnet(pytorch实现)

发表评论