Skip to content

Commit

Permalink
#799 add more logs to savant_rs_video_demux
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh committed Jun 28, 2024
1 parent 39f4a05 commit 9ec2301
Showing 1 changed file with 63 additions and 14 deletions.
77 changes: 63 additions & 14 deletions gst_plugins/python/savant_rs_video_demux.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def handle_buffer(
)
return Gst.FlowReturn.OK

self.logger.debug('Getting video frame for frame %s', savant_frame_meta.idx)
video_frame, _ = self.video_pipeline.get_independent_frame(
savant_frame_meta.idx
)
Expand All @@ -280,9 +281,13 @@ def handle_buffer(
video_frame.source_id,
'is' if video_frame.keyframe else 'is not',
)
self.logger.debug(
'Moving frame %s to %s', savant_frame_meta.idx, self.pipeline_stage_name
)
self.video_pipeline.move_as_is(
self.pipeline_stage_name, [savant_frame_meta.idx]
)
self.logger.debug('Checking if demuxer is running')
if not self.is_running:
self.logger.info(
'Demuxer is not running. Skipping buffer with timestamp %s.',
Expand All @@ -291,6 +296,7 @@ def handle_buffer(
self.video_pipeline.delete(savant_frame_meta.idx)
return Gst.FlowReturn.OK

self.logger.debug('Checking if source %s is blacklisted', video_frame.source_id)
if self.zeromq_reader is not None and self.zeromq_reader.is_blacklisted(
video_frame.source_id.encode()
):
Expand All @@ -301,31 +307,48 @@ def handle_buffer(
)
self.video_pipeline.delete(savant_frame_meta.idx)
return Gst.FlowReturn.OK
self.logger.debug('Source %s is not blacklisted', video_frame.source_id)

self.logger.debug('Building frame info for frame %s', savant_frame_meta.idx)
frame_info = FrameInfo.build(savant_frame_meta.idx, video_frame)

self.logger.debug('Waiting for shared sources lock')
with self.source_lock:
self.logger.debug('Got shared sources lock')
res = self._get_source_info(frame_info, buffer)
if not isinstance(res, SourceInfo):
self.video_pipeline.delete(frame_info.idx)
return res
source_info = res
source_info.locked.set()
source_info.timestamp = time.time()
self.logger.debug('Shared sources lock released')

self.logger.debug('Waiting for source %s lock', source_info.source_id)
with source_info.lock():
self.logger.debug('Got source %s lock', source_info.source_id)
self.logger.debug(
'Check if frame prams are changed for source %s', source_info.source_id
)
if (
source_info.src_pad is not None
and source_info.params != frame_info.params
):
self.update_frame_params(source_info, frame_info.params)
self.logger.debug('Check timestamps for source %s', source_info.source_id)
if source_info.src_pad is not None:
self.check_timestamps(source_info, buffer)
self.logger.debug('Saving last PTS for source %s', source_info.source_id)
if buffer.pts != Gst.CLOCK_TIME_NONE:
source_info.last_pts = buffer.pts
self.logger.debug('Saving last DTS for source %s', source_info.source_id)
if buffer.dts != Gst.CLOCK_TIME_NONE:
source_info.last_dts = buffer.dts
self.logger.debug('Check if source %s has src pad', source_info.source_id)
if source_info.src_pad is None:
self.logger.debug(
'Source %s has no src pad, adding it', source_info.source_id
)
if video_frame.keyframe:
self.add_source(video_frame.source_id, source_info)
else:
Expand All @@ -346,6 +369,16 @@ def handle_buffer(
)
result: Gst.FlowReturn = source_info.src_pad.push(buffer)

self.logger.debug(
'Frame with IDX %s and PTS %s from source %s has been pushed: %s',
frame_info.idx,
buffer.pts,
video_frame.source_id,
result,
)

self.logger.debug('Source %s lock released', source_info.source_id)

if result not in [
Gst.FlowReturn.OK,
Gst.FlowReturn.FLUSHING,
Expand All @@ -354,6 +387,14 @@ def handle_buffer(
Gst.FlowReturn.CUSTOM_SUCCESS_1,
Gst.FlowReturn.CUSTOM_SUCCESS_2,
]:

self.logger.debug(
'Failed to push frame with IDX %s and PTS %s from source %s: %s',
frame_info.idx,
buffer.pts,
video_frame.source_id,
result,
)
if self.zeromq_reader is not None:
self.logger.debug(
'Blacklisting source %s due to error %s',
Expand All @@ -362,10 +403,18 @@ def handle_buffer(
)
self.zeromq_reader.blacklist_source(video_frame.source_id.encode())
self.video_pipeline.delete(savant_frame_meta.idx)
self.logger.debug(
'Waiting for source %s lock [B]', source_info.source_id
)
with source_info.lock():
self.logger.debug('Got source %s lock [B]', source_info.source_id)
self.remove_source(source_info, send_eos=False)
self.logger.debug('Source %s lock released [B]', source_info.source_id)
self.logger.debug('Waiting for shared sources lock [B]')
with self.source_lock:
self.logger.debug('Got shared sources lock [B]')
del self.sources[source_info.source_id]
self.logger.debug('Shared sources lock released [B]')
result = Gst.FlowReturn.OK
else:
self.logger.error(
Expand Down Expand Up @@ -510,22 +559,15 @@ def add_source(self, source_id: str, source_info: SourceInfo):
def update_frame_params(self, source_info: SourceInfo, frame_params: FrameParams):
"""Handle changed frame parameters on a source."""

if source_info.params != frame_params:
self.logger.info(
'Frame parameters on pad %s was changed from %s to %s',
source_info.src_pad.get_name(),
source_info.params,
frame_params,
)
source_info.params = frame_params
self.remove_source(source_info, send_eos=True)
return

caps = build_caps(frame_params)
source_info.src_pad.push_event(Gst.Event.new_caps(caps))
self.logger.info(
'Caps on pad %s changed to %s', source_info.src_pad, caps.to_string()
'Frame parameters on pad %s was changed from %s to %s',
source_info.src_pad.get_name(),
source_info.params,
frame_params,
)
source_info.params = frame_params
self.remove_source(source_info, send_eos=True)
return

def check_timestamps(self, source_info: SourceInfo, buffer: Gst.Buffer):
"""Check frame timestamps (PTS and DTS).
Expand Down Expand Up @@ -601,6 +643,13 @@ def eviction_loop(self):
)
time.sleep(self.source_eviction_interval)

def frame_params_equal(self, a: FrameParams, b: FrameParams) -> bool:
if a.codec != b.codec:
return False
if a.codec in [Codec.JPEG, Codec.PNG]:
return True
return a == b


# register plugin
GObject.type_register(SavantRsVideoDemux)
Expand Down

0 comments on commit 9ec2301

Please sign in to comment.