diff --git a/.gitignore b/.gitignore
index 163aefc4b..df5968c48 100644
--- a/.gitignore
+++ b/.gitignore
@@ -172,4 +172,5 @@ annotator/downloads/
 
 # test results and expectations
 web_tests/results/
-web_tests/expectations/
\ No newline at end of file
+web_tests/expectations/
+*_diff.png
\ No newline at end of file
diff --git a/annotator/openpose/__init__.py b/annotator/openpose/__init__.py
index 6602dbf8f..34c1b1de7 100644
--- a/annotator/openpose/__init__.py
+++ b/annotator/openpose/__init__.py
@@ -17,23 +17,19 @@
 from .body import Body, BodyResult, Keypoint
 from .hand import Hand
 from .face import Face
+from .types import PoseResult, HandResult, FaceResult
 from modules import devices
 from annotator.annotator_path import models_path
 
-from typing import NamedTuple, Tuple, List, Callable, Union, Optional
+from typing import Tuple, List, Callable, Union, Optional
 
 body_model_path = "https://huggingface.co/lllyasviel/Annotators/resolve/main/body_pose_model.pth"
 hand_model_path = "https://huggingface.co/lllyasviel/Annotators/resolve/main/hand_pose_model.pth"
 face_model_path = "https://huggingface.co/lllyasviel/Annotators/resolve/main/facenet.pth"
 
-HandResult = List[Keypoint]
-FaceResult = List[Keypoint]
+remote_onnx_det = "https://huggingface.co/yzd-v/DWPose/resolve/main/yolox_l.onnx"
+remote_onnx_pose = "https://huggingface.co/yzd-v/DWPose/resolve/main/dw-ll_ucoco_384.onnx"
 
-class PoseResult(NamedTuple):
-    body: BodyResult
-    left_hand: Union[HandResult, None]
-    right_hand: Union[HandResult, None]
-    face: Union[FaceResult, None]
 
 def draw_poses(poses: List[PoseResult], H, W, draw_body=True, draw_hand=True, draw_face=True):
     """
@@ -162,8 +158,7 @@ def compress_keypoints(keypoints: Union[List[Keypoint], None]) -> Union[List[flo
         'canvas_height': canvas_height,
         'canvas_width': canvas_width,
     }, indent=4)
-
-
+    
 class OpenposeDetector:
     """
    A class for detecting human poses in images using the Openpose model.
@@ -179,6 +174,8 @@ def __init__(self):
         self.hand_estimation = None
         self.face_estimation = None
 
+        self.dw_pose_estimation = None
+
     def load_model(self):
         """
         Load the Openpose body, hand, and face models.
@@ -202,10 +199,25 @@ def load_model(self):
         self.body_estimation = Body(body_modelpath)
         self.hand_estimation = Hand(hand_modelpath)
         self.face_estimation = Face(face_modelpath)
+
+    def load_dw_model(self):
+        from .wholebody import Wholebody  # DW Pose
+
+        def load_model(filename: str, remote_url: str):
+            local_path = os.path.join(self.model_dir, filename)
+            if not os.path.exists(local_path):
+                from basicsr.utils.download_util import load_file_from_url
+                load_file_from_url(remote_url, model_dir=self.model_dir)
+            return local_path
+
+        onnx_det = load_model("yolox_l.onnx", remote_onnx_det)
+        onnx_pose = load_model("dw-ll_ucoco_384.onnx", remote_onnx_pose)
+        self.dw_pose_estimation = Wholebody(onnx_det, onnx_pose)
 
     def unload_model(self):
         """
         Unload the Openpose models by moving them to the CPU.
+        Note: DW Pose models always run on CPU, so there is no need to `unload` them.
         """
         if self.body_estimation is not None:
             self.body_estimation.model.to("cpu")
@@ -302,10 +314,29 @@ def detect_poses(self, oriImg, include_hand=False, include_face=False) -> List[P
             ), left_hand, right_hand, face))
 
         return results
-    
+
+    def detect_poses_dw(self, oriImg) -> List[PoseResult]:
+        """
+        Detect poses in the given image using DW Pose:
+        https://github.com/IDEA-Research/DWPose
+
+        Args:
+            oriImg (numpy.ndarray): The input image for pose detection.
+
+        Returns:
+            List[PoseResult]: A list of PoseResult objects containing the detected poses.
+        """
+        from .wholebody import Wholebody  # DW Pose
+
+        self.load_dw_model()
+
+        with torch.no_grad():
+            keypoints_info = self.dw_pose_estimation(oriImg.copy())
+            return Wholebody.format_result(keypoints_info)
+
     def __call__(
-        self, oriImg, include_body=True, include_hand=False, include_face=False,
-        json_pose_callback: Callable[[str], None] = None,
+        self, oriImg, include_body=True, include_hand=False, include_face=False,
+        use_dw_pose=False, json_pose_callback: Callable[[str], None] = None,
     ):
@@ -315,14 +346,19 @@ def __call__(
             include_body (bool, optional): Whether to include body keypoints. Defaults to True.
             include_hand (bool, optional): Whether to include hand keypoints. Defaults to False.
             include_face (bool, optional): Whether to include face keypoints. Defaults to False.
+            use_dw_pose (bool, optional): Whether to use the DW Pose detection algorithm. Defaults to False.
             json_pose_callback (Callable, optional): A callback that accepts the pose JSON string.
 
         Returns:
             numpy.ndarray: The image with detected and drawn poses.
         """
         H, W, _ = oriImg.shape
-        poses = self.detect_poses(oriImg, include_hand, include_face)
+
+        if use_dw_pose:
+            poses = self.detect_poses_dw(oriImg)
+        else:
+            poses = self.detect_poses(oriImg, include_hand, include_face)
+
         if json_pose_callback:
             json_pose_callback(encode_poses_as_json(poses, H, W))
-        return draw_poses(poses, H, W, draw_body=include_body, draw_hand=include_hand, draw_face=include_face)
-        
\ No newline at end of file
+        return draw_poses(poses, H, W, draw_body=include_body, draw_hand=include_hand, draw_face=include_face)
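The new entry point is the `use_dw_pose` flag on `OpenposeDetector.__call__`; model download and CPU-only `cv2.dnn` inference happen lazily behind it. A minimal usage sketch, not part of the patch: it assumes the WebUI environment (so `modules` and `basicsr` resolve) and a hypothetical image path.

```python
import cv2
from annotator.openpose import OpenposeDetector

detector = OpenposeDetector()
img = cv2.imread("woman.jpeg")  # any HWC BGR test image (hypothetical path)

# Drawn pose canvas; DW Pose fetches yolox_l.onnx / dw-ll_ucoco_384.onnx on first use.
canvas = detector(
    img,
    include_body=True,
    include_hand=True,
    include_face=True,
    use_dw_pose=True,
)
cv2.imwrite("woman_pose.png", canvas)
```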
+ """ + from .wholebody import Wholebody # DW Pose + + self.load_dw_model() + + with torch.no_grad(): + keypoints_info = self.dw_pose_estimation(oriImg.copy()) + return Wholebody.format_result(keypoints_info) + def __call__( - self, oriImg, include_body=True, include_hand=False, include_face=False, - json_pose_callback: Callable[[str], None] = None, + self, oriImg, include_body=True, include_hand=False, include_face=False, + use_dw_pose=False, json_pose_callback: Callable[[str], None] = None, ): """ Detect and draw poses in the given image. @@ -315,14 +346,19 @@ def __call__( include_body (bool, optional): Whether to include body keypoints. Defaults to True. include_hand (bool, optional): Whether to include hand keypoints. Defaults to False. include_face (bool, optional): Whether to include face keypoints. Defaults to False. + use_dw_pose (bool, optional): Whether to use DW pose detection algorithm. Defaults to False. json_pose_callback (Callable, optional): A callback that accepts the pose JSON string. Returns: numpy.ndarray: The image with detected and drawn poses. """ H, W, _ = oriImg.shape - poses = self.detect_poses(oriImg, include_hand, include_face) + + if use_dw_pose: + poses = self.detect_poses_dw(oriImg) + else: + poses = self.detect_poses(oriImg, include_hand, include_face) + if json_pose_callback: json_pose_callback(encode_poses_as_json(poses, H, W)) - return draw_poses(poses, H, W, draw_body=include_body, draw_hand=include_hand, draw_face=include_face) - \ No newline at end of file + return draw_poses(poses, H, W, draw_body=include_body, draw_hand=include_hand, draw_face=include_face) diff --git a/annotator/openpose/body.py b/annotator/openpose/body.py index 168dde3c9..32934f19e 100644 --- a/annotator/openpose/body.py +++ b/annotator/openpose/body.py @@ -11,24 +11,7 @@ from . import util from .model import bodypose_model - -class Keypoint(NamedTuple): - x: float - y: float - score: float = 1.0 - id: int = -1 - - -class BodyResult(NamedTuple): - # Note: Using `Union` instead of `|` operator as the ladder is a Python - # 3.10 feature. - # Annotator code should be Python 3.8 Compatible, as controlnet repo uses - # Python 3.8 environment. - # https://github.com/lllyasviel/ControlNet/blob/d3284fcd0972c510635a4f5abe2eeb71dc0de524/environment.yaml#L6 - keypoints: List[Union[Keypoint, None]] - total_score: float = 0.0 - total_parts: int = 0 - +from .types import Keypoint, BodyResult class Body(object): def __init__(self, model_path): diff --git a/annotator/openpose/cv_ox_det.py b/annotator/openpose/cv_ox_det.py new file mode 100644 index 000000000..f261cb891 --- /dev/null +++ b/annotator/openpose/cv_ox_det.py @@ -0,0 +1,124 @@ +import cv2 +import numpy as np + +def nms(boxes, scores, nms_thr): + """Single class NMS implemented in Numpy.""" + x1 = boxes[:, 0] + y1 = boxes[:, 1] + x2 = boxes[:, 2] + y2 = boxes[:, 3] + + areas = (x2 - x1 + 1) * (y2 - y1 + 1) + order = scores.argsort()[::-1] + + keep = [] + while order.size > 0: + i = order[0] + keep.append(i) + xx1 = np.maximum(x1[i], x1[order[1:]]) + yy1 = np.maximum(y1[i], y1[order[1:]]) + xx2 = np.minimum(x2[i], x2[order[1:]]) + yy2 = np.minimum(y2[i], y2[order[1:]]) + + w = np.maximum(0.0, xx2 - xx1 + 1) + h = np.maximum(0.0, yy2 - yy1 + 1) + inter = w * h + ovr = inter / (areas[i] + areas[order[1:]] - inter) + + inds = np.where(ovr <= nms_thr)[0] + order = order[inds + 1] + + return keep + +def multiclass_nms(boxes, scores, nms_thr, score_thr): + """Multiclass NMS implemented in Numpy. 
+def demo_postprocess(outputs, img_size, p6=False):
+    grids = []
+    expanded_strides = []
+    strides = [8, 16, 32] if not p6 else [8, 16, 32, 64]
+
+    hsizes = [img_size[0] // stride for stride in strides]
+    wsizes = [img_size[1] // stride for stride in strides]
+
+    for hsize, wsize, stride in zip(hsizes, wsizes, strides):
+        xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
+        grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
+        grids.append(grid)
+        shape = grid.shape[:2]
+        expanded_strides.append(np.full((*shape, 1), stride))
+
+    grids = np.concatenate(grids, 1)
+    expanded_strides = np.concatenate(expanded_strides, 1)
+    outputs[..., :2] = (outputs[..., :2] + grids) * expanded_strides
+    outputs[..., 2:4] = np.exp(outputs[..., 2:4]) * expanded_strides
+
+    return outputs
+
+def preprocess(img, input_size, swap=(2, 0, 1)):
+    if len(img.shape) == 3:
+        padded_img = np.ones((input_size[0], input_size[1], 3), dtype=np.uint8) * 114
+    else:
+        padded_img = np.ones(input_size, dtype=np.uint8) * 114
+
+    r = min(input_size[0] / img.shape[0], input_size[1] / img.shape[1])
+    resized_img = cv2.resize(
+        img,
+        (int(img.shape[1] * r), int(img.shape[0] * r)),
+        interpolation=cv2.INTER_LINEAR,
+    ).astype(np.uint8)
+    padded_img[: int(img.shape[0] * r), : int(img.shape[1] * r)] = resized_img
+
+    padded_img = padded_img.transpose(swap)
+    padded_img = np.ascontiguousarray(padded_img, dtype=np.float32)
+    return padded_img, r
+
+def inference_detector(session, oriImg):
+    input_shape = (640, 640)
+    img, ratio = preprocess(oriImg, input_shape)
+
+    input = img[None, :, :, :]
+    outNames = session.getUnconnectedOutLayersNames()
+    session.setInput(input)
+    output = session.forward(outNames)
+
+    predictions = demo_postprocess(output[0], input_shape)[0]
+
+    boxes = predictions[:, :4]
+    scores = predictions[:, 4:5] * predictions[:, 5:]
+
+    boxes_xyxy = np.ones_like(boxes)
+    boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2]/2.
+    boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3]/2.
+    boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2]/2.
+    boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3]/2.
+    boxes_xyxy /= ratio
+    dets = multiclass_nms(boxes_xyxy, scores, nms_thr=0.45, score_thr=0.1)
+    # Guard against the no-detection case so `final_boxes` is always bound.
+    final_boxes = []
+    if dets is not None:
+        final_boxes, final_scores, final_cls_inds = dets[:, :4], dets[:, 4], dets[:, 5]
+        isscore = final_scores > 0.3
+        iscat = final_cls_inds == 0
+        isbbox = [i and j for (i, j) in zip(isscore, iscat)]
+        final_boxes = final_boxes[isbbox]
+
+    return final_boxes
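`demo_postprocess` maps the stride-relative YOLOX outputs back to input-image pixels, and `inference_detector` then converts center-format boxes to corner format before NMS. The corner conversion on one illustrative box (values are made up):

```python
import numpy as np

cxcywh = np.array([[320.0, 240.0, 100.0, 50.0]])  # (cx, cy, w, h)
xyxy = np.ones_like(cxcywh)
xyxy[:, 0] = cxcywh[:, 0] - cxcywh[:, 2] / 2.  # x1
xyxy[:, 1] = cxcywh[:, 1] - cxcywh[:, 3] / 2.  # y1
xyxy[:, 2] = cxcywh[:, 0] + cxcywh[:, 2] / 2.  # x2
xyxy[:, 3] = cxcywh[:, 1] + cxcywh[:, 3] / 2.  # y2
print(xyxy)  # [[270. 215. 370. 265.]]
```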
diff --git a/annotator/openpose/cv_ox_pose.py b/annotator/openpose/cv_ox_pose.py
new file mode 100644
index 000000000..a798a5f49
--- /dev/null
+++ b/annotator/openpose/cv_ox_pose.py
@@ -0,0 +1,355 @@
+from typing import List, Tuple
+
+import cv2
+import numpy as np
+
+def preprocess(
+    img: np.ndarray, out_bbox, input_size: Tuple[int, int] = (192, 256)
+) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
+    """Do preprocessing for DWPose model inference.
+
+    Args:
+        img (np.ndarray): Input image in shape (H, W, C).
+        out_bbox: Detected person boxes in (x1, y1, x2, y2) format.
+        input_size (tuple): Input image size in shape (w, h).
+
+    Returns:
+        tuple:
+        - out_img (List[np.ndarray]): Preprocessed crops, one per bbox.
+        - out_center (List[np.ndarray]): Center of each bbox.
+        - out_scale (List[np.ndarray]): Scale of each bbox.
+    """
+    # get shape of image
+    img_shape = img.shape[:2]
+    out_img, out_center, out_scale = [], [], []
+    if len(out_bbox) == 0:
+        out_bbox = [[0, 0, img_shape[1], img_shape[0]]]
+    for i in range(len(out_bbox)):
+        x0 = out_bbox[i][0]
+        y0 = out_bbox[i][1]
+        x1 = out_bbox[i][2]
+        y1 = out_bbox[i][3]
+        bbox = np.array([x0, y0, x1, y1])
+
+        # get center and scale
+        center, scale = bbox_xyxy2cs(bbox, padding=1.25)
+
+        # do affine transformation
+        resized_img, scale = top_down_affine(input_size, scale, center, img)
+
+        # normalize image
+        mean = np.array([123.675, 116.28, 103.53])
+        std = np.array([58.395, 57.12, 57.375])
+        resized_img = (resized_img - mean) / std
+
+        out_img.append(resized_img)
+        out_center.append(center)
+        out_scale.append(scale)
+
+    return out_img, out_center, out_scale
+
+
+def inference(sess, img):
+    """Inference DWPose model.
+
+    Args:
+        sess : cv2.dnn network loaded from the ONNX model.
+        img : List of preprocessed crops from `preprocess`.
+
+    Returns:
+        all_out : Outputs of the DWPose model, one per crop.
+    """
+    all_out = []
+    # build input
+    for i in range(len(img)):
+        input = img[i].transpose(2, 0, 1)
+        input = input[None, :, :, :]
+
+        outNames = sess.getUnconnectedOutLayersNames()
+        sess.setInput(input)
+        outputs = sess.forward(outNames)
+        all_out.append(outputs)
+
+    return all_out
+
+
+def postprocess(outputs: List[np.ndarray],
+                model_input_size: Tuple[int, int],
+                center: Tuple[int, int],
+                scale: Tuple[int, int],
+                simcc_split_ratio: float = 2.0
+                ) -> Tuple[np.ndarray, np.ndarray]:
+    """Postprocess for DWPose model output.
+
+    Args:
+        outputs (np.ndarray): Output of RTMPose model.
+        model_input_size (tuple): RTMPose model input image size.
+        center (tuple): Center of bbox in shape (x, y).
+        scale (tuple): Scale of bbox in shape (w, h).
+        simcc_split_ratio (float): Split ratio of simcc.
+
+    Returns:
+        tuple:
+        - keypoints (np.ndarray): Rescaled keypoints.
+        - scores (np.ndarray): Model predict scores.
+    """
+    all_key = []
+    all_score = []
+    for i in range(len(outputs)):
+        # use simcc to decode
+        simcc_x, simcc_y = outputs[i]
+        keypoints, scores = decode(simcc_x, simcc_y, simcc_split_ratio)
+
+        # rescale keypoints
+        keypoints = keypoints / model_input_size * scale[i] + center[i] - scale[i] / 2
+        all_key.append(keypoints[0])
+        all_score.append(scores[0])
+
+    return np.array(all_key), np.array(all_score)
+
+
+def bbox_xyxy2cs(bbox: np.ndarray,
+                 padding: float = 1.) -> Tuple[np.ndarray, np.ndarray]:
+    """Transform the bbox format from (x1, y1, x2, y2) into (center, scale).
+
+    Args:
+        bbox (ndarray): Bounding box(es) in shape (4,) or (n, 4), formatted
+            as (left, top, right, bottom)
+        padding (float): BBox padding factor that will be multiplied to scale.
+            Default: 1.0
+
+    Returns:
+        tuple: A tuple containing center and scale.
+        - np.ndarray[float32]: Center (x, y) of the bbox in shape (2,) or
+            (n, 2)
+        - np.ndarray[float32]: Scale (w, h) of the bbox in shape (2,) or
+            (n, 2)
+    """
+    # convert single bbox from (4, ) to (1, 4)
+    dim = bbox.ndim
+    if dim == 1:
+        bbox = bbox[None, :]
+
+    # get bbox center and scale
+    x1, y1, x2, y2 = np.hsplit(bbox, [1, 2, 3])
+    center = np.hstack([x1 + x2, y1 + y2]) * 0.5
+    scale = np.hstack([x2 - x1, y2 - y1]) * padding
+
+    if dim == 1:
+        center = center[0]
+        scale = scale[0]
+
+    return center, scale
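A worked example of the (x1, y1, x2, y2) → (center, scale) transform, using the same 1.25 padding that `preprocess` passes in:

```python
import numpy as np

from annotator.openpose.cv_ox_pose import bbox_xyxy2cs

center, scale = bbox_xyxy2cs(np.array([10., 20., 110., 220.]), padding=1.25)
print(center)  # [ 60. 120.] -> midpoint of the box
print(scale)   # [125. 250.] -> (w, h) * 1.25
```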
+
+
+def _fix_aspect_ratio(bbox_scale: np.ndarray,
+                      aspect_ratio: float) -> np.ndarray:
+    """Extend the scale to match the given aspect ratio.
+
+    Args:
+        bbox_scale (np.ndarray): The image scale (w, h) in shape (2, )
+        aspect_ratio (float): The ratio of ``w/h``
+
+    Returns:
+        np.ndarray: The reshaped image scale in (2, )
+    """
+    w, h = np.hsplit(bbox_scale, [1])
+    bbox_scale = np.where(w > h * aspect_ratio,
+                          np.hstack([w, w / aspect_ratio]),
+                          np.hstack([h * aspect_ratio, h]))
+    return bbox_scale
+
+
+def _rotate_point(pt: np.ndarray, angle_rad: float) -> np.ndarray:
+    """Rotate a point by an angle.
+
+    Args:
+        pt (np.ndarray): 2D point coordinates (x, y) in shape (2, )
+        angle_rad (float): rotation angle in radian
+
+    Returns:
+        np.ndarray: Rotated point in shape (2, )
+    """
+    sn, cs = np.sin(angle_rad), np.cos(angle_rad)
+    rot_mat = np.array([[cs, -sn], [sn, cs]])
+    return rot_mat @ pt
+
+
+def _get_3rd_point(a: np.ndarray, b: np.ndarray) -> np.ndarray:
+    """To calculate the affine matrix, three pairs of points are required. This
+    function is used to get the 3rd point, given 2D points a & b.
+
+    The 3rd point is defined by rotating vector `a - b` by 90 degrees
+    anticlockwise, using b as the rotation center.
+
+    Args:
+        a (np.ndarray): The 1st point (x,y) in shape (2, )
+        b (np.ndarray): The 2nd point (x,y) in shape (2, )
+
+    Returns:
+        np.ndarray: The 3rd point.
+    """
+    direction = a - b
+    c = b + np.r_[-direction[1], direction[0]]
+    return c
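The two helpers above are pure 2D geometry; a quick check of both, with values chosen so the results are obvious:

```python
import numpy as np

from annotator.openpose.cv_ox_pose import _rotate_point, _get_3rd_point

print(_rotate_point(np.array([1.0, 0.0]), np.pi / 2))
# ~[0. 1.] -> 90 degrees counter-clockwise

print(_get_3rd_point(np.array([2.0, 0.0]), np.array([0.0, 0.0])))
# [0. 2.] -> (a - b) rotated 90 degrees about b
```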
+
+
+def get_warp_matrix(center: np.ndarray,
+                    scale: np.ndarray,
+                    rot: float,
+                    output_size: Tuple[int, int],
+                    shift: Tuple[float, float] = (0., 0.),
+                    inv: bool = False) -> np.ndarray:
+    """Calculate the affine transformation matrix that can warp the bbox area
+    in the input image to the output size.
+
+    Args:
+        center (np.ndarray[2, ]): Center of the bounding box (x, y).
+        scale (np.ndarray[2, ]): Scale of the bounding box
+            wrt [width, height].
+        rot (float): Rotation angle (degree).
+        output_size (np.ndarray[2, ] | list(2,)): Size of the
+            destination heatmaps.
+        shift (tuple): Shift translation ratio wrt the width/height,
+            in range [0, 1]. Default (0., 0.).
+        inv (bool): Option to inverse the affine transform direction.
+            (inv=False: src->dst or inv=True: dst->src)
+
+    Returns:
+        np.ndarray: A 2x3 transformation matrix
+    """
+    shift = np.array(shift)
+    src_w = scale[0]
+    dst_w = output_size[0]
+    dst_h = output_size[1]
+
+    # compute transformation matrix
+    rot_rad = np.deg2rad(rot)
+    src_dir = _rotate_point(np.array([0., src_w * -0.5]), rot_rad)
+    dst_dir = np.array([0., dst_w * -0.5])
+
+    # get three reference points of the src rectangle in the original image
+    src = np.zeros((3, 2), dtype=np.float32)
+    src[0, :] = center + scale * shift
+    src[1, :] = center + src_dir + scale * shift
+    src[2, :] = _get_3rd_point(src[0, :], src[1, :])
+
+    # get three reference points of the dst rectangle in the input image
+    dst = np.zeros((3, 2), dtype=np.float32)
+    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
+    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
+    dst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])
+
+    if inv:
+        warp_mat = cv2.getAffineTransform(np.float32(dst), np.float32(src))
+    else:
+        warp_mat = cv2.getAffineTransform(np.float32(src), np.float32(dst))
+
+    return warp_mat
+
+
+def top_down_affine(input_size: Tuple[int, int], bbox_scale: np.ndarray,
+                    bbox_center: np.ndarray, img: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+    """Get the bbox image as the model input by affine transform.
+
+    Args:
+        input_size (tuple): The input size (w, h) of the model.
+        bbox_scale (np.ndarray): The bbox scale of the img.
+        bbox_center (np.ndarray): The bbox center of the img.
+        img (np.ndarray): The original image.
+
+    Returns:
+        tuple: A tuple containing the cropped image and the updated scale.
+        - np.ndarray[float32]: img after affine transform.
+        - np.ndarray[float32]: bbox scale after affine transform.
+    """
+    w, h = input_size
+    warp_size = (int(w), int(h))
+
+    # reshape bbox to fixed aspect ratio
+    bbox_scale = _fix_aspect_ratio(bbox_scale, aspect_ratio=w / h)
+
+    # get the affine matrix
+    center = bbox_center
+    scale = bbox_scale
+    rot = 0
+    warp_mat = get_warp_matrix(center, scale, rot, output_size=(w, h))
+
+    # do affine transform
+    img = cv2.warpAffine(img, warp_mat, warp_size, flags=cv2.INTER_LINEAR)
+
+    return img, bbox_scale
+
+
+def get_simcc_maximum(simcc_x: np.ndarray,
+                      simcc_y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+    """Get maximum response location and value from simcc representations.
+
+    Note:
+        instance number: N
+        num_keypoints: K
+        heatmap height: H
+        heatmap width: W
+
+    Args:
+        simcc_x (np.ndarray): x-axis SimCC in shape (K, Wx) or (N, K, Wx)
+        simcc_y (np.ndarray): y-axis SimCC in shape (K, Wy) or (N, K, Wy)
+
+    Returns:
+        tuple:
+        - locs (np.ndarray): locations of maximum heatmap responses in shape
+            (K, 2) or (N, K, 2)
+        - vals (np.ndarray): values of maximum heatmap responses in shape
+            (K,) or (N, K)
+    """
+    N, K, Wx = simcc_x.shape
+    simcc_x = simcc_x.reshape(N * K, -1)
+    simcc_y = simcc_y.reshape(N * K, -1)
+
+    # get maximum value locations
+    x_locs = np.argmax(simcc_x, axis=1)
+    y_locs = np.argmax(simcc_y, axis=1)
+    locs = np.stack((x_locs, y_locs), axis=-1).astype(np.float32)
+    max_val_x = np.amax(simcc_x, axis=1)
+    max_val_y = np.amax(simcc_y, axis=1)
+
+    # keep the smaller of the two peak confidences
+    mask = max_val_x > max_val_y
+    max_val_x[mask] = max_val_y[mask]
+    vals = max_val_x
+    locs[vals <= 0.] = -1
+
+    # reshape
+    locs = locs.reshape(N, K, 2)
+    vals = vals.reshape(N, K)
+
+    return locs, vals
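SimCC represents each keypoint as two 1D classification vectors, one per axis: the decoded location is the per-axis argmax, and the confidence is the smaller of the two peaks. A toy example:

```python
import numpy as np

from annotator.openpose.cv_ox_pose import get_simcc_maximum

simcc_x = np.array([[[0.1, 0.9, 0.2]]])  # (N=1, K=1, Wx=3), peak at bin 1
simcc_y = np.array([[[0.8, 0.3, 0.1]]])  # peak at bin 0
locs, vals = get_simcc_maximum(simcc_x, simcc_y)
print(locs)  # [[[1. 0.]]] -> (x, y) bin indices
print(vals)  # [[0.8]]     -> min of the two peak values
```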
+
+
+def decode(simcc_x: np.ndarray, simcc_y: np.ndarray,
+           simcc_split_ratio) -> Tuple[np.ndarray, np.ndarray]:
+    """Decode SimCC representations into keypoint locations and scores.
+
+    Args:
+        simcc_x (np.ndarray[K, Wx]): model predicted simcc in x.
+        simcc_y (np.ndarray[K, Wy]): model predicted simcc in y.
+        simcc_split_ratio (int): The split ratio of simcc.
+
+    Returns:
+        tuple: A tuple containing keypoints and scores.
+        - np.ndarray[float32]: keypoints in shape (K, 2) or (n, K, 2)
+        - np.ndarray[float32]: scores in shape (K,) or (n, K)
+    """
+    keypoints, scores = get_simcc_maximum(simcc_x, simcc_y)
+    keypoints /= simcc_split_ratio
+
+    return keypoints, scores
+
+
+def inference_pose(session, out_bbox, oriImg):
+    model_input_size = (288, 384)
+    resized_img, center, scale = preprocess(oriImg, out_bbox, model_input_size)
+    outputs = inference(session, resized_img)
+    keypoints, scores = postprocess(outputs, model_input_size, center, scale)
+
+    return keypoints, scores
\ No newline at end of file
diff --git a/annotator/openpose/types.py b/annotator/openpose/types.py
new file mode 100644
index 000000000..e521e65dc
--- /dev/null
+++ b/annotator/openpose/types.py
@@ -0,0 +1,29 @@
+from typing import NamedTuple, List, Optional
+
+class Keypoint(NamedTuple):
+    x: float
+    y: float
+    score: float = 1.0
+    id: int = -1
+
+
+class BodyResult(NamedTuple):
+    # Note: Using `Optional` instead of the `|` operator, as the latter is a Python
+    # 3.10 feature.
+    # Annotator code should be Python 3.8 compatible, as the controlnet repo uses
+    # a Python 3.8 environment.
+    # https://github.com/lllyasviel/ControlNet/blob/d3284fcd0972c510635a4f5abe2eeb71dc0de524/environment.yaml#L6
+    keypoints: List[Optional[Keypoint]]
+    total_score: float = 0.0
+    total_parts: int = 0
+
+
+HandResult = List[Keypoint]
+FaceResult = List[Keypoint]
+
+
+class PoseResult(NamedTuple):
+    body: BodyResult
+    left_hand: Optional[HandResult]
+    right_hand: Optional[HandResult]
+    face: Optional[FaceResult]
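The shared types are plain NamedTuples, so they can be constructed directly; a small illustration with hypothetical values:

```python
from annotator.openpose.types import BodyResult, Keypoint, PoseResult

body = BodyResult(keypoints=[Keypoint(0.5, 0.25), None], total_score=1.0, total_parts=2)
pose = PoseResult(body=body, left_hand=None, right_hand=None, face=None)
print(pose.body.keypoints[0].score)  # 1.0 (default score)
```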
""" - H, W, C = canvas.shape + if not is_normalized(keypoints): + H, W = 1.0, 1.0 + else: + H, W, _ = canvas.shape + stickwidth = 4 limbSeq = [ @@ -142,7 +157,10 @@ def draw_handpose(canvas: np.ndarray, keypoints: Union[List[Keypoint], None]) -> if not keypoints: return canvas - H, W, C = canvas.shape + if not is_normalized(keypoints): + H, W = 1.0, 1.0 + else: + H, W, _ = canvas.shape edges = [[0, 1], [1, 2], [2, 3], [3, 4], [0, 5], [5, 6], [6, 7], [7, 8], [0, 9], [9, 10], \ [10, 11], [11, 12], [0, 13], [13, 14], [14, 15], [15, 16], [0, 17], [17, 18], [18, 19], [19, 20]] @@ -190,7 +208,11 @@ def draw_facepose(canvas: np.ndarray, keypoints: Union[List[Keypoint], None]) -> if not keypoints: return canvas - H, W, C = canvas.shape + if not is_normalized(keypoints): + H, W = 1.0, 1.0 + else: + H, W, _ = canvas.shape + for keypoint in keypoints: if keypoint is None: continue diff --git a/annotator/openpose/wholebody.py b/annotator/openpose/wholebody.py new file mode 100644 index 000000000..34d96486f --- /dev/null +++ b/annotator/openpose/wholebody.py @@ -0,0 +1,87 @@ +# Copyright (c) OpenMMLab. All rights reserved. +import cv2 +import numpy as np + +from .cv_ox_det import inference_detector +from .cv_ox_pose import inference_pose + +from typing import List, Optional +from .types import PoseResult, BodyResult, Keypoint + + +class Wholebody: + def __init__(self, onnx_det: str, onnx_pose: str): + # Always loads to CPU to avoid building OpenCV. + device = 'cpu' + backend = cv2.dnn.DNN_BACKEND_OPENCV if device == 'cpu' else cv2.dnn.DNN_BACKEND_CUDA + # You need to manually build OpenCV through cmake to work with your GPU. + providers = cv2.dnn.DNN_TARGET_CPU if device == 'cpu' else cv2.dnn.DNN_TARGET_CUDA + + self.session_det = cv2.dnn.readNetFromONNX(onnx_det) + self.session_det.setPreferableBackend(backend) + self.session_det.setPreferableTarget(providers) + + self.session_pose = cv2.dnn.readNetFromONNX(onnx_pose) + self.session_pose.setPreferableBackend(backend) + self.session_pose.setPreferableTarget(providers) + + def __call__(self, oriImg): + det_result = inference_detector(self.session_det, oriImg) + keypoints, scores = inference_pose(self.session_pose, det_result, oriImg) + + keypoints_info = np.concatenate( + (keypoints, scores[..., None]), axis=-1) + # compute neck joint + neck = np.mean(keypoints_info[:, [5, 6]], axis=1) + # neck score when visualizing pred + neck[:, 2:4] = np.logical_and( + keypoints_info[:, 5, 2:4] > 0.3, + keypoints_info[:, 6, 2:4] > 0.3).astype(int) + new_keypoints_info = np.insert( + keypoints_info, 17, neck, axis=1) + mmpose_idx = [ + 17, 6, 8, 10, 7, 9, 12, 14, 16, 13, 15, 2, 1, 4, 3 + ] + openpose_idx = [ + 1, 2, 3, 4, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17 + ] + new_keypoints_info[:, openpose_idx] = \ + new_keypoints_info[:, mmpose_idx] + keypoints_info = new_keypoints_info + + return keypoints_info + + @staticmethod + def format_result(keypoints_info: np.ndarray) -> List[PoseResult]: + def format_keypoint_part( + part: np.ndarray, + ) -> Optional[List[Optional[Keypoint]]]: + keypoints = [ + Keypoint(x, y, score, i) if score >= 0.3 else None + for i, (x, y, score) in enumerate(part) + ] + return ( + None if all(keypoint is None for keypoint in keypoints) else keypoints + ) + + def total_score(keypoints: Optional[List[Optional[Keypoint]]]) -> float: + return ( + sum(keypoint.score for keypoint in keypoints if keypoint is not None) + if keypoints is not None + else 0.0 + ) + + pose_results = [] + + for instance in keypoints_info: + body_keypoints = 
diff --git a/annotator/openpose/wholebody.py b/annotator/openpose/wholebody.py
new file mode 100644
index 000000000..34d96486f
--- /dev/null
+++ b/annotator/openpose/wholebody.py
@@ -0,0 +1,87 @@
+# Copyright (c) OpenMMLab. All rights reserved.
+import cv2
+import numpy as np
+
+from .cv_ox_det import inference_detector
+from .cv_ox_pose import inference_pose
+
+from typing import List, Optional
+from .types import PoseResult, BodyResult, Keypoint
+
+
+class Wholebody:
+    def __init__(self, onnx_det: str, onnx_pose: str):
+        # Always load to CPU; GPU inference via cv2.dnn requires a custom CUDA build of OpenCV.
+        device = 'cpu'
+        backend = cv2.dnn.DNN_BACKEND_OPENCV if device == 'cpu' else cv2.dnn.DNN_BACKEND_CUDA
+        # You need to manually build OpenCV through cmake to work with your GPU.
+        providers = cv2.dnn.DNN_TARGET_CPU if device == 'cpu' else cv2.dnn.DNN_TARGET_CUDA
+
+        self.session_det = cv2.dnn.readNetFromONNX(onnx_det)
+        self.session_det.setPreferableBackend(backend)
+        self.session_det.setPreferableTarget(providers)
+
+        self.session_pose = cv2.dnn.readNetFromONNX(onnx_pose)
+        self.session_pose.setPreferableBackend(backend)
+        self.session_pose.setPreferableTarget(providers)
+
+    def __call__(self, oriImg):
+        det_result = inference_detector(self.session_det, oriImg)
+        keypoints, scores = inference_pose(self.session_pose, det_result, oriImg)
+
+        keypoints_info = np.concatenate(
+            (keypoints, scores[..., None]), axis=-1)
+        # compute neck joint
+        neck = np.mean(keypoints_info[:, [5, 6]], axis=1)
+        # neck score when visualizing pred
+        neck[:, 2:4] = np.logical_and(
+            keypoints_info[:, 5, 2:4] > 0.3,
+            keypoints_info[:, 6, 2:4] > 0.3).astype(int)
+        new_keypoints_info = np.insert(
+            keypoints_info, 17, neck, axis=1)
+        mmpose_idx = [
+            17, 6, 8, 10, 7, 9, 12, 14, 16, 13, 15, 2, 1, 4, 3
+        ]
+        openpose_idx = [
+            1, 2, 3, 4, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17
+        ]
+        new_keypoints_info[:, openpose_idx] = \
+            new_keypoints_info[:, mmpose_idx]
+        keypoints_info = new_keypoints_info
+
+        return keypoints_info
+
+    @staticmethod
+    def format_result(keypoints_info: np.ndarray) -> List[PoseResult]:
+        def format_keypoint_part(
+            part: np.ndarray,
+        ) -> Optional[List[Optional[Keypoint]]]:
+            keypoints = [
+                Keypoint(x, y, score, i) if score >= 0.3 else None
+                for i, (x, y, score) in enumerate(part)
+            ]
+            return (
+                None if all(keypoint is None for keypoint in keypoints) else keypoints
+            )
+
+        def total_score(keypoints: Optional[List[Optional[Keypoint]]]) -> float:
+            return (
+                sum(keypoint.score for keypoint in keypoints if keypoint is not None)
+                if keypoints is not None
+                else 0.0
+            )
+
+        pose_results = []
+
+        for instance in keypoints_info:
+            body_keypoints = format_keypoint_part(instance[:18]) or ([None] * 18)
+            left_hand = format_keypoint_part(instance[92:113])
+            right_hand = format_keypoint_part(instance[113:134])
+            face = format_keypoint_part(instance[24:92])
+
+            body = BodyResult(
+                body_keypoints, total_score(body_keypoints), len(body_keypoints)
+            )
+            pose_results.append(PoseResult(body, left_hand, right_hand, face))
+
+        return pose_results
diff --git a/install.py b/install.py
index b3c25183c..abbce3e4f 100644
--- a/install.py
+++ b/install.py
@@ -17,4 +17,4 @@
             launch.run_pip(f"install {package}", f"sd-webui-controlnet requirement: {package}")
         except Exception as e:
             print(e)
-            print(f'Warning: Failed to install {package}, some preprocessors may not work.')
\ No newline at end of file
+            print(f'Warning: Failed to install {package}, some preprocessors may not work.')
diff --git a/scripts/controlnet_version.py b/scripts/controlnet_version.py
index b651d0a79..9753c3046 100644
--- a/scripts/controlnet_version.py
+++ b/scripts/controlnet_version.py
@@ -1,4 +1,4 @@
-version_flag = 'v1.1.234'
+version_flag = 'v1.1.237'
 
 from scripts.logging import logger
 
diff --git a/scripts/global_state.py b/scripts/global_state.py
index ca17bfbaf..98afbbb61 100644
--- a/scripts/global_state.py
+++ b/scripts/global_state.py
@@ -62,6 +62,7 @@ def unified_preprocessor(preprocessor_name: str, *args, **kwargs):
     "openpose_face": functools.partial(g_openpose_model.run_model, include_body=True, include_hand=False, include_face=True),
     "openpose_faceonly": functools.partial(g_openpose_model.run_model, include_body=False, include_hand=False, include_face=True),
     "openpose_full": functools.partial(g_openpose_model.run_model, include_body=True, include_hand=True, include_face=True),
+    "dw_openpose_full": functools.partial(g_openpose_model.run_model, include_body=True, include_hand=True, include_face=True, use_dw_pose=True),
     "clip_vision": clip,
     "color": color,
     "pidinet": pidinet,
@@ -107,6 +108,7 @@ def unified_preprocessor(preprocessor_name: str, *args, **kwargs):
     "openpose_hand": g_openpose_model.unload,
     "openpose_face": g_openpose_model.unload,
     "openpose_full": g_openpose_model.unload,
+    "dw_openpose_full": g_openpose_model.unload,
     "segmentation": unload_uniformer,
     "depth_zoe": unload_zoe_depth,
     "normal_bae": unload_normal_bae,
diff --git a/scripts/processor.py b/scripts/processor.py
index 12dae6bc5..372b6237e 100644
--- a/scripts/processor.py
+++ b/scripts/processor.py
@@ -229,6 +229,7 @@ def run_model(
         include_body: bool,
         include_hand: bool,
         include_face: bool,
+        use_dw_pose: bool = False,
        json_pose_callback: Callable[[str], None] = None,
         res: int = 512,
         **kwargs  # Ignore rest of kwargs
@@ -253,6 +254,7 @@
             include_body=include_body,
             include_hand=include_hand,
             include_face=include_face,
+            use_dw_pose=use_dw_pose,
             json_pose_callback=json_pose_callback
         )),
         True
@@ -688,6 +690,14 @@ def shuffle(img, res=512, **kwargs):
             "value": 512
         }
     ],
+    "dw_openpose_full": [
+        {
+            "name": flag_preprocessor_resolution,
+            "min": 64,
+            "max": 2048,
+            "value": 512
+        }
+    ],
     "segmentation": [
         {
             "name": flag_preprocessor_resolution,
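Wiring summary: `global_state.py` registers the new `dw_openpose_full` preprocessor and its unload hook, `processor.py` threads `use_dw_pose` through `run_model`, and the slider config reuses the standard resolution range. The registered partial is roughly equivalent to the following call (a sketch; `img` is a placeholder input and `g_openpose_model` is the module-level instance in `scripts/global_state.py`):

```python
# What selecting "dw_openpose_full" in the UI boils down to (hypothetical call):
drawn_pose, is_image = g_openpose_model.run_model(
    img,
    include_body=True,
    include_hand=True,
    include_face=True,
    use_dw_pose=True,
    res=512,
)
```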
diff --git a/tests/annotator_tests/openpose_tests/openpose_e2e_test.py b/tests/annotator_tests/openpose_tests/openpose_e2e_test.py
index 1479e1efb..b078a15bc 100644
--- a/tests/annotator_tests/openpose_tests/openpose_e2e_test.py
+++ b/tests/annotator_tests/openpose_tests/openpose_e2e_test.py
@@ -90,6 +90,19 @@ def test_all(self):
             ),
             overwrite_expectation=False
         )
+
+    def test_dw(self):
+        self.template(
+            test_image=f'{TestOpenposeDetector.image_path}/woman.jpeg',
+            expected_image=f'{TestOpenposeDetector.image_path}/expected_woman_dw_all_output.png',
+            detector_config=dict(
+                include_body=True,
+                include_face=True,
+                include_hand=True,
+                use_dw_pose=True,
+            ),
+            overwrite_expectation=False,
+        )
 
 if __name__ == '__main__':
     unittest.main()
\ No newline at end of file
diff --git a/tests/images/expected_woman_dw_all_output.png b/tests/images/expected_woman_dw_all_output.png
new file mode 100644
index 000000000..2bcefcc5a
Binary files /dev/null and b/tests/images/expected_woman_dw_all_output.png differ
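To exercise just the new end-to-end case (module path assumed from the repo root, with the test image and new expectation file present):

```python
import unittest

suite = unittest.defaultTestLoader.loadTestsFromName(
    "tests.annotator_tests.openpose_tests.openpose_e2e_test."
    "TestOpenposeDetector.test_dw"
)
unittest.TextTestRunner(verbosity=2).run(suite)
```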