基于PaddleOCR的DBNet多分类文本检测网络

目录

目的

模型网络结构对比

代码实现

1、数据集格式

2、配置文件调整

3、数据预处理

4、模型代码调整 

5、添加多分类loss

6、修改db_postprocess.py

7、修改train.py、eval.py、infer_det.py和export_model.py

完毕!!!

目的

之前一直思考如果DBnet文本检测网络能够加入多分类的话,就可以实现模型很小又能够区分类别的功能,在端侧部署的话就能达到非常高的精度和效率。在参考了大佬的pytorch版的DBnet多分类功能,在此实现Paddle版的DBnet多分类文本检测网络,注意此方式不适合多个分类有重叠的情况。

模型网络结构对比

修改前 vs 修改后:从图明显发现多出来一个分支用来判定分类的

       

代码实现

经过测试以下方式在官方release/2.5分支中同样好使,本git中代码版本较低可做参考。

git代码地址:GitHub – yangy996/PaddleOCR

1、数据集格式

新增label_list.txt文件

调整数据集中的 “transcription”对应的值,为上图中的label_name

2、配置文件调整

Global:
  ...
  label_list: "../../2.4/train_data/sfz/label_list.txt" #新增一个分类文件
  num_classes: 8    # 新增一个分类数量

...

Train:
  dataset:
    ...
    transforms:
      ...
      - KeepKeys:
          keep_keys: [ 'image', 'threshold_map', 'threshold_mask', 'shrink_map', 'shrink_mask', 'class_mask' ] # 新增一个class_mask
    ...

...

3、数据预处理

将不同分类按照[1,2,3,4…]的样子进行填充,有三个地方需要调整

label_ops.py

class DetLabelEncode(object):
    # def __init__(self, **kwargs):
    #     pass
    def __init__(self, label_list, num_classes=1, **kwargs):
        self.num_classes = num_classes
        self.label_list = []
        if label_list is not None:
            if isinstance(label_list, str):
                with open(label_list, "r+", encoding="utf-8") as f:
                    for line in f.readlines():
                        self.label_list.append(line.replace("\n", ""))
            else:
                self.label_list = label_list

        if num_classes != len(self.label_list):
            assert "label_list长度与num_classes长度不符合"

    def __call__(self, data):
        label = data['label']
        label = json.loads(label)
        nBox = len(label)
        boxes, txts, txt_tags = [], [], []
        classes = []
        for bno in range(0, nBox):
            box = label[bno]['points']
            txt = label[bno]['transcription']
            boxes.append(box)
            txts.append(txt)
            if txt in ['*', '###']:
                txt_tags.append(True)
                if self.num_classes > 1:
                    classes.append(-2)
            else:
                txt_tags.append(False)
                if self.num_classes > 1:
                    classes.append(int(self.label_list.index(txt)))
        if len(boxes) == 0:
            return None
        boxes = self.expand_points_num(boxes)
        boxes = np.array(boxes, dtype=np.float32)
        txt_tags = np.array(txt_tags, dtype=np.bool)
        # classes = np.array(classes, dtype=np.int)
        classes = classes

        data['polys'] = boxes
        data['texts'] = txts
        data['ignore_tags'] = txt_tags
        if self.num_classes > 1:
            data['classes'] = classes
        return data
make_shrink_map.py

 random_crop_data.py

4、模型代码调整 

添加新分支,只需要调整head模块就可以了,det_db_head.py代码如下

class Head(nn.Layer):
    def __init__(self, in_channels, name_list, num_classes=1):
        super(Head, self).__init__()
        self.num_classes = num_classes

        ...

        self.conv3 = nn.Conv2DTranspose(
            in_channels=in_channels // 4,
            out_channels=num_classes,
            kernel_size=2,
            stride=2,
            weight_attr=ParamAttr(
                initializer=paddle.nn.initializer.KaimingUniform()),
            bias_attr=get_bias_attr(in_channels // 4), )

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv_bn1(x)
        x = self.conv2(x)
        x = self.conv_bn2(x)
        x = self.conv3(x)
        if self.num_classes == 1:
            x = F.sigmoid(x)
        return x


class DBHead(nn.Layer):
    def __init__(self, in_channels, num_classes=1, k=50, **kwargs):
        super(DBHead, self).__init__()
        self.k = k
        self.num_classes = num_classes

        ...

        if num_classes != 1:
            self.classes = Head(in_channels, binarize_name_list, num_classes=num_classes)
        else:
            self.classes = None

    def step_function(self, x, y):
        return paddle.reciprocal(1 + paddle.exp(-self.k * (x - y)))

    def forward(self, x, targets=None):
        shrink_maps = self.binarize(x)
        if not self.training:
            if self.num_classes == 1:
                return {'maps': shrink_maps}
            else:
                classes = paddle.argmax(self.classes(x), axis=1, keepdim=True, dtype='int32')
                return {'maps': shrink_maps, "classes": classes}

        threshold_maps = self.thresh(x)
        binary_maps = self.step_function(shrink_maps, threshold_maps)
        y = paddle.concat([shrink_maps, threshold_maps, binary_maps], axis=1)
        if self.num_classes == 1:
            return {'maps': y}
        else:
            return {'maps': y, "classes": self.classes(x)}

5、添加多分类loss

参考PaddleSeg代码,新增了一个CrossEntropyLoss方法

class CrossEntropyLoss(nn.Layer):

    def __init__(self,
                 weight=None,
                 ignore_index=255,
                 top_k_percent_pixels=1.0,
                 data_format='NCHW'):
        super(CrossEntropyLoss, self).__init__()
        self.ignore_index = ignore_index
        self.top_k_percent_pixels = top_k_percent_pixels
        self.EPS = 1e-8
        self.data_format = data_format
        if weight is not None:
            self.weight = paddle.to_tensor(weight, dtype='float32')
        else:
            self.weight = None

    def forward(self, logit, label, semantic_weights=None):
        channel_axis = 1 if self.data_format == 'NCHW' else -1
        if self.weight is not None and logit.shape[channel_axis] != len(
                self.weight):
            raise ValueError(
                'The number of weights = {} must be the same as the number of classes = {}.'
                    .format(len(self.weight), logit.shape[channel_axis]))

        if channel_axis == 1:
            logit = paddle.transpose(logit, [0, 2, 3, 1])
        label = label.astype('int64')
        # In F.cross_entropy, the ignore_index is invalid, which needs to be fixed.
        # When there is 255 in the label and paddle version <= 2.1.3, the cross_entropy OP will report an error, which is fixed in paddle develop version.
        loss = F.cross_entropy(
            logit,
            label,
            ignore_index=self.ignore_index,
            reduction='none',
            weight=self.weight)

        return self._post_process_loss(logit, label, semantic_weights, loss)

    def _post_process_loss(self, logit, label, semantic_weights, loss):
        mask = label != self.ignore_index
        mask = paddle.cast(mask, 'float32')
        label.stop_gradient = True
        mask.stop_gradient = True

        if loss.ndim > mask.ndim:
            loss = paddle.squeeze(loss, axis=-1)
        loss = loss * mask
        if semantic_weights is not None:
            loss = loss * semantic_weights

        if self.weight is not None:
            _one_hot = F.one_hot(label, logit.shape[-1])
            coef = paddle.sum(_one_hot * self.weight, axis=-1)
        else:
            coef = paddle.ones_like(label)

        if self.top_k_percent_pixels == 1.0:
            avg_loss = paddle.mean(loss) / (paddle.mean(mask * coef) + self.EPS)
        else:
            loss = loss.reshape((-1,))
            top_k_pixels = int(self.top_k_percent_pixels * loss.numel())
            loss, indices = paddle.topk(loss, top_k_pixels)
            coef = coef.reshape((-1,))
            coef = paddle.gather(coef, indices)
            coef.stop_gradient = True
            coef = coef.astype('float32')
            avg_loss = loss.mean() / (paddle.mean(coef) + self.EPS)

        return avg_loss


class DBLoss(nn.Layer):
    """
    Differentiable Binarization (DB) Loss Function
    args:
        param (dict): the super paramter for DB Loss
    """

    def __init__(self,
                 balance_loss=True,
                 main_loss_type='DiceLoss',
                 alpha=5,
                 beta=10,
                 ohem_ratio=3,
                 eps=1e-6,
                 num_classes=1,
                 **kwargs):
        super(DBLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta
        self.num_classes = num_classes
        self.dice_loss = DiceLoss(eps=eps)
        self.l1_loss = MaskL1Loss(eps=eps)
        self.bce_loss = BalanceLoss(
            balance_loss=balance_loss,
            main_loss_type=main_loss_type,
            negative_ratio=ohem_ratio)

        self.loss_func = CrossEntropyLoss()

    def forward(self, predicts, labels):
        predict_maps = predicts['maps']
        if self.num_classes > 1:
            predict_classes = predicts['classes']
            label_threshold_map, label_threshold_mask, label_shrink_map, label_shrink_mask, class_mask = labels[1:]
        else:
            label_threshold_map, label_threshold_mask, label_shrink_map, label_shrink_mask = labels[1:]

        shrink_maps = predict_maps[:, 0, :, :]
        threshold_maps = predict_maps[:, 1, :, :]
        binary_maps = predict_maps[:, 2, :, :]

        loss_shrink_maps = self.bce_loss(shrink_maps, label_shrink_map,
                                         label_shrink_mask)
        loss_threshold_maps = self.l1_loss(threshold_maps, label_threshold_map,
                                           label_threshold_mask)
        loss_binary_maps = self.dice_loss(binary_maps, label_shrink_map,
                                          label_shrink_mask)
        loss_shrink_maps = self.alpha * loss_shrink_maps
        loss_threshold_maps = self.beta * loss_threshold_maps

        # 处理
        if self.num_classes > 1:
            loss_classes = self.loss_func(predict_classes, class_mask)

            loss_all = loss_shrink_maps + loss_threshold_maps + loss_binary_maps + loss_classes

            losses = {'loss': loss_all,
                      "loss_shrink_maps": loss_shrink_maps,
                      "loss_threshold_maps": loss_threshold_maps,
                      "loss_binary_maps": loss_binary_maps,
                      "loss_classes": loss_classes}
        else:
            loss_all = loss_shrink_maps + loss_threshold_maps + loss_binary_maps

            losses = {'loss': loss_all,
                      "loss_shrink_maps": loss_shrink_maps,
                      "loss_threshold_maps": loss_threshold_maps,
                      "loss_binary_maps": loss_binary_maps}
        return losses

6、修改db_postprocess.py

class DBPostProcess(object):
    """
    The post process for Differentiable Binarization (DB).
    """

    def __init__(self,
                 thresh=0.3,
                 box_thresh=0.7,
                 max_candidates=1000,
                 unclip_ratio=2.0,
                 use_dilation=False,
                 score_mode="fast",
                 **kwargs):
        self.thresh = thresh
        self.box_thresh = box_thresh
        self.max_candidates = max_candidates
        self.unclip_ratio = unclip_ratio
        self.min_size = 3
        self.score_mode = score_mode
        assert score_mode in [
            "slow", "fast"
        ], "Score mode must be in [slow, fast] but got: {}".format(score_mode)

        self.dilation_kernel = None if not use_dilation else np.array(
            [[1, 1], [1, 1]])

    def boxes_from_bitmap(self, pred, _bitmap, classes, dest_width, dest_height):
        '''
        _bitmap: single map with shape (1, H, W),
                whose values are binarized as {0, 1}
        '''

        bitmap = _bitmap
        height, width = bitmap.shape

        outs = cv2.findContours((bitmap * 255).astype(np.uint8), cv2.RETR_LIST,
                                cv2.CHAIN_APPROX_SIMPLE)
        if len(outs) == 3:
            img, contours, _ = outs[0], outs[1], outs[2]
        elif len(outs) == 2:
            contours, _ = outs[0], outs[1]

        num_contours = min(len(contours), self.max_candidates)

        boxes = []
        scores = []
        class_indexes = []
        class_scores = []
        for index in range(num_contours):
            contour = contours[index]
            points, sside = self.get_mini_boxes(contour)
            if sside < self.min_size:
                continue
            points = np.array(points)
            if self.score_mode == "fast":
                score, class_index, class_score = self.box_score_fast(pred, points.reshape(-1, 2), classes)
            else:
                score, class_index, class_score = self.box_score_slow(pred, contour, classes)
            if self.box_thresh > score:
                continue

            box = self.unclip(points).reshape(-1, 1, 2)
            box, sside = self.get_mini_boxes(box)
            if sside < self.min_size + 2:
                continue
            box = np.array(box)

            box[:, 0] = np.clip(
                np.round(box[:, 0] / width * dest_width), 0, dest_width)
            box[:, 1] = np.clip(
                np.round(box[:, 1] / height * dest_height), 0, dest_height)

            boxes.append(box.astype(np.int16))
            scores.append(score)

            class_indexes.append(class_index)
            class_scores.append(class_score)

        if classes is None:
            return np.array(boxes, dtype=np.int16), scores
        else:
            return np.array(boxes, dtype=np.int16), scores, class_indexes, class_scores

    def unclip(self, box):
        unclip_ratio = self.unclip_ratio
        poly = Polygon(box)
        distance = poly.area * unclip_ratio / poly.length
        offset = pyclipper.PyclipperOffset()
        offset.AddPath(box, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
        expanded = np.array(offset.Execute(distance))
        return expanded

    def get_mini_boxes(self, contour):
        bounding_box = cv2.minAreaRect(contour)
        points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])

        index_1, index_2, index_3, index_4 = 0, 1, 2, 3
        if points[1][1] > points[0][1]:
            index_1 = 0
            index_4 = 1
        else:
            index_1 = 1
            index_4 = 0
        if points[3][1] > points[2][1]:
            index_2 = 2
            index_3 = 3
        else:
            index_2 = 3
            index_3 = 2

        box = [
            points[index_1], points[index_2], points[index_3], points[index_4]
        ]
        return box, min(bounding_box[1])

    def box_score_fast(self, bitmap, _box, classes):
        '''
        box_score_fast: use bbox mean score as the mean score
        '''
        h, w = bitmap.shape[:2]
        box = _box.copy()
        xmin = np.clip(np.floor(box[:, 0].min()).astype(np.int), 0, w - 1)
        xmax = np.clip(np.ceil(box[:, 0].max()).astype(np.int), 0, w - 1)
        ymin = np.clip(np.floor(box[:, 1].min()).astype(np.int), 0, h - 1)
        ymax = np.clip(np.ceil(box[:, 1].max()).astype(np.int), 0, h - 1)

        mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
        box[:, 0] = box[:, 0] - xmin
        box[:, 1] = box[:, 1] - ymin
        cv2.fillPoly(mask, box.reshape(1, -1, 2).astype(np.int32), 1)

        if classes is None:
            return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0], None, None
        else:
            k = 999
            class_mask = np.full((ymax - ymin + 1, xmax - xmin + 1), k, dtype=np.int32)

            cv2.fillPoly(class_mask, box.reshape(1, -1, 2).astype(np.int32), 0)
            classes = classes[ymin:ymax + 1, xmin:xmax + 1]

            new_classes = classes + class_mask

            # 拉平
            a = new_classes.reshape(-1)
            b = np.where(a >= k)
            classes = np.delete(a, b[0].tolist())

            class_index = np.argmax(np.bincount(classes))
            class_score = np.sum(classes == class_index) / len(classes)

            return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0], class_index, class_score

    def box_score_slow(self, bitmap, contour, classes):
        '''
        box_score_slow: use polyon mean score as the mean score
        '''
        h, w = bitmap.shape[:2]
        contour = contour.copy()
        contour = np.reshape(contour, (-1, 2))

        xmin = np.clip(np.min(contour[:, 0]), 0, w - 1)
        xmax = np.clip(np.max(contour[:, 0]), 0, w - 1)
        ymin = np.clip(np.min(contour[:, 1]), 0, h - 1)
        ymax = np.clip(np.max(contour[:, 1]), 0, h - 1)

        mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)

        contour[:, 0] = contour[:, 0] - xmin
        contour[:, 1] = contour[:, 1] - ymin

        cv2.fillPoly(mask, contour.reshape(1, -1, 2).astype(np.int32), 1)

        if classes is None:
            return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0], None, None
        else:
            k = 999
            class_mask = np.full((ymax - ymin + 1, xmax - xmin + 1), k, dtype=np.int32)

            cv2.fillPoly(class_mask, contour.reshape(1, -1, 2).astype(np.int32), 0)
            classes = classes[ymin:ymax + 1, xmin:xmax + 1]

            new_classes = classes + class_mask

            # 拉平
            a = new_classes.reshape(-1)
            b = np.where(a >= k)
            classes = np.delete(a, b[0].tolist())

            class_index = np.argmax(np.bincount(classes))
            class_score = np.sum(classes == class_index) / len(classes)

            return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0], class_index, class_score

    def __call__(self, outs_dict, shape_list):
        pred = outs_dict['maps']
        if isinstance(pred, paddle.Tensor):
            pred = pred.numpy()
        pred = pred[:, 0, :, :]
        segmentation = pred > self.thresh

        if "classes" in outs_dict:
            classes = outs_dict['classes']

            if isinstance(classes, paddle.Tensor):
                classes = classes.numpy()

        else:
            classes = None

        boxes_batch = []
        for batch_index in range(pred.shape[0]):
            src_h, src_w, ratio_h, ratio_w = shape_list[batch_index]
            if self.dilation_kernel is not None:
                mask = cv2.dilate(
                    np.array(segmentation[batch_index]).astype(np.uint8),
                    self.dilation_kernel)
            else:
                mask = segmentation[batch_index]

            if classes is None:
                boxes, scores = self.boxes_from_bitmap(pred[batch_index], mask, None,
                                                       src_w, src_h)
                boxes_batch.append({'points': boxes})
            else:
                boxes, scores, class_indexes, class_scores = self.boxes_from_bitmap(pred[batch_index], mask,
                                                                                      classes[batch_index],
                                                                                      src_w, src_h)
                boxes_batch.append({'points': boxes, "classes": class_indexes, "class_scores": class_scores})

        return boxes_batch

7、修改train.py、eval.py、infer_det.py和export_model.py

添加这两行代码

    if "num_classes" in global_config:
        config['Architecture']["Head"]['num_classes'] = global_config["num_classes"]
        config['Loss']['num_classes'] = global_config["num_classes"]

完毕!!!

到此,整个网络结构及核心代码就完成了!接下来我们看看实际效果如何。

后面将写几篇文章来讲解DBNet多分类的应用,敬请关注!

来源:番茄小能手

物联沃分享整理
物联沃-IOTWORD物联网 » 基于PaddleOCR的DBNet多分类文本检测网络

发表评论