Source code for easycv.predictors.feature_extractor
# Copyright (c) Alibaba, Inc. and its affiliates.
import math
from glob import glob
import numpy as np
import torch
from PIL import Image
from easycv.framework.errors import ValueError
from easycv.thirdparty.face_align import glint360k_align
from easycv.thirdparty.mtcnn import FaceDetector
from .base import Predictor
from .builder import PREDICTORS
try:
    from easy_vision.python.inference.predictor import PredictorInterface
except ImportError:
    from .interface import PredictorInterface
@PREDICTORS.register_module()
class TorchFeatureExtractor(PredictorInterface):
    def __init__(self, model_path, model_config=None):
        """
        Initialize the model.

        Args:
            model_path: model file path
            model_config: config string used to initialize the model, in json format
        """
self.predictor = Predictor(model_path)
self.output_name = 'feature'
    def get_output_type(self):
        """
        In this function the user should return a type dict, which indicates
        which type of data the output of the predictor should be converted to:

        * type json, data will be serialized to a json str
        * type image, data will be converted to an encoded image binary and written to an oss file,
          whose name is output_dir/${key}/${input_filename}_${idx}.jpg, where input_filename
          is the base filename extracted from the url and key corresponds to the key in the dict of output_type;
          if the data indexed by key is a list, idx is the index of the element in the list, otherwise ${idx} will be empty
        * type video, data will be converted to an encoded video binary and written to an oss file

        ::

            return {
                'image': 'image',
                'feature': 'json'
            }

        indicating that the image data in the output dict will be saved to an image
        file and the feature in the output dict will be converted to json.
        """
        return {}
    def predict(self, input_data_list, batch_size=-1):
        """
        Run prediction over a number of samples with the given batch_size.

        Args:
            input_data_list: a list of numpy arrays, each array is a sample to be predicted
            batch_size: batch size passed by the caller; you can also ignore this param and
                use a fixed number if you do not want to adjust the batch_size at runtime
        Return:
            result: a list of dicts, each dict is the prediction result of one sample,
                e.g. {"output1": value1, "output2": value2}; the value type can be
                python int, str, float, or numpy array
        """
        num_image = len(input_data_list)
        assert num_image > 0, 'input images should not be an empty list'
        if batch_size > 0:
            num_batches = int(math.ceil(float(num_image) / batch_size))
        else:
            num_batches = 1
            batch_size = num_image
        image_list = input_data_list
outputs_list = []
for batch_idx in range(num_batches):
batch_image_list = image_list[batch_idx * batch_size:min(
(batch_idx + 1) * batch_size, len(image_list))]
image_tensor_list = self.predictor.preprocess(batch_image_list)
input_data = self.batch(image_tensor_list)
outputs = self.predictor.predict_batch(
input_data, mode='extract')['neck'].data.cpu().numpy()
for idx in range(len(image_tensor_list)):
single_result = {}
out = np.squeeze(outputs[idx])
single_result[self.output_name] = out
outputs_list.append(single_result)
return outputs_list
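
# A minimal usage sketch for TorchFeatureExtractor. The checkpoint path and
# image file below are illustrative assumptions, not part of this module:
#
#     import cv2
#     extractor = TorchFeatureExtractor('feature_model.pth')
#     img = cv2.imread('test.jpg')  # HWC uint8 numpy array
#     results = extractor.predict([img], batch_size=1)
#     embedding = results[0]['feature']  # 1-D numpy array taken from the model 'neck'
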
@PREDICTORS.register_module()
class TorchFaceFeatureExtractor(PredictorInterface):
    def __init__(self, model_path, model_config=None):
        """
        Initialize the model, adding a face-detection and alignment step for image inputs.

        Args:
            model_path: model file path
            model_config: config string used to initialize the model, in json format
        """
        # For compatibility, support both a .pth/.pt file (face feature model only)
        # and a model directory (face feature model + mtcnn model)
if model_path.endswith('.pth') or model_path.endswith('.pt'):
self.predictor = Predictor(model_path)
self.detector = FaceDetector()
else:
            face_model = glob('%s/*.pth' % model_path) + glob(
                '%s/*.pt' % model_path)
            assert len(face_model) == 1
            self.predictor = Predictor(face_model[0])
            mtcnn_weights = glob('%s/weights/*.npy' % model_path)
            if len(mtcnn_weights) != 3:
                print(
                    "User-provided model_path doesn't contain mtcnn models, will try to load weights over http, which might fail!"
                )
            self.detector = FaceDetector(dir_path=model_path)
self.output_name = 'feature'
    def get_output_type(self):
        """
        In this function the user should return a type dict, which indicates
        which type of data the output of the predictor should be converted to:

        * type json, data will be serialized to a json str
        * type image, data will be converted to an encoded image binary and written to an oss file,
          whose name is output_dir/${key}/${input_filename}_${idx}.jpg, where input_filename
          is the base filename extracted from the url and key corresponds to the key in the dict of output_type;
          if the data indexed by key is a list, idx is the index of the element in the list, otherwise ${idx} will be empty
        * type video, data will be converted to an encoded video binary and written to an oss file

        ::

            return {
                'image': 'image',
                'feature': 'json'
            }

        indicating that the image data in the output dict will be saved to an image
        file and the feature in the output dict will be converted to json.
        """
        return {}
    def predict(self, input_data_list, batch_size=-1, detect_and_align=True):
        """
        Run prediction over a number of samples with the given batch_size.

        Args:
            input_data_list: a list of numpy arrays or PIL.Images, each one a sample to be predicted
            batch_size: batch size passed by the caller; you can also ignore this param and
                use a fixed number if you do not want to adjust the batch_size at runtime
            detect_and_align: True to detect and align faces before feature extraction
        Return:
            result: a list of dicts, each dict is the prediction result of one sample,
                e.g. {"output1": value1, "output2": value2}; the value type can be
                python int, str, float, or numpy array
        Note:
            if more than one face is detected in an image, only the first one is used;
            if no face is detected, the resized original image is used instead
        """
        num_image = len(input_data_list)
        assert num_image > 0, 'input images should not be an empty list'
        if batch_size > 0:
            num_batches = int(math.ceil(float(num_image) / batch_size))
        else:
            num_batches = 1
            batch_size = num_image
        image_list = input_data_list
        for i in range(len(image_list)):
            if isinstance(image_list[i], np.ndarray):
                image_list[i] = Image.fromarray(image_list[i])
outputs_list = []
for batch_idx in range(num_batches):
batch_image_list = image_list[batch_idx * batch_size:min(
(batch_idx + 1) * batch_size, len(image_list))]
            if detect_and_align:
                for idx, img in enumerate(batch_image_list):
                    bbox, ld = self.detector.safe_detect(img)
                    if len(bbox) > 1:
                        print('batch %d , %dth image has more than 1 face' %
                              (batch_idx, idx))
                        # use the first detected face
                        batch_image_list[idx] = np.array(
                            glint360k_align(img, ld[0]))
                    elif len(bbox) == 0:
                        print(
                            'batch %d , %dth image has no face detected, use original img'
                            % (batch_idx, idx))
                        batch_image_list[idx] = np.array(
                            img.resize((112, 112)))
                    else:
                        batch_image_list[idx] = np.array(
                            glint360k_align(img, ld[0]))
else:
for idx, img in enumerate(batch_image_list):
batch_image_list[idx] = np.array(img.resize((112, 112)))
image_tensor_list = self.predictor.preprocess(batch_image_list)
input_data = self.batch(image_tensor_list)
outputs = self.predictor.predict_batch(
input_data, mode='extract')['neck'].data.cpu().numpy()
for idx in range(len(image_tensor_list)):
single_result = {}
out = np.squeeze(outputs[idx])
single_result[self.output_name] = out
outputs_list.append(single_result)
return outputs_list
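
# A minimal usage sketch for TorchFaceFeatureExtractor. The model directory
# below (face feature checkpoint plus mtcnn weights) is an illustrative
# assumption:
#
#     from PIL import Image
#     extractor = TorchFaceFeatureExtractor('face_model_dir')
#     img = Image.open('face.jpg')  # numpy arrays are accepted as well
#     results = extractor.predict([img], batch_size=1, detect_and_align=True)
#     feature = results[0]['feature']  # one embedding per input image
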
@PREDICTORS.register_module()
class TorchMultiFaceFeatureExtractor(PredictorInterface):
    def __init__(self, model_path, model_config=None):
        """
        Initialize the model, adding a face-detection and alignment step for image inputs.

        Args:
            model_path: model file path
            model_config: config string used to initialize the model, in json format
        """
        # For compatibility, support both a .pth/.pt file (face feature model only)
        # and a model directory (face feature model + mtcnn model)
if model_path.endswith('.pth') or model_path.endswith('.pt'):
self.predictor = Predictor(model_path)
self.detector = FaceDetector()
else:
            face_model = glob('%s/*.pth' % model_path) + glob(
                '%s/*.pt' % model_path)
            assert len(face_model) == 1
            self.predictor = Predictor(face_model[0])
            mtcnn_weights = glob('%s/weights/*.npy' % model_path)
            if len(mtcnn_weights) != 3:
                print(
                    "User-provided model_path doesn't contain mtcnn models, will try to load weights over http, which might fail!"
                )
            self.detector = FaceDetector(dir_path=model_path)
self.output_name = 'feature'
    def get_output_type(self):
        """
        In this function the user should return a type dict, which indicates
        which type of data the output of the predictor should be converted to:

        * type json, data will be serialized to a json str
        * type image, data will be converted to an encoded image binary and written to an oss file,
          whose name is output_dir/${key}/${input_filename}_${idx}.jpg, where input_filename
          is the base filename extracted from the url and key corresponds to the key in the dict of output_type;
          if the data indexed by key is a list, idx is the index of the element in the list, otherwise ${idx} will be empty
        * type video, data will be converted to an encoded video binary and written to an oss file

        ::

            return {
                'image': 'image',
                'feature': 'json'
            }

        indicating that the image data in the output dict will be saved to an image
        file and the feature in the output dict will be converted to json.
        """
        return {}
    def predict(self, input_data_list, batch_size=-1, detect_and_align=True):
        """
        Run prediction over a number of samples with the given batch_size.

        Args:
            input_data_list: a list of numpy arrays or PIL.Images, each one a sample to be predicted
            batch_size: batch size passed by the caller; you can also ignore this param and
                use a fixed number if you do not want to adjust the batch_size at runtime
            detect_and_align: True to detect and align faces before feature extraction
        Return:
            result: a list of dicts, each dict is the prediction result of one sample,
                e.g. {"output1": value1, "output2": value2}; the value type can be
                python int, str, float, or numpy array
        Note:
            if multiple faces are detected in an image, a feature is extracted for
            every face; if no face is detected, the resized original image is used instead
        """
num_image = len(input_data_list)
outputs_list = []
        for idx in range(num_image):
            img = input_data_list[idx]
            if isinstance(img, np.ndarray):
                img = Image.fromarray(img)
            sub_imgs = []
            if detect_and_align:
                bboxes, ld = self.detector.safe_detect(img)
                if len(bboxes) > 1:
                    print('%dth image has more than 1 face' % idx)
                    # extract one aligned crop per detected face
                    for box, subld in zip(bboxes, ld):
                        sub_imgs.append(np.array(glint360k_align(img, subld)))
                elif len(bboxes) == 0:
                    print('%dth image has no face detected, use original img' %
                          idx)
                    sub_imgs.append(np.array(img.resize((112, 112))))
                else:
                    sub_imgs.append(np.array(glint360k_align(img, ld[0])))
            else:
                sub_imgs.append(np.array(img.resize((112, 112))))
                # x1, y1, x2, y2, score
                bboxes = [[0, 0, 111, 111, 1.0]]
            image_tensor_list = self.predictor.preprocess(sub_imgs)
            input_data = self.batch(image_tensor_list)
            outputs = self.predictor.predict_batch(
                input_data, mode='extract')['neck'].data.cpu().numpy()
            single_result = {}
            if len(bboxes) == 0:
                # no face detected: report a full-image bbox for the fallback crop
                bboxes = [[0, 0, 111, 111, 1.0]]
            single_result[self.output_name] = outputs
            single_result['bbox'] = bboxes
            outputs_list.append(single_result)
        return outputs_list
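
# A minimal usage sketch for TorchMultiFaceFeatureExtractor, which returns one
# embedding per detected face. Paths are illustrative assumptions:
#
#     from PIL import Image
#     extractor = TorchMultiFaceFeatureExtractor('face_model_dir')
#     results = extractor.predict([Image.open('group.jpg')])
#     feats = results[0]['feature']   # (num_faces, feat_dim) numpy array
#     boxes = results[0]['bbox']      # one [x1, y1, x2, y2, score] per face
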
@PREDICTORS.register_module()
class TorchFaceAttrExtractor(PredictorInterface):
    def __init__(
            self,
            model_path,
            model_config=None,
            face_threshold=0.95,
            attr_method=['distribute_sum', 'softmax', 'softmax'],
            attr_name=['age', 'gender', 'emo'],
    ):
        """
        Initialize the model.

        Args:
            model_path: model file path
            model_config: config string used to initialize the model, in json format
            face_threshold: minimum detection score for a face to be kept
            attr_method: post-processing applied to each attribute head:
                - softmax: do softmax over feature dim 1 and take the argmax
                - distribute_sum: do softmax, then take the probability-weighted
                  sum over class indices
            attr_name: names of the predicted attributes
        """
if model_path.endswith('.pth') or model_path.endswith('.pt'):
self.predictor = Predictor(model_path)
self.detector = FaceDetector()
else:
            face_model = glob('%s/*.pth' % model_path) + glob(
                '%s/*.pt' % model_path)
            assert len(face_model) == 1
            self.predictor = Predictor(face_model[0])
            mtcnn_weights = glob('%s/weights/*.npy' % model_path)
            if len(mtcnn_weights) != 3:
                print(
                    "User-provided model_path doesn't contain mtcnn models, will try to load weights over http, which might fail!"
                )
            self.detector = FaceDetector(dir_path=model_path)
self.attr_name = attr_name
self.attr_method = attr_method
        assert len(self.attr_method) == len(self.attr_name)
self.gender_map = {0: 'female', 1: 'male'}
self.emo_map = {
0: 'Neutral',
1: 'Happiness',
2: 'Sadness',
3: 'Surprise',
4: 'Fear',
5: 'Disgust',
6: 'Anger',
7: 'Contempt',
}
self.pop_map = {
0: '0',
1: '1',
2: '2',
3: '3',
4: '4',
5: '5',
}
self.face_threshold = face_threshold
    def get_output_type(self):
        """
        In this function the user should return a type dict, which indicates
        which type of data the output of the predictor should be converted to:

        * type json, data will be serialized to a json str
        * type image, data will be converted to an encoded image binary and written to an oss file,
          whose name is output_dir/${key}/${input_filename}_${idx}.jpg, where input_filename
          is the base filename extracted from the url and key corresponds to the key in the dict of output_type;
          if the data indexed by key is a list, idx is the index of the element in the list, otherwise ${idx} will be empty
        * type video, data will be converted to an encoded video binary and written to an oss file

        ::

            return {
                'image': 'image',
                'feature': 'json'
            }

        indicating that the image data in the output dict will be saved to an image
        file and the feature in the output dict will be converted to json.
        """
        return {}
    def predict(self, input_data_list, batch_size=-1):
        """
        Run prediction over a number of samples with the given batch_size.

        Args:
            input_data_list: a list of numpy arrays, each array is a sample to be predicted
            batch_size: batch size passed by the caller; you can also ignore this param and
                use a fixed number if you do not want to adjust the batch_size at runtime
        Return:
            result: a list of dicts, each dict is the prediction result of one sample,
                e.g. {"output1": value1, "output2": value2}; the value type can be
                python int, str, float, or numpy array
        """
        num_image = len(input_data_list)
        assert num_image > 0, 'input images should not be an empty list'
        if batch_size > 0:
            num_batches = int(math.ceil(float(num_image) / batch_size))
        else:
            num_batches = 1
            batch_size = num_image
        image_list = input_data_list
outputs_list = []
for batch_idx in range(num_batches):
batch_image_list = image_list[
batch_idx *
batch_size:min(len(image_list), (batch_idx + 1) * batch_size)]
face_image_list = []
face_bbox_list = []
faceidx_by_imageidx = {}
            for idx, img in enumerate(batch_image_list):
                # safe_detect returns empty results when no face is detected
                bbox, ld = self.detector.safe_detect(img)
                if len(bbox) == 0:
                    print('batch %d , %dth image has no face detected' %
                          (batch_idx, idx))
                else:
                    if len(bbox) > 1:
                        print('batch %d , %dth image has more than 1 face (%d)'
                              % (batch_idx, idx, len(bbox)))
                    # keep only faces above the detection score threshold
                    _bbox = []
                    _ld = []
                    for box_idx, b in enumerate(bbox):
                        if b[-1] > self.face_threshold:
                            _bbox.append(b)
                            _ld.append(ld[box_idx])
                    bbox = _bbox
                    ld = _ld
                    # collect one aligned crop per face, remembering which
                    # input image each face came from
                    faceidx_by_imageidx[idx] = []
                    for bbox_idx, face_box in enumerate(bbox):
                        face_image_list.append(
                            glint360k_align(img, ld[bbox_idx]))
                        face_bbox_list.append(face_box)
                        face_idx = len(face_image_list) - 1
                        faceidx_by_imageidx[idx].append(face_idx)
            if len(face_image_list) > 0:
                image_tensor_list = self.predictor.preprocess(face_image_list)
                input_data = self.batch(image_tensor_list)
                outputs = self.predictor.predict_batch(
                    input_data, mode='extract')
                neck_output_dict = {}
                for neck_idx, attr_method in enumerate(self.attr_method):
                    neck_output = outputs['neck_%d_0' % neck_idx]
                    # convert logits to per-class probabilities
                    neck_output = torch.nn.Softmax(dim=1)(neck_output)
                    if attr_method == 'softmax':
                        neck_output = torch.argmax(neck_output, dim=1)
                    elif attr_method == 'distribute_sum':
                        # probability-weighted sum over class indices
                        n, c = neck_output.size()
                        distribute = torch.arange(0, c).repeat(n, 1).to(
                            neck_output.device)
                        neck_output = (distribute * neck_output).sum(dim=1)
                    else:
                        raise ValueError(
                            'TorchFaceAttrExtractor for neck %d only supports attr_method softmax/distribute_sum'
                            % neck_idx)
                    neck_output_dict[neck_idx] = neck_output.cpu().numpy()
                for imgidx in faceidx_by_imageidx.keys():
                    single_result = {}
                    for k in neck_output_dict.keys():
                        single_result['face_' + self.attr_name[k]] = []
                    single_result['face_bbox'] = []
                    for faceidx in faceidx_by_imageidx[imgidx]:
                        for k in neck_output_dict.keys():
                            out = np.squeeze(neck_output_dict[k][faceidx])
                            if self.attr_method[k] == 'softmax':
                                label_map = getattr(
                                    self, '%s_map' % self.attr_name[k])
                                out = label_map[out]
                            single_result['face_' +
                                          self.attr_name[k]].append(out)
                        single_result['face_bbox'].append(
                            face_bbox_list[faceidx])
                    outputs_list.append(single_result)
        return outputs_list
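
# A minimal usage sketch for TorchFaceAttrExtractor with its default heads
# (age via distribute_sum, gender and emotion via softmax). The model
# directory and image path are illustrative assumptions:
#
#     from PIL import Image
#     extractor = TorchFaceAttrExtractor('face_attr_model_dir')
#     results = extractor.predict([Image.open('face.jpg')])
#     r = results[0]
#     r['face_age']     # list of continuous age estimates, one per kept face
#     r['face_gender']  # list of 'female'/'male' labels
#     r['face_emo']     # list of emotion labels, e.g. 'Happiness'
#     r['face_bbox']    # list of [x1, y1, x2, y2, score] per kept face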