DETR代码学习笔记(一)

       

按照训练流程首先介绍backbone以及数据进入encoder之前的部分

        当训练时,使用torch.manual_seed(seed)函数,只要网络参数不变的情况下,网络的参数初始化每次都是固定的;如果没有设置,每次训练时的初始化都是随机的,导致结果不确定。

        例如:要训练自己的数据集通常需要对num_classes进行设置。(其中num_classes的设置根据自己数据集类别数量+1,也就是说,假设coco的数据集中总共有90个类,此时的num_classes就是91)

        假设刚开始设置的num_classes=5,那么只要训练过程中网络的参数不变,那么网络的初始化参数都是一样的;如果下次训练时num_classes=6,最直观的变化就是数据集中图像的读取顺序发生了变化,并且由于训练时有对图像进行随机拉伸,也就导致参数变化后,同一张图像在上一次训练时的尺寸和当前训练尺寸不一。

        backbone调用的是torchvision中定义的resnet50,这里的backbone也就是送入transformer之前用来提取图像特征图的骨架,所以张量在经过resnet50的卷积后得到的特征图通道数由原来的3通道变为2048,W*H = W/32 * H/32,具体来说,假设一开始输入的张量是由batch size为2,长宽都为768组成的3通道图像,即[b, c, h, w] = [2,3,768,768],经过resnet50后,shape变为[2,2048,24,24]。

        具体的代码如下:

class ResNet(nn.Module):

    def __init__(
        self,
        block: Type[Union[BasicBlock, Bottleneck]],
        layers: List[int],
        num_classes: int = 1000,
        zero_init_residual: bool = False,
        groups: int = 1,
        width_per_group: int = 64,
        replace_stride_with_dilation: Optional[List[bool]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group

        # 输入的W * H = W / 2 * H / 2
        # 特征图分辨率降低为1/2,通道数从3升为64
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3,
                               bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        
        # W * H = W / 2 * H / 2
        # 特征图分辨率降低为1/4,通道数仍然为64
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        # stride为1,不改变分辨率,依然为1/4,通道数从64升为256
        self.layer1 = self._make_layer(block, 64, layers[0])
        
        # W * H = W / 2 * H / 2
        # stride为2,特征图分辨率降低为1/8,通道数从256升为512
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2,
                                       dilate=replace_stride_with_dilation[0])
        
        # W * H = W / 2 * H / 2
        # stride为2,特征图分辨率降低为1/16,通道数从512升为1024
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2,
                                       dilate=replace_stride_with_dilation[1])
        
        # W * H = W / 2 * H / 2
        # stride为2,特征图分辨率降低为1/32,通道数从512升为2048
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2,
                                       dilate=replace_stride_with_dilation[2])
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)  # type: ignore[arg-type]
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)  # type: ignore[arg-type]

        还是假设输入为[b, c, h, w] = [2,3,768,768]的张量(没有特殊声明的话后面提到的张量也是基于[2,3,768,768]的计算得来),通过resnet50得到[2,2048,24,24]的张量(这个张量也称为Feature Map)后需要对原始输入中的mask进行相应的reshape,mask其实是在dataloader生成时,原始输入中原始图像位置的映射。

        怎么解释这个“原始输入中原始图像位置的映射”呢?

        由于生成数据时对数据集中图像进行随机load,再加上对图像进行随机裁剪,所以同一batch的数据尺寸存在差异,但是同一batch输入resnet的大小需要保持一致,就需要对图像进行padding(全0)操作以保证同一batch的尺寸相同。具体来说就是找到该batch下最大的W和最大的H,然后batch下所有的图像根据这个最大的W*H进行padding。

        而mask就是为了记录未padding前的原始图像在padding后的图像中的位置。举例来说就是假设batch为2,其中一张图像的w*h=768*768,另一张图像的w*h为576*580,这个最大的W和H就是768。生成大小为[768,786]全0的张量,而较小的图像填充在这全0张量的左上角,也就是说张量中[576:768]以及[580:768]的部分都为0,反应在mask上就是[0:580,0:576]的部分为False,表示未被padding部分,[576:768]以及[580:768]的部分为True,表示被padding部分。效果如下图:

        具体操作见如下代码:

def nested_tensor_from_tensor_list(tensor_list: List[Tensor]):
    # TODO make this more general
    if tensor_list[0].ndim == 3:
        if torchvision._is_tracing():
            # nested_tensor_from_tensor_list() does not export well to ONNX
            # call _onnx_nested_tensor_from_tensor_list() instead
            return _onnx_nested_tensor_from_tensor_list(tensor_list)

        # TODO make it support different-sized images
        max_size = _max_by_axis([list(img.shape) for img in tensor_list])
        # min_size = tuple(min(s) for s in zip(*[img.shape for img in tensor_list]))
        batch_shape = [len(tensor_list)] + max_size
        b, c, h, w = batch_shape
        dtype = tensor_list[0].dtype
        device = tensor_list[0].device
        tensor = torch.zeros(batch_shape, dtype=dtype, device=device)
        mask = torch.ones((b, h, w), dtype=torch.bool, device=device)
        for img, pad_img, m in zip(tensor_list, tensor, mask):
            # 根据同一batch中最大的W和H对所有余图像进行padding
            pad_img[: img.shape[0], : img.shape[1], : img.shape[2]].copy_(img)
            # 有padding的部分设为True
            m[: img.shape[1], :img.shape[2]] = False
    else:
        raise ValueError('not supported')
    return NestedTensor(tensor, mask)

        mask按照Feature Map的h和w进行reshape,即原始输入中的mask为[2,768,768],将其shape变为[2,24,24],最终输出的out为{mask,[2,24,24],tensor_list,[2,2048,24,24]}。得到out之后,就需要根据Transformer所需要的数据结构,将out转化为能够被Transformer Encoder处理的序列化数据。如位置编码,降维,shape转换。

        首先是位置编码,也就是position encoding(PE),这里的位置编码是基于out中的mask,也就是[2,24,24]进行的。算法类似于最初在《Attention is all you need》中提出的PE,这里只是将位置编码作用于图像中。最后输出编码后的位置信息,shape为[2,256,24,24]。

class BackboneBase(nn.Module):
    def __init__(self, backbone: nn.Module, train_backbone: bool, num_channels: int, return_interm_layers: bool):
        super().__init__()
        for name, parameter in backbone.named_parameters():
            if not train_backbone or 'layer2' not in name and 'layer3' not in name and 'layer4' not in name:
                parameter.requires_grad_(False)
        if return_interm_layers:
            return_layers = {"layer1": "0", "layer2": "1", "layer3": "2", "layer4": "3"}
        else:
            return_layers = {'layer4': "0"}
        self.body = IntermediateLayerGetter(backbone, return_layers=return_layers)
        self.num_channels = num_channels

    def forward(self, tensor_list: NestedTensor):
        # 通过self.body即resnet50获取最后一层卷积得到的张量[2,3,768,768]->[2,2048,24,24]
        xs = self.body(tensor_list.tensors)
        out: Dict[str, NestedTensor] = {}
        for name, x in xs.items():
            # 获取原始输入的mask[2,768,768]
            m = tensor_list.mask
            assert m is not None
            # 根据resnet50输出的wh维度进行reshape即[2,768,768]->[2,24,24]
            mask = F.interpolate(m[None].float(), size=x.shape[-2:]).to(torch.bool)[0]
            out[name] = NestedTensor(x, mask)
        # 此时的输出为{mask,[2,24,24],tensor_list,[2,2048,24,24]}
        return out


class Backbone(BackboneBase):
    """ResNet backbone with frozen BatchNorm."""
    def __init__(self, name: str,
                 train_backbone: bool,
                 return_interm_layers: bool,
                 dilation: bool):
        backbone = getattr(torchvision.models, name)(
            replace_stride_with_dilation=[False, False, dilation],
            pretrained=is_main_process(), norm_layer=FrozenBatchNorm2d)
        num_channels = 512 if name in ('resnet18', 'resnet34') else 2048
        super().__init__(backbone, train_backbone, num_channels, return_interm_layers)


class Joiner(nn.Sequential):
    def __init__(self, backbone, position_embedding):
        super().__init__(backbone, position_embedding)

    def forward(self, tensor_list: NestedTensor):
        # xs为{mask,[2,24,24],tensor_list,[2,2048,24,24]},self[0]即为Backbone中resnet50输出结果
        xs = self[0](tensor_list)
        out: List[NestedTensor] = []
        pos = []
        for name, x in xs.items():
            out.append(x)
            # position encoding
            # 位置编码使用的是PositionEmbeddingSine()
            pos.append(self[1](x).to(x.tensors.dtype))
        # 其中out为{mask,[2,24,24],tensor_list,[2,2048,24,24]},pos为根据mask得到的PE,shape为[2,256,24,24]
        return out, pos

      再者是降维,在数据送入transformer之前,即:

hs = self.transformer(self.input_proj(src), mask, self.query_embed.weight, pos[-1])[0]

需要将backbone网络输出的Feature Map使用1×1的线性层降维,得到与mask相同的channel,即[2,256,24,24]

class DETR(nn.Module):
    """ This is the DETR module that performs object detection """
    def __init__(self, backbone, transformer, num_classes, num_queries, aux_loss=False):
        """ Initializes the model.
        Parameters:
            backbone: torch module of the backbone to be used. See backbone.py
            transformer: torch module of the transformer architecture. See transformer.py
            num_classes: number of object classes
            num_queries: number of object queries, ie detection slot. This is the maximal number of objects
                         DETR can detect in a single image. For COCO, we recommend 100 queries.
            aux_loss: True if auxiliary decoding losses (loss at each decoder layer) are to be used.
        """
        super().__init__()
        self.num_queries = num_queries
        self.transformer = transformer
        hidden_dim = transformer.d_model
        self.class_embed = nn.Linear(hidden_dim, num_classes + 1)
        self.bbox_embed = MLP(hidden_dim, hidden_dim, 4, 3)
        self.query_embed = nn.Embedding(num_queries, hidden_dim)
        self.input_proj = nn.Conv2d(backbone.num_channels, hidden_dim, kernel_size=1)
        self.backbone = backbone
        self.aux_loss = aux_loss

    def forward(self, samples: NestedTensor):
        """ The forward expects a NestedTensor, which consists of:
               - samples.tensor: batched images, of shape [batch_size x 3 x H x W]
               - samples.mask: a binary mask of shape [batch_size x H x W], containing 1 on padded pixels

            It returns a dict with the following elements:
               - "pred_logits": the classification logits (including no-object) for all queries.
                                Shape= [batch_size x num_queries x (num_classes + 1)]
               - "pred_boxes": The normalized boxes coordinates for all queries, represented as
                               (center_x, center_y, height, width). These values are normalized in [0, 1],
                               relative to the size of each individual image (disregarding possible padding).
                               See PostProcess for information on how to retrieve the unnormalized bounding box.
               - "aux_outputs": Optional, only returned when auxilary losses are activated. It is a list of
                                dictionnaries containing the two above keys for each decoder layer.
        """
        if isinstance(samples, (list, torch.Tensor)):
            samples = nested_tensor_from_tensor_list(samples)
        # features:{mask,[2,24,24],tensor_list,[2,2048,24,24]},pos:[2,256,24,24]
        features, pos = self.backbone(samples)
        # src:[2,2048,24,24], mask:[2,24,24]
        src, mask = features[-1].decompose()
        assert mask is not None
        # 将数据送入transformer
        # self.input_proj() 将src降维:[2,2048,24,24] -> [2,256,24,24]
        # query_embed由nn.Embedding初始化,shape[100,256]
        hs = self.transformer(self.input_proj(src), mask, self.query_embed.weight, pos[-1])[0]

        outputs_class = self.class_embed(hs)
        outputs_coord = self.bbox_embed(hs).sigmoid()
        out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}
        if self.aux_loss:
            out['aux_outputs'] = self._set_aux_loss(outputs_class, outputs_coord)
        return out

        最后是reshape,将降维后的H和W维度合并,然后进行维度转化[NxCxHxW]->[HWxNxC],即[2,256,24,24]->[576,2,256],此时输入transformer的Feature Map的shape转换为[576,2,256],同时由mask生成的位置编码(pos)维度也转化为[576,2,256]。词嵌入向量由[num_embeddings, embedding_dim]->[num_embeddings, N, embedding_dim],即[100,256]->[100,2,256]( 对于torch.nn.Embedding的理解可以看这篇文章),mask的也将H和W维度合并,shape由[2,24,24]转化为[2,576]。 

class Transformer(nn.Module):

    def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False,
                 return_intermediate_dec=False):
        super().__init__()

        encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
        encoder_norm = nn.LayerNorm(d_model) if normalize_before else None
        self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)

        decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
        decoder_norm = nn.LayerNorm(d_model)
        self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm,
                                          return_intermediate=return_intermediate_dec)

        self._reset_parameters()

        self.d_model = d_model
        self.nhead = nhead

    def _reset_parameters(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, src, mask, query_embed, pos_embed):
        # flatten NxCxHxW to HWxNxC
        bs, c, h, w = src.shape
        # 将降维后的src转换维度[NxCxHxW]->[HWxNxC],即[2,256,24,24]->[576,2,256]
        src = src.flatten(2).permute(2, 0, 1)
        # 将位置编码转换维度[NxCxHxW]->[HWxNxC],即[2,256,24,24]->[576,2,256]
        pos_embed = pos_embed.flatten(2).permute(2, 0, 1)
        # 词嵌入向量由[num_embeddings, embedding_dim]->[num_embeddings, N, embedding_dim]
        # 即[100,256]->[100,2,256]
        query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1)
        # 将mask[2,24,24]->[2,576]
        mask = mask.flatten(1)

        tgt = torch.zeros_like(query_embed)
        memory = self.encoder(src, src_key_padding_mask=mask, pos=pos_embed)
        hs = self.decoder(tgt, memory, memory_key_padding_mask=mask,
                          pos=pos_embed, query_pos=query_embed)
        return hs.transpose(1, 2), memory.permute(1, 2, 0).view(bs, c, h, w)

        到这里大致把输入transformer之前的数据处理过程理清,接下来是transformer部分,不了解transformer的可以看一下我的另一篇文章transformer学习笔记。与NLP中的transformer有一定的区别,具体可见DETR代码学习笔记(二)

来源:athrunsunny

物联沃分享整理
物联沃-IOTWORD物联网 » DETR代码学习笔记(一)

发表评论