Question
Frame Drops When Increasing Number of RTSP Streams
Hello,
I am currently working on a project that involves processing multiple RTSP streams to count the number of vehicles. I am using the following technologies and libraries in my code:
aiokafka: To send frames to a Kafka topic
cv2 (OpenCV): To capture frames from RTSP streams
asyncio: To handle asynchronous tasks of handling each stream (used asyncio.create_task
for stream processes handling and later asyncio.gather(**tasks)
)
ThreadPoolExecutor
: For processing frames concurrently for each stream. I need to process each frame before sending to my kafka topic. I have otimized the processing part as much as possible. process_frame is being called by the ThreadPoolExecutor for every frame in a while True loop.
The issue I am facing is that when I increase the number of streams beyond a certain point, I experience significant frame drops. Note that I dont want to skip frmes intenstionally since y model heavily relies on weach frame for the usecase that I am implementing. Here are the key parts of my implementation:
Stream Processing: Using cv2.VideoCapture
to read frames and aiokafkaAIOKafkaProducer
to send frames to Kafka.
Concurrency: Utilizing asyncio with ThreadPoolExecutor to handle multiple streams concurrently.I’ve lready tried increasing the number of workers.
Problem
When the number of streams is increased, I encounter frame drops. I suspect this could be due to limitations in concurrency handling, processing power, or network bandwidth.
Request
I would greatly appreciate any advice or suggestions on how to address the frame drop issue when handling multiple RTSP streams. Specifically, I am looking for recommendations on:
Optimizing concurrency and frame processing
Efficiently handling high-throughput streaming with minimal frame loss.
Any other potential improvements in my current setup.
Thank you for your assistance!
P.S. My code looks like this:
1. Load environment variables (HOST, USER, PASSWORD, DATABASE, SAVE_PATH, STREAMS_TO_TEST)
2. Define a function to create a table in the MySQL database:
- Connect to the MySQL database
- Create a cursor
- Execute SQL query to create table if it doesn't exist
- Commit changes
3. Define a function to start a connection to the MySQL database:
- Connect to the MySQL database using credentials from environment variables
- Return the connection
4. Define a function to initialize a logger for a stream:
- Check if the stream directory exists, create if it doesn't
- Create a logger instance for the stream
- Set the logging level to DEBUG
- Define a log file path
- Create a file handler for the logger
- Set the logging level and formatter for the handler
- Add the handler to the logger
- Return the logger
5. Define an asynchronous function to process a stream:
- Define a function to process a frame:
- Encode the frame as JPEG
- Convert the encoded frame to base64 string
- Return the base64 string
- Define an asynchronous function to send a frame:
- Create a message dictionary with frame data and frame count
- Send the message to Kafka
- Initialize frame count and start time
- Open the RTSP stream
- Check if the stream opened successfully, log an error if not
- Create an event loop and a thread pool executor
- Loop to read and process frames:
- Read a frame using the executor
- Process the frame to base64 using the executor
- Send the frame to Kafka
- Increment the frame count
- Break the loop if duration is exceeded
- Release the video capture
- Log the total frames sent, elapsed time, and average FPS
- Handle exceptions and log errors
6. Define the main asynchronous function:
- Read RTSP stream URLs and locations from a CSV file
- Initialize lists for loggers, working streams, and working locations
- Loop through streams and locations:
- Check if the maximum number of test cases is reached
- Try to open the RTSP stream
- If the stream is opened successfully:
- Increment the test cases count
- Add the stream and location to the working lists
- Create a logger for the stream and add it to the loggers list
- Initialize a Kafka producer
- Start the Kafka producer
- Create a list of tasks to process each stream
- Gather all tasks and wait for them to complete
- Stop the Kafka producer
7. Run the main function using asyncio