Skip to content
Merged
33 changes: 30 additions & 3 deletions crates/enc-ffmpeg/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ pub struct EncoderBase {
first_pts: Option<i64>,
last_frame_pts: Option<i64>,
last_written_dts: Option<i64>,
/// One-packet reorder buffer: a packet is written once its successor is
/// known so a synthesized duration can be replaced with the real dts
/// delta. Fragmenting muxers place each fragment at the accumulated
/// duration of the previous one, and the last sample of a fragment uses
/// the packet duration verbatim — a nominal duration there collapses any
/// capture gap that lands on a fragment cut, playing post-gap content
/// during the gap. The bool records whether the duration was synthesized.
held_packet: Option<(Packet, bool)>,
}

impl EncoderBase {
Expand All @@ -23,6 +31,7 @@ impl EncoderBase {
last_frame_pts: None,
stream_index,
last_written_dts: None,
held_packet: None,
}
}

Expand Down Expand Up @@ -133,7 +142,8 @@ impl EncoderBase {
_ => {}
}

if self.packet.duration() <= 0
let duration_synthesized = self.packet.duration() <= 0;
if duration_synthesized
&& let Some(duration) = nominal_packet_duration(
output.stream(self.stream_index).unwrap().time_base(),
encoder.frame_rate(),
Expand Down Expand Up @@ -161,7 +171,18 @@ impl EncoderBase {
}

self.last_written_dts = self.packet.dts();
self.packet.write_interleaved(output)?;

let current = std::mem::replace(&mut self.packet, Packet::empty());
if let Some((mut previous, previous_synthesized)) = self.held_packet.take() {
if previous_synthesized
&& let (Some(prev_dts), Some(cur_dts)) = (previous.dts(), current.dts())
&& cur_dts > prev_dts
{
previous.set_duration(cur_dts - prev_dts);
}
previous.write_interleaved(output)?;
}
self.held_packet = Some((current, duration_synthesized));
}

Ok(())
Expand All @@ -174,7 +195,13 @@ impl EncoderBase {
) -> Result<(), ffmpeg::Error> {
encoder.send_eof()?;

self.process_packets(output, encoder)
self.process_packets(output, encoder)?;

if let Some((previous, _)) = self.held_packet.take() {
previous.write_interleaved(output)?;
}

Ok(())
}
}

Expand Down
116 changes: 110 additions & 6 deletions crates/enc-ffmpeg/src/mux/segmented_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ impl SegmentedVideoEncoder {
frame: frame::Video,
timestamp: Duration,
) -> Result<(), QueueFrameError> {
let is_first_frame = self.segment_start_time.is_none();
let segment_start = match self.segment_start_time {
Some(start) => start,
None => {
Expand All @@ -347,9 +346,10 @@ impl SegmentedVideoEncoder {
.queue_frame(frame, timestamp, &mut self.output)?;
self.frames_in_segment += 1;

if is_first_frame {
self.try_notify_init_segment();
}
// The encoder holds one packet back to stamp real durations, so the
// init segment materializes a frame later than the first queue call;
// keep trying until it lands (no-op once notified).
self.try_notify_init_segment();

if !self.pending_segment_indices.is_empty() {
self.frames_since_pending_flush += 1;
Expand Down Expand Up @@ -1071,6 +1071,103 @@ mod tests {
}
}

// A capture gap whose post-gap frame lands on a segment cut: the dash
// muxer anchors each fragment at the accumulated duration of the previous
// one, and the last sample of a fragment takes its packet duration
// verbatim. Without real packet durations the first post-gap frame is
// pulled back onto the pre-gap timeline (one frame period after the last
// pre-gap frame) and its content plays DURING the gap — a multi-second
// desync for that frame and a collapsed hold for the viewer.
#[test]
fn gap_crossing_segment_cut_preserves_post_gap_pts() {
ffmpeg::init().ok();

let temp = tempfile::tempdir().unwrap();
let base_path = temp.path().to_path_buf();

let mut encoder = SegmentedVideoEncoder::init(
base_path.clone(),
test_video_info(),
SegmentedVideoEncoderConfig {
segment_duration: Duration::from_millis(500),
..Default::default()
},
)
.unwrap();

// ~30fps up to 0.4s, a 2.6s gap, then more frames. The post-gap
// frame is forced to a keyframe so the dash muxer cuts the segment
// exactly at the gap — the shape that loses the gap.
let pre_gap_ms: Vec<u64> = (0..12).map(|i| i * 33).collect();
let post_gap_ms: Vec<u64> = (0..12).map(|i| 3000 + i * 33).collect();
for &ts_ms in &pre_gap_ms {
let frame = create_test_frame(320, 240);
encoder
.queue_frame(frame, Duration::from_millis(ts_ms))
.unwrap();
}
for (i, &ts_ms) in post_gap_ms.iter().enumerate() {
let mut frame = create_test_frame(320, 240);
if i == 0 {
frame.set_kind(ffmpeg::picture::Type::I);
}
encoder
.queue_frame(frame, Duration::from_millis(ts_ms))
.unwrap();
}

encoder.finish().unwrap();

let mut segment_paths: Vec<PathBuf> = std::fs::read_dir(&base_path)
.unwrap()
.filter_map(|e| e.ok().map(|e| e.path()))
.filter(|p| p.extension().is_some_and(|ext| ext == "m4s"))
.collect();
segment_paths.sort();

let concat_path = base_path.join("concat_test.mp4");
let mut concatenated = std::fs::read(base_path.join(INIT_SEGMENT_NAME)).unwrap();
for segment in &segment_paths {
concatenated.extend(std::fs::read(segment).unwrap());
}
std::fs::write(&concat_path, concatenated).unwrap();

let mut input = format::input(&concat_path).unwrap();
let stream_index = input
.streams()
.best(ffmpeg::media::Type::Video)
.unwrap()
.index();
let time_base = input.stream(stream_index).unwrap().time_base();
let tb = time_base.numerator() as f64 / time_base.denominator() as f64;

let mut pts_secs: Vec<f64> = input
.packets()
.filter_map(|(stream, packet)| {
(stream.index() == stream_index)
.then_some(packet.pts())
.flatten()
})
.map(|pts| pts as f64 * tb)
.collect();
pts_secs.sort_by(|a, b| a.partial_cmp(b).unwrap());

assert_eq!(pts_secs.len(), pre_gap_ms.len() + post_gap_ms.len());

let expected: Vec<f64> = pre_gap_ms
.iter()
.chain(post_gap_ms.iter())
.map(|&ms| ms as f64 / 1000.0)
.collect();
for (pts, expected) in pts_secs.iter().zip(&expected) {
assert!(
(pts - expected).abs() < 0.04,
"encoded pts {pts:.3}s should match capture timestamp {expected:.3}s \
(all pts: {pts_secs:?})"
);
}
}

#[test]
fn manifest_updated_on_segment_boundary() {
ffmpeg::init().ok();
Expand Down Expand Up @@ -1126,8 +1223,15 @@ mod tests {
)
.unwrap();

let frame = create_test_frame(320, 240);
encoder.queue_frame(frame, Duration::ZERO).unwrap();
// The encoder holds one packet to stamp real durations (and hardware
// encoders add their own delay), so the init segment lands once
// enough frames have pushed the first packet through.
for i in 0..10u64 {
let frame = create_test_frame(320, 240);
encoder
.queue_frame(frame, Duration::from_millis(i * 33))
.unwrap();
}

assert!(encoder.validate_init_segment().is_ok());
}
Expand Down
Loading
Loading