import time
import cv2
import threading
import requests
import numpy as np
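
# Requires the OpenCV res10 SSD face detector files (deploy.prototxt and
# res10_300x300_ssd_iter_140000_fp16.caffemodel); the relative paths below
# assume they sit in the working directory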
# Constants
VIDEO_FRAMES_TO_SKIP = 5  # process only one frame out of every 5 read from the stream
VIRTUAL_SENSOR_IP_ADDRESS = "192.168.0.135"
VIDEO_URL_USERNAME = "admin"
VIDEO_URL_PASSWORD = "admin"
VIDEO_STREAM_URL = f"http://{VIDEO_URL_USERNAME}:{VIDEO_URL_PASSWORD}@10.76.13.21:8081/video"
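# The stream URL embeds HTTP basic-auth credentials (user:password@host), the
# format used by typical MJPEG IP-camera apps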
# Global variables for communication between threads
lock = threading.Lock()
face_location = None
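# face_location holds the (x_percent, y_percent) of the most recent detection
# (or None); the lock guards it because it is written by the main thread and
# consumed by the sender thread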

# Load the pre-trained deep learning face detector model once at start-up;
# reloading it for every frame would dominate the processing time
face_detector = cv2.dnn.readNetFromCaffe(
    "deploy.prototxt",
    "res10_300x300_ssd_iter_140000_fp16.caffemodel"
)

# Function to detect faces in a frame and update the shared face_location variable
def detect_faces(frame):
    global face_location
    # Grab the frame dimensions and build a 300x300 input blob for the network
    (h, w) = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
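    # (104.0, 177.0, 123.0) are the per-channel BGR means the res10 SSD Caffe
    # model was trained with; blobFromImage subtracts them from the input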
    # Pass the blob through the network and obtain the face detections
    face_detector.setInput(blob)
    detections = face_detector.forward()
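    # detections has shape (1, 1, N, 7): each row holds
    # [image_id, label, confidence, startX, startY, endX, endY], with the box
    # coordinates normalized to the [0, 1] range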
    with lock:
        face_location = None
    # Loop over the detections; every face above the confidence threshold is
    # drawn, and face_location ends up holding the last one found
    for i in range(detections.shape[2]):
        confidence = detections[0, 0, i, 2]
        if confidence > 0.5:
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            (startX, startY, endX, endY) = box.astype("int")
            # Draw the bounding box on the image
            cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)
            # Calculate x_percent and y_percent relative to frame size
            face_center = (int((startX + endX) / 2), int((startY + endY) / 2))
            x_percent = int((face_center[0] / w) * 100)
            y_percent = int((face_center[1] / h) * 100)
            # Write the confidence and (x, y) percentage location on the image
            cv2.putText(frame, f"{confidence*100:.2f}% ({x_percent}%, {y_percent}%)",
                        (startX, startY - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
            with lock:
                face_location = (x_percent, y_percent)

# Function run on the secondary thread: periodically send the detected face
# location to the virtual sensor over HTTP
def send_request():
    global face_location
    while True:
        # Copy the shared state under the lock, then do the (slow) network
        # I/O outside it so the detection thread is never blocked
        with lock:
            x_percent = 0
            if face_location is not None:
                x_percent, _ = face_location
            face_location = None
        url = f"http://{VIRTUAL_SENSOR_IP_ADDRESS}/set?t=c&r="
        # Scale the 0-100 percent position to the sensor's 0-1024 range
        myUrl = f"{url}{round(x_percent * 1024 / 100)}"
        #print(f"Sending request to {myUrl}")
        try:
            requests.get(myUrl, timeout=2)
        except requests.RequestException:
            # Don't let a dropped connection kill the sender thread
            pass
        time.sleep(0.5)

# Open the network video stream
camera = cv2.VideoCapture(VIDEO_STREAM_URL)
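# (cv2.VideoCapture accepts a network stream URL as well as a local device index)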
# Start the secondary thread to send HTTP requests
thread = threading.Thread(target=send_request)
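# Mark it as a daemon thread so it is killed automatically when the main
# program exits; no explicit join is needed on shutdown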
thread.daemon = True
thread.start()
frameCount = 0
# Loop over the frames from the camera
while True:
    # Read a frame from the camera; bail out if the stream has ended
    ret, frame = camera.read()
    if not ret:
        break
    # Only process one frame in every VIDEO_FRAMES_TO_SKIP frames
    frameCount = frameCount + 1
    if (frameCount % VIDEO_FRAMES_TO_SKIP) != 0:
        #print( f"{frameCount} Skip" )
        continue
    frameCount = 0
    # Flip the image so that it displays as a mirror image
    flipped_frame = cv2.flip(frame, 1)
    # All the heavy-duty processing is done here
    detect_faces(flipped_frame)
    # Display the output frame
    cv2.imshow("Output", flipped_frame)
    # Check for the "q" key to quit
    if cv2.waitKey(1) == ord("q"):
        break
# Clean up
camera.release()
cv2.destroyAllWindows()