Source code for easycv.datasets.detection.data_sources.coco_panoptic

import os
from collections import defaultdict

import mmcv
import numpy as np
from xtcocotools.coco import COCO

from easycv.datasets.detection.data_sources import DetSourceCoco
from easycv.datasets.registry import DATASOURCES, PIPELINES
from easycv.datasets.shared.pipelines import Compose
from easycv.framework.errors import RuntimeError, TypeError
from easycv.utils.registry import build_from_cfg

try:
    import panopticapi
    from panopticapi.evaluation import VOID
    from panopticapi.utils import id2rgb
except ImportError:
    panopticapi = None
    id2rgb = None
    VOID = None

INSTANCE_OFFSET = 1000
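# Panoptic ids handled by `_pan2json` below are assumed to follow the
# `semantic_label + instance_id * INSTANCE_OFFSET` encoding, so the semantic
# label is recovered with `pan_label % INSTANCE_OFFSET`. For example, the 3rd
# instance of class 17 would be encoded as 17 + 3 * 1000 = 3017.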


class COCOPanoptic(COCO):
    """This wrapper is for loading the panoptic style annotation file.

    The format is shown in the CocoPanopticDataset class.

    Args:
        annotation_file (str): Path of annotation file.
    """

    def __init__(self, annotation_file=None):
        if panopticapi is None:
            raise RuntimeError(
                'panopticapi is not installed, please install it by: '
                'pip install git+https://github.com/cocodataset/'
                'panopticapi.git.')

        super(COCOPanoptic, self).__init__(annotation_file)

    def createIndex(self):
        # create index
        print('creating index...')
        # anns stores 'segment_id -> annotation'
        anns, cats, imgs = {}, {}, {}
        img_to_anns, cat_to_imgs = defaultdict(list), defaultdict(list)
        if 'annotations' in self.dataset:
            for ann, img_info in zip(self.dataset['annotations'],
                                     self.dataset['images']):
                img_info['segm_file'] = ann['file_name']
                for seg_ann in ann['segments_info']:
                    # to match with instance.json
                    seg_ann['image_id'] = ann['image_id']
                    seg_ann['height'] = img_info['height']
                    seg_ann['width'] = img_info['width']
                    img_to_anns[ann['image_id']].append(seg_ann)
                    # segment_id is not unique in the COCO dataset,
                    # so each id maps to a list of annotations
                    if seg_ann['id'] in anns.keys():
                        anns[seg_ann['id']].append(seg_ann)
                    else:
                        anns[seg_ann['id']] = [seg_ann]

        if 'images' in self.dataset:
            for img in self.dataset['images']:
                imgs[img['id']] = img

        if 'categories' in self.dataset:
            for cat in self.dataset['categories']:
                cats[cat['id']] = cat

        if 'annotations' in self.dataset and 'categories' in self.dataset:
            for ann in self.dataset['annotations']:
                for seg_ann in ann['segments_info']:
                    cat_to_imgs[seg_ann['category_id']].append(ann['image_id'])

        print('index created!')

        self.anns = anns
        self.imgToAnns = img_to_anns
        self.catToImgs = cat_to_imgs
        self.imgs = imgs
        self.cats = cats

    def load_anns(self, ids=[]):
        """Load anns with the specified ids.

        ``self.anns`` maps each annotation id to a list of annotations
        rather than to a single annotation.

        Args:
            ids (int array): integer ids specifying anns

        Returns:
            anns (object array): loaded ann objects
        """
        anns = []

        if hasattr(ids, '__iter__') and hasattr(ids, '__len__'):
            # self.anns maps each id to a list of annotations,
            # so concatenate the lists for all requested ids
            for id in ids:
                anns += self.anns[id]
            return anns
        elif type(ids) == int:
            return self.anns[ids]


@DATASOURCES.register_module
class DetSourceCocoPanoptic(DetSourceCoco):
    """COCO panoptic data source."""

    def __init__(self,
                 ann_file,
                 pan_ann_file,
                 img_prefix,
                 seg_prefix,
                 pipeline,
                 outfile_prefix='test/test_pan',
                 test_mode=False,
                 filter_empty_gt=False,
                 thing_classes=None,
                 stuff_classes=None,
                 iscrowd=False):
        """
        Args:
            ann_file (str): Path of coco detection annotation file.
            pan_ann_file (str): Path of coco panoptic annotation file.
            img_prefix (str): Path of image files.
            seg_prefix (str): Path of semantic image files.
            pipeline (list[dict]): List of data augmentation operations.
            outfile_prefix (str, optional): The filename prefix of the output
                files. If the prefix is "somepath/xxx", the json files will be
                named "somepath/xxx.panoptic.json", "somepath/xxx.bbox.json",
                "somepath/xxx.segm.json".
            test_mode (bool, optional): If set True, `self._filter_imgs` will
                not work.
            filter_empty_gt (bool, optional): If set True, images without
                bounding boxes of the dataset's classes will be filtered out.
                This option only works when `test_mode=False`, i.e., we never
                filter images during tests.
            thing_classes (list[str], optional): List of thing classes.
                Defaults to None.
            stuff_classes (list[str], optional): List of stuff classes.
                Defaults to None.
            iscrowd (bool, optional): Set to False for training and True for
                validation. Defaults to False.
        """
        super().__init__(
            ann_file,
            img_prefix,
            pipeline,
            test_mode=test_mode,
            filter_empty_gt=filter_empty_gt,
            classes=thing_classes,
            iscrowd=iscrowd)

        self.outfile_prefix = outfile_prefix
        self.pan_ann_file = pan_ann_file
        self.seg_prefix = seg_prefix
        self.thing_classes = thing_classes
        self.stuff_classes = stuff_classes

        # load annotations (and proposals)
        self.data_infos_pan = self.load_annotations_pan(self.pan_ann_file)

        if not test_mode:
            valid_inds = self._filter_imgs_pan()
            self.data_infos_pan = [self.data_infos_pan[i] for i in valid_inds]
            self._set_group_flag_pan()

        transforms = []
        for transform in pipeline:
            if isinstance(transform, dict):
                transform = build_from_cfg(transform, PIPELINES)
                transforms.append(transform)
            elif callable(transform):
                transforms.append(transform)
            else:
                raise TypeError('transform must be callable or a dict')
        self.pipeline = Compose(transforms)

    def load_annotations_pan(self, ann_file):
        """Load annotation from COCO Panoptic style annotation file.

        Args:
            ann_file (str): Path of annotation file.

        Returns:
            list[dict]: Annotation info from COCO api.
        """
        self.coco_pan = COCOPanoptic(ann_file)
        self.cat_ids_pan = self.coco_pan.getCatIds()
        self.cat2label_pan = {
            cat_id: i
            for i, cat_id in enumerate(self.cat_ids_pan)
        }
        self.categories_pan = self.coco_pan.cats
        self.img_ids_pan = self.coco_pan.getImgIds()

        data_infos = []
        for i in self.img_ids_pan:
            info = self.coco_pan.loadImgs([i])[0]
            info['filename'] = info['file_name']
            info['segm_file'] = info['filename'].replace('jpg', 'png')
            data_infos.append(info)
        return data_infos

    def get_ann_info_pan(self, idx):
        """Get COCO annotation by index.

        Args:
            idx (int): Index of data.

        Returns:
            dict: Annotation info of specified index.
        """
        img_id = self.data_infos_pan[idx]['id']
        ann_ids = self.coco_pan.getAnnIds(imgIds=[img_id])
        ann_info = self.coco_pan.load_anns(ann_ids)
        # filter out annotations that share a segment_id
        # but belong to another image
        ann_info = [i for i in ann_info if i['image_id'] == img_id]
        return self._parse_ann_info_pan(self.data_infos_pan[idx], ann_info)

    def _parse_ann_info_pan(self, img_info, ann_info):
        """Parse annotations and load panoptic ground truths.

        Args:
            img_info (dict): Image info of an image.
            ann_info (list[dict]): Annotation info of an image.

        Returns:
            dict: A dict containing the following keys: bboxes, bboxes_ignore,
                labels, masks, seg_map.
        """
        gt_bboxes = []
        gt_labels = []
        gt_bboxes_ignore = []
        gt_mask_infos = []

        for i, ann in enumerate(ann_info):
            x1, y1, w, h = ann['bbox']
            if ann['area'] <= 0 or w < 1 or h < 1:
                continue
            bbox = [x1, y1, x1 + w, y1 + h]

            category_id = ann['category_id']
            contiguous_cat_id = self.cat2label_pan[category_id]

            is_thing = self.coco_pan.loadCats(ids=category_id)[0]['isthing']
            if is_thing:
                is_crowd = ann.get('iscrowd', False)
                if not is_crowd:
                    gt_bboxes.append(bbox)
                    gt_labels.append(contiguous_cat_id)
                else:
                    gt_bboxes_ignore.append(bbox)
                    is_thing = False

            mask_info = {
                'id': ann['id'],
                'category': contiguous_cat_id,
                'is_thing': is_thing
            }
            gt_mask_infos.append(mask_info)

        if gt_bboxes:
            gt_bboxes = np.array(gt_bboxes, dtype=np.float32)
            gt_labels = np.array(gt_labels, dtype=np.int64)
        else:
            gt_bboxes = np.zeros((0, 4), dtype=np.float32)
            gt_labels = np.array([], dtype=np.int64)

        if gt_bboxes_ignore:
            gt_bboxes_ignore = np.array(gt_bboxes_ignore, dtype=np.float32)
        else:
            gt_bboxes_ignore = np.zeros((0, 4), dtype=np.float32)

        ann = dict(
            bboxes=gt_bboxes,
            labels=gt_labels,
            bboxes_ignore=gt_bboxes_ignore,
            masks=gt_mask_infos,
            seg_map=img_info['segm_file'])

        return ann

    def _filter_imgs_pan(self, min_size=32):
        """Filter images too small or without ground truths."""
        ids_with_ann = []
        # check whether images have legal thing annotations.
        for lists in self.coco_pan.anns.values():
            for item in lists:
                category_id = item['category_id']
                is_thing = self.coco_pan.loadCats(
                    ids=category_id)[0]['isthing']
                if not is_thing:
                    continue
                ids_with_ann.append(item['image_id'])
        ids_with_ann = set(ids_with_ann)

        valid_inds = []
        valid_img_ids = []
        for i, img_info in enumerate(self.data_infos_pan):
            img_id = self.img_ids_pan[i]
            if self.filter_empty_gt and img_id not in ids_with_ann:
                continue
            if min(img_info['width'], img_info['height']) >= min_size:
                valid_inds.append(i)
                valid_img_ids.append(img_id)
        self.img_ids_pan = valid_img_ids
        return valid_inds

    def pre_pipeline(self, results):
        """Prepare results dict for pipeline."""
        results['img_prefix'] = self.img_prefix
        results['seg_prefix'] = self.seg_prefix
        results['bbox_fields'] = []
        results['mask_fields'] = []
        results['seg_fields'] = []

    def prepare_train_img(self, idx):
        """Get training data and annotations after pipeline.

        Args:
            idx (int): Index of data.

        Returns:
            dict: Training data and annotation after pipeline with new keys
                introduced by pipeline.
        """
        img_info = self.data_infos_pan[idx]
        ann_info = self.get_ann_info_pan(idx)
        results = dict(img_info=img_info, ann_info=ann_info)
        self.pre_pipeline(results)
        return self.pipeline(results)

    def _set_group_flag_pan(self):
        """Set flag according to image aspect ratio.

        Images with aspect ratio greater than 1 will be set as group 1,
        otherwise group 0.
        """
        self.flag = np.zeros(len(self), dtype=np.uint8)
        for i in range(len(self)):
            img_info = self.data_infos_pan[i]
            if img_info['width'] / img_info['height'] > 1:
                self.flag[i] = 1

    def _pan2json(self, results):
        """Convert panoptic results to COCO panoptic json style."""
        label2cat = dict((v, k) for (k, v) in self.cat2label_pan.items())
        pred_annotations = []
        outdir = os.path.join(
            os.path.dirname(self.outfile_prefix), 'panoptic')

        for idx in range(len(self)):
            img_id = self.img_ids_pan[idx]
            segm_file = self.data_infos_pan[idx]['segm_file']
            pan = results[idx]

            pan_labels = np.unique(pan)
            segm_info = []
            for pan_label in pan_labels:
                sem_label = pan_label % INSTANCE_OFFSET
                # We reserve the length of self.CLASSES for VOID label
                if sem_label == len(self.thing_classes + self.stuff_classes):
                    continue
                # convert sem_label to json label
                cat_id = label2cat[sem_label]
                is_thing = self.categories_pan[cat_id]['isthing']
                mask = pan == pan_label
                area = mask.sum()
                segm_info.append({
                    'id': int(pan_label),
                    'category_id': cat_id,
                    'isthing': is_thing,
                    'area': int(area)
                })
            # evaluation script uses 0 for VOID label.
            pan[pan % INSTANCE_OFFSET == len(self.thing_classes +
                                             self.stuff_classes)] = VOID
            pan = id2rgb(pan).astype(np.uint8)
            mmcv.imwrite(pan[:, :, ::-1], os.path.join(outdir, segm_file))

            record = {
                'image_id': img_id,
                'segments_info': segm_info,
                'file_name': segm_file
            }
            pred_annotations.append(record)
        pan_json_results = dict(annotations=pred_annotations)
        return pan_json_results

    def results2json(self, results):
        """Dump the results to a COCO style json file.

        There are 4 types of results: proposals, bbox predictions, mask
        predictions, panoptic segmentation predictions, and they have
        different data types. This method will automatically recognize
        the type, and dump them to json files.

        .. code-block:: none

            [
                {
                    'pan_results': np.array, # shape (h, w)
                    # ins_results which includes bboxes and RLE encoded masks
                    # is optional.
                    'ins_results': (list[np.array], list[list[str]])
                },
                ...
            ]

        Args:
            results (list[dict]): Testing results of the dataset.

        Returns:
            dict[str: str]: Possible keys are "panoptic", "bbox", "segm",
                "proposal", and values are corresponding filenames.
        """
        result_files = dict()

        # panoptic segmentation results
        if 'pan_results' in results:
            pan_results = results['pan_results']
            pan_json_results = self._pan2json(pan_results)
            result_files['panoptic'] = f'{self.outfile_prefix}.panoptic.json'
            mmcv.dump(pan_json_results, result_files['panoptic'])

        return result_files

    def get_gt_json(self, result_files):
        """Get the inputs for COCO panoptic evaluation.

        Args:
            result_files (dict): Paths of the prediction result files.

        Returns:
            gt_json (list[dict]): Ground truth annotations.
            gt_folder (str): Path of the ground truth panoptic png files.
            pred_json (dict): Predicted annotations indexed by image id.
            pred_folder (str): Path of the predicted panoptic png files.
            categories (dict): Panoptic categories.
        """
        imgs = self.coco_pan.imgs
        gt_json = self.coco_pan.imgToAnns
        gt_json = [{
            'image_id': k,
            'segments_info': v,
            'file_name': imgs[k]['segm_file']
        } for k, v in gt_json.items()]
        pred_json = mmcv.load(result_files['panoptic'])
        pred_json = dict(
            (el['image_id'], el) for el in pred_json['annotations'])
        gt_folder = self.seg_prefix
        pred_folder = os.path.join(
            os.path.dirname(self.outfile_prefix), 'panoptic')
        categories = self.categories_pan

        return gt_json, gt_folder, pred_json, pred_folder, categories
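
# A minimal construction sketch for DetSourceCocoPanoptic. All paths, class
# names and the pipeline entries below are hypothetical placeholders, not
# values required by this class:
#
#     data_source = DetSourceCocoPanoptic(
#         ann_file='annotations/instances_val2017.json',
#         pan_ann_file='annotations/panoptic_val2017.json',
#         img_prefix='images/val2017/',
#         seg_prefix='annotations/panoptic_val2017/',
#         pipeline=[dict(type='LoadImageFromFile'),
#                   dict(type='LoadPanopticAnnotations')],
#         outfile_prefix='work_dir/panoptic_eval',
#         thing_classes=['person', 'car'],
#         stuff_classes=['sky', 'grass'])
#     sample = data_source.prepare_train_img(0)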