import cv2 import time from hpe_mp_class import hpe_mp_class import json import argparse from pythonosc import udp_client import yaml # Arguments parser = argparse.ArgumentParser() parser.add_argument('--config_path', type=str, default="config.yml", help='path to config file yaml') args = parser.parse_args() config_path = args.config_path # Config parameters data_yaml = yaml.safe_load(open(config_path)) address_input = data_yaml["address_input"] for i in range(0, 100): if address_input == str(i): address_input = i scale_pose = data_yaml["scale_pose"] osc_address = data_yaml["osc_address"] osc_port = data_yaml["osc_port"] osc_message_address = data_yaml["osc_message_address"] output_method = data_yaml["output_method"] crop = data_yaml["crop_image"] mirror_image = data_yaml["mirror_image"] show_image = data_yaml["show_image"] apose = data_yaml["apose"] world = data_yaml["world"] old_world = data_yaml["old_world"] # OSC client client = udp_client.SimpleUDPClient(osc_address, osc_port) # Videocapture cap = cv2.VideoCapture(address_input) # Preprocessing parameters frame_width = int(crop*cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(crop*cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # FPS variables pTime = 0 cTime = 0 # Mediapie class mp_cl = hpe_mp_class() while True: # Reading frame success, img = cap.read() # Image preprocessing if crop != 1.0: img = cv2.resize(img, (frame_width, frame_height)) # Mediapipe mp_cl.process(img, scale_pose=scale_pose) mp_cl.show(img) # FPS cTime = time.time() fps = 1. / (cTime - pTime) pTime = cTime # Showing if show_image: if mirror_image: img = cv2.flip(img, 1) # mirror cv2.putText(img, str(int(fps)), (22, 32), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 0), 2) cv2.putText(img, str(int(fps)), (20, 30), cv2.FONT_HERSHEY_COMPLEX, 1, (255, 255, 255), 2) cv2.imshow("Main", img) # Output if output_method == 'file': # JSON res = mp_cl.getJSON(apose=apose, world=world, old_world=old_world) with open('hierarchy_data.json', 'w', encoding='utf-8') as f: json.dump(res, f, ensure_ascii=False, indent=4) else: # OSC res = mp_cl.getJSON(apose=apose, world=world, old_world=old_world) res_list = [] for val in res.keys(): stroka = str(val) for val2 in res[val]: if val2 == 'visible': stroka += " " + str(val2) + " " + str(res[val][val2]) # res_list.append(str(val) + " " + str(val2) + " " + str(res[val][val2])) else: for val3 in res[val][val2]: stroka += " " + str(val2) + "_" + str(val3) + " " + str(res[val][val2][val3]) # res_list.append(str(val) + " " + str(val2) + " " + str(val3) + " " + str(res[val][val2][val3])) res_list.append(stroka) client.send_message(osc_message_address, res_list) # Interface key = cv2.waitKey(1) if key == 27: break