Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 38 additions & 37 deletions pycaption/filtergraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@
import zipfile
from io import BytesIO

from PIL import Image

from pycaption.base import CaptionSet
from pycaption.subtitler_image_based import SubtitleImageBasedWriter


class FiltergraphWriter(SubtitleImageBasedWriter):
"""
FFmpeg filtergraph writer for image-based subtitles.
FFmpeg subtitle image writer using concat demuxer.

Generates PNG subtitle images and an FFmpeg filtergraph that can be used
to create a transparent WebM video with subtitle overlays.
Generates PNG subtitle images and an FFmpeg concat demuxer file that
sequences blank (transparent) and subtitle images with proper timing.
The concat file can be fed directly to ffmpeg as an input.

By default, generates Full HD (1920x1080) images. The filtergraph uses
the overlay filter with timing to display each subtitle at the correct time.
By default, generates Full HD (1920x1080) images.

Uses PNG format for images with 4-color indexed palette for optimal
compression (~6 KB per Full HD image).
Expand Down Expand Up @@ -62,12 +64,15 @@ def write(
align='center'
):
"""
Write captions as PNG images with an FFmpeg filtergraph for creating
a transparent WebM video overlay.
Write captions as PNG images with an FFmpeg concat demuxer file.

Returns a ZIP file containing:
- PNG subtitle images in the specified image_dir
- filtergraph.txt: FFmpeg filter_complex script
- PNG subtitle images in the specified output_dir
- blank.png: Transparent image for gaps between subtitles
- concat.txt: FFmpeg concat demuxer file with timing

The concat.txt can be used directly as ffmpeg input:
ffmpeg -f concat -safe 0 -i {output_dir}/concat.txt ...

:param caption_set: CaptionSet containing the captions to write
:param position: Position of subtitles ('top', 'bottom', 'source')
Expand All @@ -84,37 +89,32 @@ def write(
caps, lang, tmpDir, position, align, avoid_same_next_start_prev_end
)

# Calculate total duration (last end time)
max_end = max(cap_list[0].end for cap_list in caps_final)
duration_seconds = max_end / 1_000_000 + 1 # Add 1 second buffer

# Build FFmpeg filtergraph
# Start with transparent base
filter_parts = []
filter_parts.append(
f"color=c=black@0:s={self.video_width}x{self.video_height}:d={duration_seconds:.3f},format=yuva444p[base]"
)

# Load each image (paths relative to where ffmpeg is run)
for i in range(1, len(caps_final) + 1):
filter_parts.append(
f"movie={self.output_dir}/subtitle{i:04d}.png,format=yuva444p[s{i}]"
)
# Create blank transparent image for gaps between subtitles
blank = Image.new('RGBA', (self.video_width, self.video_height), (0, 0, 0, 0))
blank.save(tmpDir + '/blank.png', optimize=True, compress_level=9)

# Chain overlays
prev_label = "base"
# Build concat demuxer file that sequences blank/subtitle images
concat_lines = ['ffconcat version 1.0']
prev_end_us = 0
for i, cap_list in enumerate(caps_final, 1):
start_sec = self.format_ts_seconds(cap_list[0].start)
end_sec = self.format_ts_seconds(cap_list[0].end)
next_label = f"v{i}" if i < len(caps_final) else "out"
start_us = cap_list[0].start
end_us = cap_list[0].end

# Gap before this subtitle
gap_us = start_us - prev_end_us
if gap_us > 0:
concat_lines.append('file blank.png')
concat_lines.append(f'duration {gap_us / 1_000_000:.3f}')

filter_parts.append(
f"[{prev_label}][s{i}]overlay=x=0:y=0:enable='between(t,{start_sec},{end_sec})':format=auto[{next_label}]"
)
prev_label = next_label
# Subtitle image
concat_lines.append(f'file subtitle{i:04d}.png')
concat_lines.append(f'duration {(end_us - start_us) / 1_000_000:.3f}')

filtergraph = ";\n".join(filter_parts)
prev_end_us = end_us

# Trailing blank so the last subtitle doesn't freeze on screen
concat_lines.append('file blank.png')
concat_text = '\n'.join(concat_lines)

# Create ZIP archive
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
Expand All @@ -123,8 +123,9 @@ def write(
img_path = tmpDir + '/subtitle%04d.png' % i
zf.write(img_path, f'{self.output_dir}/subtitle{i:04d}.png')

# Add filtergraph
zf.writestr(f'{self.output_dir}/filtergraph.txt', filtergraph)
# Add blank image and concat file
zf.write(tmpDir + '/blank.png', f'{self.output_dir}/blank.png')
zf.writestr(f'{self.output_dir}/concat.txt', concat_text)

buf.seek(0)
return buf.read()
128 changes: 82 additions & 46 deletions tests/test_filtergraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class TestFiltergraphWriterTestCase:
"""Tests for the FiltergraphWriter that generates FFmpeg filtergraphs."""
"""Tests for the FiltergraphWriter that generates FFmpeg concat demuxer files."""

def setup_method(self):
self.writer = FiltergraphWriter()
Expand All @@ -28,10 +28,11 @@ def test_zip_contents(self):
names = zf.namelist()
assert 'embedded_subs/subtitle0001.png' in names
assert 'embedded_subs/subtitle0002.png' in names
assert 'embedded_subs/filtergraph.txt' in names
assert 'embedded_subs/blank.png' in names
assert 'embedded_subs/concat.txt' in names

def test_filtergraph_structure(self):
"""Test that filtergraph has correct structure."""
def test_concat_header(self):
"""Test that concat file starts with ffconcat header."""
srt_content = """1
00:00:01,000 --> 00:00:04,000
Test
Expand All @@ -40,18 +41,42 @@ def test_filtergraph_structure(self):
zip_data = self.writer.write(caption_set)

with zipfile.ZipFile(BytesIO(zip_data), 'r') as zf:
filtergraph = zf.read('embedded_subs/filtergraph.txt').decode()
concat = zf.read('embedded_subs/concat.txt').decode()
assert concat.startswith('ffconcat version 1.0')

# Should have color source for transparent base
assert 'color=c=black@0:s=1920x1080' in filtergraph
assert 'format=yuva444p' in filtergraph
def test_concat_file_structure(self):
"""Test that concat demuxer file has correct timing entries."""
srt_content = """1
00:00:01,000 --> 00:00:04,000
First

# Should have movie input for image (path relative to ffmpeg working dir)
assert 'movie=embedded_subs/subtitle0001.png' in filtergraph
2
00:00:05,000 --> 00:00:08,000
Second
"""
caption_set = SRTReader().read(srt_content)
zip_data = self.writer.write(caption_set)

# Should have overlay with timing
assert 'overlay=x=0:y=0:enable=' in filtergraph
assert "between(t,1.000,4.000)" in filtergraph
with zipfile.ZipFile(BytesIO(zip_data), 'r') as zf:
concat = zf.read('embedded_subs/concat.txt').decode()

# Gap before first subtitle (1 second)
assert 'file blank.png\nduration 1.000' in concat

# First subtitle (3 seconds)
assert 'file subtitle0001.png\nduration 3.000' in concat

# Gap between subtitles (1 second)
lines = concat.split('\n')
idx_sub1 = next(i for i, l in enumerate(lines) if 'subtitle0001' in l)
assert lines[idx_sub1 + 2] == 'file blank.png'
assert lines[idx_sub1 + 3] == 'duration 1.000'

# Second subtitle (3 seconds)
assert 'file subtitle0002.png\nduration 3.000' in concat

# Trailing blank at the end
assert lines[-1] == 'file blank.png'

def test_custom_output_dir(self):
"""Test custom output directory."""
Expand All @@ -66,13 +91,11 @@ def test_custom_output_dir(self):
with zipfile.ZipFile(BytesIO(zip_data), 'r') as zf:
names = zf.namelist()
assert 'subs/subtitle0001.png' in names
assert 'subs/filtergraph.txt' in names
assert 'subs/blank.png' in names
assert 'subs/concat.txt' in names

filtergraph = zf.read('subs/filtergraph.txt').decode()
assert 'movie=subs/subtitle0001.png' in filtergraph

def test_multiple_overlays_chained(self):
"""Test that multiple subtitles are chained correctly."""
def test_multiple_subtitles_in_concat(self):
"""Test that multiple subtitles produce correct concat entries."""
srt_content = """1
00:00:01,000 --> 00:00:04,000
First
Expand All @@ -89,50 +112,63 @@ def test_multiple_overlays_chained(self):
zip_data = self.writer.write(caption_set)

with zipfile.ZipFile(BytesIO(zip_data), 'r') as zf:
filtergraph = zf.read('embedded_subs/filtergraph.txt').decode()

# Should have 3 movie inputs
assert 'movie=embedded_subs/subtitle0001.png' in filtergraph
assert 'movie=embedded_subs/subtitle0002.png' in filtergraph
assert 'movie=embedded_subs/subtitle0003.png' in filtergraph
concat = zf.read('embedded_subs/concat.txt').decode()

# Should have chained overlays
assert '[base][s1]overlay' in filtergraph
assert '[v1][s2]overlay' in filtergraph
assert '[v2][s3]overlay' in filtergraph
assert 'file subtitle0001.png' in concat
assert 'file subtitle0002.png' in concat
assert 'file subtitle0003.png' in concat

# Last one should output to [out]
assert '[out]' in filtergraph
def test_custom_resolution(self):
"""Test custom video resolution produces correctly sized images."""
writer = FiltergraphWriter(video_width=1280, video_height=720)
srt_content = """1
00:00:01,000 --> 00:00:04,000
Test
"""
caption_set = SRTReader().read(srt_content)
zip_data = writer.write(caption_set)

def test_duration_calculation(self):
"""Test that duration is calculated from last subtitle end time."""
with zipfile.ZipFile(BytesIO(zip_data), 'r') as zf:
# Verify blank image has correct dimensions
from PIL import Image
blank_data = zf.read('embedded_subs/blank.png')
img = Image.open(BytesIO(blank_data))
assert img.size == (1280, 720)

def test_back_to_back_subtitles_no_gap(self):
"""Test subtitles with no gap between them produce no blank entry."""
srt_content = """1
00:00:01,000 --> 00:00:04,000
First

2
00:01:30,000 --> 00:01:35,500
Last one at 1:35
00:00:04,000 --> 00:00:07,000
Immediately after
"""
caption_set = SRTReader().read(srt_content)
zip_data = self.writer.write(caption_set)

with zipfile.ZipFile(BytesIO(zip_data), 'r') as zf:
filtergraph = zf.read('embedded_subs/filtergraph.txt').decode()
concat = zf.read('embedded_subs/concat.txt').decode()
lines = concat.split('\n')

# Duration should be ~96.5 seconds (95.5 + 1 buffer)
assert 'd=96.500' in filtergraph
# Find subtitle0001 line
idx_sub1 = next(i for i, l in enumerate(lines) if 'subtitle0001' in l)
# Next file entry should be subtitle0002 directly (no blank in between)
assert lines[idx_sub1 + 2] == 'file subtitle0002.png'

def test_custom_resolution(self):
"""Test custom video resolution."""
writer = FiltergraphWriter(video_width=1280, video_height=720)
def test_subtitle_starting_at_zero(self):
"""Test subtitle starting at time 0 produces no leading blank."""
srt_content = """1
00:00:01,000 --> 00:00:04,000
Test
00:00:00,000 --> 00:00:03,000
Starts immediately
"""
caption_set = SRTReader().read(srt_content)
zip_data = writer.write(caption_set)
zip_data = self.writer.write(caption_set)

with zipfile.ZipFile(BytesIO(zip_data), 'r') as zf:
filtergraph = zf.read('embedded_subs/filtergraph.txt').decode()
assert 's=1280x720' in filtergraph
concat = zf.read('embedded_subs/concat.txt').decode()
lines = concat.split('\n')

# First file entry should be the subtitle, not blank
assert lines[1] == 'file subtitle0001.png'