I am doing a learning experiment: a worker thread captures frames from the webcam and sends them through a queue to the main thread, which forwards each frame to a face detection model (YuNet) for processing.
The code is as follows:
import cv2 as cv
import numpy as np
from queue import Queue
from threading import Thread

class Camera(Thread):
    def __init__(self, cap, frame_queue):
        super().__init__()
        self.frame_queue = frame_queue
        self.cap = cap
        self.running = True

    def run(self):
        while self.running:
            has_frame, frame = self.cap.read()
            # if not has_frame:
            #     self.running = False
            #     break
            self.frame_queue.put_nowait(frame)

    def stop(self):
        self.running = False  # stop the loop before releasing the capture
        self.cap.release()
def visualize(image, faces, fps=None, print_flag=False):
    output = image.copy()
    if fps is not None:
        cv.putText(output, 'FPS: {:.2f}'.format(fps), (30, 30), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255))
    for idx, face in enumerate(faces):
        if print_flag:
            print('Face {}, top-left coordinates: ({:.0f}, {:.0f}), box width: {:.0f}, box height {:.0f}, score: {:.2f}'.format(idx, face[0], face[1], face[2], face[3], face[-1]))
        coords = face[:-1].astype(np.int32)
        # Draw face bounding box
        cv.rectangle(output, (coords[0], coords[1]), (coords[0]+coords[2], coords[1]+coords[3]), (0, 255, 0), 2)
        # Draw landmarks
        cv.circle(output, (coords[4], coords[5]), 2, (0, 0, 0), 2)
        cv.circle(output, (coords[6], coords[7]), 2, (0, 0, 0), 2)
        cv.circle(output, (coords[8], coords[9]), 2, (0, 255, 0), 2)
        cv.circle(output, (coords[10], coords[11]), 2, (255, 0, 255), 2)
        cv.circle(output, (coords[12], coords[13]), 2, (0, 255, 255), 2)
        # check tilt
        # angle = int(check_tilt(coords[4], coords[5], coords[6], coords[7])[0])
        # print(angle)
        # Put score
        cv.putText(output, '{:.4f}'.format(face[-1]), (coords[0], coords[1]+15), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0))
        # cv.putText(output, str(angle), (coords[0], coords[1]+40), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0))
    return output
def main():
    yunet = cv.FaceDetectorYN.create(
        model="face_detection_yunet_2022mar.onnx",
        config='',
        input_size=(320, 320),
        score_threshold=0.7,
        nms_threshold=0.7,
        top_k=5000,
        backend_id=cv.dnn.DNN_BACKEND_DEFAULT,
        target_id=cv.dnn.DNN_TARGET_CPU
    )

    device_id = 0
    cap = cv.VideoCapture(device_id)
    frame_w = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
    frame_h = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
    yunet.setInputSize([frame_w, frame_h])

    frame_queue = Queue()
    camera_thread = Camera(cap, frame_queue)
    camera_thread.start()
    # processing_thread = Detector(frame_queue, yunet)
    # processing_thread.start()

    tm = cv.TickMeter()
    counter = 0
    while cv.waitKey(1) < 0:
        tm.start()
        print(counter)
        if frame_queue.empty():
            continue
        frame = frame_queue.get_nowait()
        if frame is None:
            continue
        # has_frame, frame = cap.read()
        # if not has_frame:
        #     print('No frames grabbed!')
        _, faces = yunet.detect(frame)  # faces: None, or nx15 np.array
        counter += 1
        # print(faces)
        tm.stop()
        fps = tm.getFPS()
        tm.reset()
        if faces is not None:
            frame = visualize(frame, faces, fps=fps)
        cv.imshow('libfacedetection demo', frame)

    camera_thread.stop()
    camera_thread.join()

if __name__ == '__main__':
    main()
The problem is that the displayed video lags noticeably behind what is happening in real time, even though the FPS reported by the TickMeter is around 22. I think the Camera thread is slower than the main thread. Can someone point out the mistake I have made? Thank you.
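To see where the delay builds up, one thing I could add (just a rough diagnostic on my side, since Queue.qsize() is only approximate) is a print of the queue backlog inside the display loop:

    # inside the while cv.waitKey(1) < 0: loop, before get_nowait()
    print('backlog:', frame_queue.qsize(), 'frames waiting')

If that number keeps growing over time, it would mean the capture thread is putting frames on the queue faster than the detection loop takes them off.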
System:
opencv 4.7.0
python 3.10.9
mac M1