From eed9da07b061b1853056c434644e2e28331fdeeb Mon Sep 17 00:00:00 2001 From: Greg Brandt Date: Sat, 14 Feb 2026 13:22:01 -0800 Subject: [PATCH 1/4] Polish and test coverage - Export all major functions from root avrokit init - Improve test coverage - Bump version --- avrokit/__init__.py | 36 +- pyproject.toml | 2 +- tests/test_plan_coverage.py | 969 ++++++++++++++++++++++++++++++++++++ 3 files changed, 1005 insertions(+), 2 deletions(-) create mode 100644 tests/test_plan_coverage.py diff --git a/avrokit/__init__.py b/avrokit/__init__.py index 35d1c73..77d7356 100644 --- a/avrokit/__init__.py +++ b/avrokit/__init__.py @@ -77,8 +77,9 @@ ... writer.roll() # Create a new partition file """ -from .url import URL, parse_url, create_url_mapping +from .url import URL, FileURL, parse_url, create_url_mapping, flatten_urls from .io import ( + Appendable, PartitionedAvroReader, PartitionedAvroWriter, TimePartitionedAvroWriter, @@ -88,16 +89,46 @@ avro_writer, avro_records, compact_avro_data, + read_avro_schema, + read_avro_schema_from_first_nonempty_file, validate_avro_schema_evolution, ) from .asyncio import DeferredAvroWriter, BlockingQueueAvroReader +from .tools import ( + CatTool, + ConcatTool, + FileSortTool, + FromParquetTool, + GetMetaTool, + GetSchemaTool, + HttpServerTool, + PartitionTool, + RepairTool, + StatsTool, + ToJsonTool, + ToParquetTool, +) __all__ = [ + "Appendable", "BlockingQueueAvroReader", + "CatTool", + "ConcatTool", "DeferredAvroWriter", + "FileSortTool", + "FileURL", + "FromParquetTool", + "GetMetaTool", + "GetSchemaTool", + "HttpServerTool", + "PartitionTool", "PartitionedAvroReader", "PartitionedAvroWriter", + "RepairTool", + "StatsTool", "TimePartitionedAvroWriter", + "ToJsonTool", + "ToParquetTool", "URL", "add_avro_schema_fields", "avro_reader", @@ -106,6 +137,9 @@ "avro_records", "compact_avro_data", "create_url_mapping", + "flatten_urls", "parse_url", + "read_avro_schema", + "read_avro_schema_from_first_nonempty_file", "validate_avro_schema_evolution", ] diff --git a/pyproject.toml b/pyproject.toml index deb314c..124f850 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ [tool.poetry] name = "avrokit" -version = "0.0.1" +version = "0.0.2" description = "Python utilities for working with Avro data files" authors = ["Greg Brandt "] license = "Apache-2.0" diff --git a/tests/test_plan_coverage.py b/tests/test_plan_coverage.py new file mode 100644 index 0000000..13e6e36 --- /dev/null +++ b/tests/test_plan_coverage.py @@ -0,0 +1,969 @@ +# SPDX-FileCopyrightText: 2026 Greg Brandt +# +# SPDX-License-Identifier: Apache-2.0 + +""" +High-priority tests based on coverage analysis from TEST_PLAN.md. +""" + +import pytest +import tempfile +import os +import json +import logging +from avrokit.tools.count import CountTool +from avrokit.tools.stats import StatsTool, Stats +from avrokit.tools.getmeta import GetMetaTool +from avrokit.tools.concat import ConcatTool +from avrokit.tools.cat import CatTool +from avrokit.tools.getschema import GetSchemaTool +from avrokit.io import avro_schema, avro_writer, avro_reader +from avrokit.url.factory import parse_url +from avrokit.io.writer import PartitionedAvroWriter +from avrokit.asyncio.reader import BlockingQueueAvroReader +from avrokit.asyncio.writer import DeferredAvroWriter + + +def create_test_avro_file(tmpdir, filename, schema, records, codec="null"): + """Helper to create test Avro files.""" + file_path = os.path.join(tmpdir, filename) + url = parse_url(file_path) + with avro_writer(url.with_mode("wb"), schema, codec=codec) as writer: + for record in records: + writer.append(record) + return file_path + + +class TestCountTool: + """Tests for the count tool (Priority 1 - 0% coverage).""" + + def test_count_single_file(self): + """Basic counting of records in a single file.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file( + tmpdir, "test.avro", schema, [{"id": i} for i in range(10)] + ) + url = parse_url(file_path) + + tool = CountTool() + count = tool.count([url]) + + assert count == 10 + + def test_count_multiple_files(self): + """Glob pattern support - counting across multiple files.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create multiple files + for i in range(3): + create_test_avro_file( + tmpdir, + f"test_{i}.avro", + schema, + [{"id": j + i * 10} for j in range(10)], + ) + + # Count using glob pattern + pattern = os.path.join(tmpdir, "test_*.avro") + url = parse_url(pattern) + + tool = CountTool() + count = tool.count(url.expand()) + + assert count == 30 + + def test_count_empty_file(self): + """Empty file returns 0.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file(tmpdir, "empty.avro", schema, []) + url = parse_url(file_path) + + tool = CountTool() + count = tool.count([url]) + + assert count == 0 + + def test_count_single_record(self): + """Edge case: single record file.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "value", "type": "string"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file( + tmpdir, "single.avro", schema, [{"value": "hello"}] + ) + url = parse_url(file_path) + + tool = CountTool() + count = tool.count([url]) + + assert count == 1 + + def test_count_deflate_codec(self): + """Different codec handling (deflate).""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file( + tmpdir, + "deflate.avro", + schema, + [{"id": i} for i in range(5)], + codec="deflate", + ) + url = parse_url(file_path) + + tool = CountTool() + count = tool.count([url]) + + assert count == 5 + + def test_fast_count_records(self): + """Test the core method directly.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file( + tmpdir, "test.avro", schema, [{"id": i} for i in range(25)] + ) + url = parse_url(file_path) + + with avro_reader(url.with_mode("rb")) as reader: + tool = CountTool() + count = tool.fast_count_records(reader) + + assert count == 25 + + +class TestStatsTool: + """Tests for the stats tool (Priority 1 - 47% coverage).""" + + def test_stats_null_field_counts(self): + """Null count per field.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": ["string", "null"]}, + ], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [ + {"id": 1, "name": "Alice"}, + {"id": 2, "name": None}, + {"id": 3, "name": "Bob"}, + ] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + url = parse_url(file_path) + + tool = StatsTool() + stats = Stats() + tool.run.__self__.run + + from io import StringIO + import sys + + # Replicate the tool's run logic + stats_obj = Stats() + with avro_reader(url) as reader: + for record in reader: + stats_obj.count += 1 + for f, v in record.items(): + if f not in stats_obj.count_null_by_field: + stats_obj.count_null_by_field[f] = 0 + if v is None: + stats_obj.count_null_by_field[f] += 1 + + assert stats_obj.count == 3 + assert stats_obj.count_null_by_field.get("name") == 1 + + def test_stats_multiple_files(self): + """Aggregation across multiple files.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + urls = [] + for i in range(3): + file_path = create_test_avro_file( + tmpdir, f"file_{i}.avro", schema, [{"id": j} for j in range(5)] + ) + urls.append(parse_url(file_path)) + + tool = StatsTool() + total_count = 0 + for url in urls: + with avro_reader(url) as reader: + for _ in reader: + total_count += 1 + + assert total_count == 15 + + def test_stats_nested_types(self): + """Complex schemas with nested records and arrays.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "items", "type": {"type": "array", "items": "int"}}, + { + "name": "nested", + "type": [ + { + "type": "record", + "name": "Nested", + "fields": [{"name": "value", "type": "string"}], + }, + "null", + ], + }, + ], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [ + {"id": 1, "items": [1, 2, 3], "nested": {"value": "a"}}, + {"id": 2, "items": [], "nested": None}, + ] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + url = parse_url(file_path) + + with avro_reader(url) as reader: + count = sum(1 for _ in reader) + + assert count == 2 + + def test_stats_json_output_structure(self): + """Verify output format.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file( + tmpdir, "test.avro", schema, [{"id": i} for i in range(3)] + ) + url = parse_url(file_path) + + stats = Stats() + with avro_reader(url) as reader: + for record in reader: + stats.count += 1 + url_size = url.size() + stats.size_bytes += url_size + + result = { + "count": stats.count, + "count_by_file": stats.count_by_file, + "count_null_by_field": stats.count_null_by_field, + "size_bytes": stats.size_bytes, + "size_bytes_by_file": stats.size_bytes_by_file, + } + + assert "count" in result + assert "size_bytes" in result + assert result["count"] == 3 + assert result["size_bytes"] > 0 + + def test_stats_empty_file(self): + """Zero records but file size present.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file(tmpdir, "empty.avro", schema, []) + url = parse_url(file_path) + + stats = Stats() + with avro_reader(url) as reader: + for _ in reader: + stats.count += 1 + + url_size = url.size() + + assert stats.count == 0 + assert url_size > 0 + + +class TestGetMetaTool: + """Tests for getmeta tool (Priority 2 - 47% coverage).""" + + def test_getmeta_standard_metadata(self): + """Standard metadata: avro.schema, avro.codec.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file(tmpdir, "test.avro", schema, [{"id": 1}]) + url = parse_url(file_path) + + tool = GetMetaTool() + with avro_reader(url.with_mode("rb")) as reader: + meta = reader.meta + + assert "avro.schema" in meta + assert "avro.codec" in meta + + def test_getmeta_custom_metadata(self): + """User-defined custom metadata.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = os.path.join(tmpdir, "test.avro") + url = parse_url(file_path) + + # Write custom metadata using DataFileWriter directly + from avro.datafile import DataFileWriter + from avro.io import DatumWriter + + with url.with_mode("wb").open() as f: + writer = DataFileWriter(f, DatumWriter(), schema, codec="null") + writer.append({"id": 1}) + # Add custom metadata before closing + writer.meta["custom.key"] = b"custom_value" + writer.close() + + with avro_reader(url.with_mode("rb")) as reader: + meta = reader.meta + + assert "custom.key" in meta + assert meta["custom.key"] == b"custom_value" + + def test_getmeta_utf8_special_chars(self): + """Encoding edge cases with special UTF-8 characters.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "text", "type": "string"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [ + {"text": "hello"}, + {"text": "こんにちは"}, + {"text": "🎉"}, + ] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + url = parse_url(file_path) + + tool = GetMetaTool() + with avro_reader(url.with_mode("rb")) as reader: + schema_bytes = reader.meta.get("avro.schema") + assert schema_bytes is not None + schema_str = schema_bytes.decode("utf-8") + assert "Test" in schema_str + + +class TestConcatTool: + """Tests for concat tool (Priority 2 - 19% coverage).""" + + def test_concat_schema_mismatch(self): + """Falls back to record concat when schemas differ.""" + schema1 = avro_schema( + { + "type": "record", + "name": "Test1", + "fields": [{"name": "id", "type": "int"}], + } + ) + schema2 = avro_schema( + { + "type": "record", + "name": "Test2", + "fields": [{"name": "value", "type": "string"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create files with different schemas + file1 = create_test_avro_file(tmpdir, "file1.avro", schema1, [{"id": 1}]) + file2 = create_test_avro_file( + tmpdir, "file2.avro", schema2, [{"value": "hello"}] + ) + + tool = ConcatTool() + urls = [parse_url(file1), parse_url(file2)] + + # check_schema_and_codec returns False for different schemas + # This causes the fallback to record concat instead of block concat + result = tool.check_schema_and_codec(urls, "null") + assert result == False + + # Verify the tool correctly identifies schema mismatch + # by checking that it falls back to record concat (not block concat) + # The run method checks schema and codec match before deciding + + def test_concat_codec_mismatch(self): + """Falls back correctly when codecs differ.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create files with different codecs + file1 = create_test_avro_file( + tmpdir, "file1.avro", schema, [{"id": 1}], codec="null" + ) + file2 = create_test_avro_file( + tmpdir, "file2.avro", schema, [{"id": 2}], codec="deflate" + ) + + output_file = os.path.join(tmpdir, "output.avro") + + tool = ConcatTool() + urls = [parse_url(file1), parse_url(file2)] + output_url = parse_url(output_file) + + # check_schema_and_codec should return False + result = tool.check_schema_and_codec(urls, "null") + assert result == False + + # Should fall back to record concat + tool.concat(urls, output_url, "null") + + with avro_reader(output_url.with_mode("rb")) as reader: + records = list(reader) + assert len(records) == 2 + + def test_concat_block_concat_success(self): + """Fast path works when schemas and codecs match.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create multiple files with same schema and codec + files = [] + for i in range(3): + file_path = create_test_avro_file( + tmpdir, + f"file{i}.avro", + schema, + [{"id": j} for j in range(5)], + codec="null", + ) + files.append(parse_url(file_path)) + + output_file = os.path.join(tmpdir, "output.avro") + output_url = parse_url(output_file) + + tool = ConcatTool() + + # check_schema_and_codec should return True + result = tool.check_schema_and_codec(files, "null") + assert result == True + + # Use block concat (fast path) + tool.block_concat(files, output_url, "null") + + with avro_reader(output_url.with_mode("rb")) as reader: + records = list(reader) + assert len(records) == 15 + + def test_concat_single_file(self): + """Edge case: single file concatenation.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file1 = create_test_avro_file( + tmpdir, "file1.avro", schema, [{"id": i} for i in range(10)] + ) + output_file = os.path.join(tmpdir, "output.avro") + + tool = ConcatTool() + urls = [parse_url(file1)] + output_url = parse_url(output_file) + + tool.concat(urls, output_url, "null") + + with avro_reader(output_url.with_mode("rb")) as reader: + records = list(reader) + assert len(records) == 10 + + def test_concat_empty_file(self): + """Graceful handling of empty file.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create one empty and one non-empty file + file1 = create_test_avro_file(tmpdir, "empty.avro", schema, []) + file2 = create_test_avro_file( + tmpdir, "file2.avro", schema, [{"id": 1}, {"id": 2}] + ) + + output_file = os.path.join(tmpdir, "output.avro") + + tool = ConcatTool() + urls = [parse_url(file1), parse_url(file2)] + output_url = parse_url(output_file) + + tool.concat(urls, output_url, "null") + + with avro_reader(output_url.with_mode("rb")) as reader: + records = list(reader) + assert len(records) == 2 + + +class TestCatTool: + """Tests for cat tool (Priority 2 - 37% coverage).""" + + def test_cat_offset(self): + """Skip first N records.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [{"id": i} for i in range(10)] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + + output_file = os.path.join(tmpdir, "output.avro") + + tool = CatTool() + tool.sample( + [parse_url(file_path)], + parse_url(output_file), + schema, + codec="null", + offset=3, + ) + + with avro_reader(parse_url(output_file).with_mode("rb")) as reader: + output_records = list(reader) + + assert len(output_records) == 7 + assert output_records[0]["id"] == 3 + + def test_cat_limit(self): + """Stop after N records.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [{"id": i} for i in range(10)] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + + output_file = os.path.join(tmpdir, "output.avro") + + tool = CatTool() + tool.sample( + [parse_url(file_path)], + parse_url(output_file), + schema, + codec="null", + limit=4, + ) + + with avro_reader(parse_url(output_file).with_mode("rb")) as reader: + output_records = list(reader) + + assert len(output_records) == 4 + + def test_cat_samplerate_zero(self): + """No records returned when sample rate is 0.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [{"id": i} for i in range(10)] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + + output_file = os.path.join(tmpdir, "output.avro") + + tool = CatTool() + tool.sample( + [parse_url(file_path)], + parse_url(output_file), + schema, + codec="null", + samplerate=0.0, + ) + + with avro_reader(parse_url(output_file).with_mode("rb")) as reader: + output_records = list(reader) + + assert len(output_records) == 0 + + def test_cat_samplerate_full(self): + """All records returned when sample rate is 1.0.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [{"id": i} for i in range(10)] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + + output_file = os.path.join(tmpdir, "output.avro") + + tool = CatTool() + tool.sample( + [parse_url(file_path)], + parse_url(output_file), + schema, + codec="null", + samplerate=1.0, + ) + + with avro_reader(parse_url(output_file).with_mode("rb")) as reader: + output_records = list(reader) + + assert len(output_records) == 10 + + def test_cat_offset_beyond_length(self): + """Edge case: offset beyond file length.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [{"id": i} for i in range(5)] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + + output_file = os.path.join(tmpdir, "output.avro") + + tool = CatTool() + tool.sample( + [parse_url(file_path)], + parse_url(output_file), + schema, + codec="null", + offset=100, + ) + + with avro_reader(parse_url(output_file).with_mode("rb")) as reader: + output_records = list(reader) + + assert len(output_records) == 0 + + def test_cat_deflate_codec(self): + """Different codec handling (deflate).""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [{"id": i} for i in range(10)] + file_path = create_test_avro_file( + tmpdir, "test.avro", schema, records, codec="deflate" + ) + + output_file = os.path.join(tmpdir, "output.avro") + + tool = CatTool() + tool.sample( + [parse_url(file_path)], + parse_url(output_file), + schema, + codec="deflate", + ) + + with avro_reader(parse_url(output_file).with_mode("rb")) as reader: + output_records = list(reader) + + assert len(output_records) == 10 + + +class TestGetSchemaTool: + """Tests for getschema tool (Priority 2 - 53% coverage).""" + + def test_getschema_empty_file(self): + """Error handling for empty file.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + file_path = create_test_avro_file(tmpdir, "empty.avro", schema, []) + url = parse_url(file_path) + + from avrokit.io.schema import read_avro_schema_from_first_nonempty_file + + # Empty file should still have schema + result = read_avro_schema_from_first_nonempty_file(url.expand()) + assert result is not None + + def test_getschema_complex_schema(self): + """Complex schemas with nested records, unions, and enums.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [ + {"name": "id", "type": "int"}, + { + "name": "status", + "type": { + "type": "enum", + "name": "Status", + "symbols": ["A", "B", "C"], + }, + }, + { + "name": "data", + "type": { + "type": "record", + "name": "Data", + "fields": [{"name": "value", "type": "string"}], + }, + }, + {"name": "optional", "type": ["string", "null"]}, + {"name": "tags", "type": {"type": "array", "items": "string"}}, + ], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + records = [ + { + "id": 1, + "status": "A", + "data": {"value": "test"}, + "optional": "hello", + "tags": ["a", "b"], + } + ] + file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) + url = parse_url(file_path) + + from avrokit.io.schema import read_avro_schema_from_first_nonempty_file + + extracted = read_avro_schema_from_first_nonempty_file([url]) + assert extracted is not None + assert extracted.name == "Test" + json_schema = extracted.to_json() + assert "Data" in str(json_schema) + assert "Status" in str(json_schema) + + +class TestPartitionedAvroWriter: + """Tests for io/writer.py (Priority 3 - 90% coverage).""" + + def test_partitioned_writer_find_last_filename_none(self): + """No existing files - returns None.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + url = parse_url(tmpdir, mode="wb") + + writer = PartitionedAvroWriter(url, schema) + result = writer.find_last_filename() + + assert result is None + + def test_partitioned_writer_next_filename_invalid(self): + """Malformed filename raises ValueError.""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + url = parse_url(tmpdir, mode="wb") + + writer = PartitionedAvroWriter(url, schema) + + with pytest.raises(ValueError, match="Invalid filename format"): + writer.next_filename("invalid_name.avro") + + def test_partitioned_writer_roll(self): + """File rotation with roll().""" + schema = avro_schema( + { + "type": "record", + "name": "Test", + "fields": [{"name": "id", "type": "int"}], + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + url = parse_url(tmpdir, mode="wb") + + writer = PartitionedAvroWriter(url, schema) + + with writer: + writer.append({"id": 1}) + # Roll to new file + writer.roll() + writer.append({"id": 2}) + + # Check that multiple files were created + files = os.listdir(tmpdir) + assert len(files) == 2 + + +class TestBlockingQueueAvroReader: + """Tests for asyncio/reader.py (Priority 3 - 90% coverage).""" + + def test_reader_worker_exception(self): + """Exception path in reader thread.""" + + def raise_error(): + raise ValueError("Test error") + + reader = BlockingQueueAvroReader(raise_error) + reader.start() + + import time + + time.sleep(0.1) + + assert reader._reader_thread_done.is_set() + + +class TestDeferredAvroWriter: + """Tests for asyncio/writer.py (Priority 3 - 90% coverage).""" + + def test_writer_worker_exception(self): + """Exception path in writer thread.""" + + class FailingWriter: + def append(self, datum): + raise ValueError("Write error") + + writer = DeferredAvroWriter(FailingWriter()) + writer.start() + + # Add a record that will fail + writer.append({"id": 1}) + + import time + + time.sleep(0.2) + + writer.stop() + + assert writer._writer_thread_done.is_set() From 44b627de3e0d80bd361463c167f1b88524861251 Mon Sep 17 00:00:00 2001 From: Greg Brandt Date: Sat, 14 Feb 2026 13:59:21 -0800 Subject: [PATCH 2/4] Add check.yml workflow --- .github/workflows/check.yml | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 .github/workflows/check.yml diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml new file mode 100644 index 0000000..077fee7 --- /dev/null +++ b/.github/workflows/check.yml @@ -0,0 +1,33 @@ +name: Check + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + check: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: poetry install --with dev --extras all + + - name: Lint + run: poetry run flake8 . + + - name: Typecheck + run: poetry run mypy . + + - name: Format + run: poetry run black . + + - name: Test + run: poetry run pytest -n auto From acd606a32cc8df18ddb6ff204c804f597d5f7620 Mon Sep 17 00:00:00 2001 From: Greg Brandt Date: Sat, 14 Feb 2026 14:00:26 -0800 Subject: [PATCH 3/4] Add poetry --- .github/workflows/check.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 077fee7..892860e 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -17,6 +17,9 @@ jobs: with: python-version: "3.12" + - name: Install Poetry + run: pipx install poetry + - name: Install dependencies run: poetry install --with dev --extras all From db680a98a3d85abb3213fce1437ec74565ee341b Mon Sep 17 00:00:00 2001 From: Greg Brandt Date: Sat, 14 Feb 2026 14:06:56 -0800 Subject: [PATCH 4/4] Fix lint errors --- tests/test_additional_coverage.py | 2 +- tests/test_plan_coverage.py | 17 +++-------------- tests/test_property_based.py | 7 +++---- tests/test_tools_integration.py | 21 +++++++++------------ 4 files changed, 16 insertions(+), 31 deletions(-) diff --git a/tests/test_additional_coverage.py b/tests/test_additional_coverage.py index 2b6e601..947ab3a 100644 --- a/tests/test_additional_coverage.py +++ b/tests/test_additional_coverage.py @@ -20,7 +20,7 @@ def test_file_url_nonexistent_read(self): """Test reading from non-existent file.""" url = parse_url("/tmp/nonexistent_file_12345.avro") with pytest.raises(FileNotFoundError): - with url.with_mode("rb") as f: + with url.with_mode("rb") as _: pass def test_file_url_exists_check(self): diff --git a/tests/test_plan_coverage.py b/tests/test_plan_coverage.py index 13e6e36..bfa1feb 100644 --- a/tests/test_plan_coverage.py +++ b/tests/test_plan_coverage.py @@ -9,14 +9,10 @@ import pytest import tempfile import os -import json -import logging from avrokit.tools.count import CountTool from avrokit.tools.stats import StatsTool, Stats -from avrokit.tools.getmeta import GetMetaTool from avrokit.tools.concat import ConcatTool from avrokit.tools.cat import CatTool -from avrokit.tools.getschema import GetSchemaTool from avrokit.io import avro_schema, avro_writer, avro_reader from avrokit.url.factory import parse_url from avrokit.io.writer import PartitionedAvroWriter @@ -201,12 +197,8 @@ def test_stats_null_field_counts(self): url = parse_url(file_path) tool = StatsTool() - stats = Stats() tool.run.__self__.run - from io import StringIO - import sys - # Replicate the tool's run logic stats_obj = Stats() with avro_reader(url) as reader: @@ -239,7 +231,6 @@ def test_stats_multiple_files(self): ) urls.append(parse_url(file_path)) - tool = StatsTool() total_count = 0 for url in urls: with avro_reader(url) as reader: @@ -363,7 +354,6 @@ def test_getmeta_standard_metadata(self): file_path = create_test_avro_file(tmpdir, "test.avro", schema, [{"id": 1}]) url = parse_url(file_path) - tool = GetMetaTool() with avro_reader(url.with_mode("rb")) as reader: meta = reader.meta @@ -420,7 +410,6 @@ def test_getmeta_utf8_special_chars(self): file_path = create_test_avro_file(tmpdir, "test.avro", schema, records) url = parse_url(file_path) - tool = GetMetaTool() with avro_reader(url.with_mode("rb")) as reader: schema_bytes = reader.meta.get("avro.schema") assert schema_bytes is not None @@ -461,7 +450,7 @@ def test_concat_schema_mismatch(self): # check_schema_and_codec returns False for different schemas # This causes the fallback to record concat instead of block concat result = tool.check_schema_and_codec(urls, "null") - assert result == False + assert not result # Verify the tool correctly identifies schema mismatch # by checking that it falls back to record concat (not block concat) @@ -494,7 +483,7 @@ def test_concat_codec_mismatch(self): # check_schema_and_codec should return False result = tool.check_schema_and_codec(urls, "null") - assert result == False + assert not result # Should fall back to record concat tool.concat(urls, output_url, "null") @@ -533,7 +522,7 @@ def test_concat_block_concat_success(self): # check_schema_and_codec should return True result = tool.check_schema_and_codec(files, "null") - assert result == True + assert result # Use block concat (fast path) tool.block_concat(files, output_url, "null") diff --git a/tests/test_property_based.py b/tests/test_property_based.py index b3aacba..569c359 100644 --- a/tests/test_property_based.py +++ b/tests/test_property_based.py @@ -35,7 +35,7 @@ def test_roundtrip_empty_file(self): try: url = parse_url(tmp.name) # Write zero records - with avro_writer(url.with_mode("wb"), schema) as writer: + with avro_writer(url.with_mode("wb"), schema) as _: pass # Read back and verify with avro_reader(url.with_mode("rb")) as reader: @@ -129,7 +129,6 @@ def test_roundtrip_many_small_records(self): def test_nested_records_deep(self): """Test deeply nested record structures.""" # Create a deeply nested schema - inner_schema = {"name": "value", "type": "int"} schema_dict = { "type": "record", "name": "Level0", @@ -310,7 +309,7 @@ def test_partitioned_writer_empty_partitions(self): ) with tempfile.TemporaryDirectory() as tmpdir: base_url = parse_url(os.path.join(tmpdir, "output", "*.avro")) - with PartitionedAvroWriter(base_url.with_mode("wb"), schema) as writer: + with PartitionedAvroWriter(base_url.with_mode("wb"), schema) as _: pass # Write nothing # Should create at least one file @@ -380,7 +379,7 @@ def test_partitioned_reader_many_empty_partitions(self): # Create multiple empty files for i in range(5): file_url = parse_url(os.path.join(tmpdir, f"part-{i:05d}.avro")) - with avro_writer(file_url.with_mode("wb"), schema) as writer: + with avro_writer(file_url.with_mode("wb"), schema) as _: pass # Empty # Read with partitioned reader diff --git a/tests/test_tools_integration.py b/tests/test_tools_integration.py index e025083..9eca8c1 100644 --- a/tests/test_tools_integration.py +++ b/tests/test_tools_integration.py @@ -10,15 +10,10 @@ import tempfile import os from avrokit.tools.partition import PartitionTool -from avrokit.tools.fromparquet import FromParquetTool, parquet_to_avro +from avrokit.tools.fromparquet import parquet_to_avro from avrokit.tools.toparquet import ToParquetTool, avro_to_parquet from avrokit.tools.filesort import FileSortTool -from avrokit.tools.concat import ConcatTool -from avrokit.tools.cat import CatTool -from avrokit.tools.getschema import GetSchemaTool -from avrokit.tools.getmeta import GetMetaTool from avrokit.tools.tojson import ToJsonTool -from avrokit.tools.stats import StatsTool from avrokit.io import avro_schema, avro_writer, avro_reader from avrokit.url.factory import parse_url from faker import Faker @@ -471,9 +466,10 @@ def test_concat_multiple_files(self): input_url = parse_url(input_pattern) output_url = parse_url(output_file) - with PartitionedAvroReader( - input_url.with_mode("rb") - ) as reader, avro_writer(output_url.with_mode("wb"), schema) as writer: + with ( + PartitionedAvroReader(input_url.with_mode("rb")) as reader, + avro_writer(output_url.with_mode("wb"), schema) as writer, + ): for record in reader: writer.append(record) @@ -523,9 +519,10 @@ def test_partition_then_concat(self): from avrokit.io.reader import PartitionedAvroReader - with PartitionedAvroReader( - partition_url.with_mode("rb") - ) as reader, avro_writer(concat_url.with_mode("wb"), schema) as writer: + with ( + PartitionedAvroReader(partition_url.with_mode("rb")) as reader, + avro_writer(concat_url.with_mode("wb"), schema) as writer, + ): for record in reader: writer.append(record)