Contributor

@fordN fordN commented Jan 9, 2026

This PR adds a shared set of generalized integration tests that can be reused across loader implementations. Previously, each loader implemented its own integration test harness and tests, which led to heavy code duplication and significant testing work for anyone implementing a new loader.

The new setup provides a set of shared base tests and a set of shared streaming tests. To use them, each loader/datastore only has to implement four functions against its destination data store: get_row_count, query_rows, cleanup_table, and get_column_names. Loader implementers may also add loader-specific tests for functionality unique to their implementation.
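As a rough illustration of the pattern, here is a minimal, self-contained sketch. Only the four function names come from this PR; their signatures, the `LoaderTestHooks`, `SharedLoadChecks`, and `InMemoryHooks` classes, and the `load_rows` helper are hypothetical stand-ins, not the actual code in this branch.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


class LoaderTestHooks(ABC):
    """Hypothetical interface holding the four functions named in the PR.

    The signatures below are assumptions made for this sketch.
    """

    @abstractmethod
    def get_row_count(self, table_name: str) -> int: ...

    @abstractmethod
    def query_rows(self, table_name: str, limit: Optional[int] = None) -> List[Row]: ...

    @abstractmethod
    def cleanup_table(self, table_name: str) -> None: ...

    @abstractmethod
    def get_column_names(self, table_name: str) -> List[str]: ...


class SharedLoadChecks:
    """Hypothetical shared test logic: written once, reused for every backend."""

    def run_basic_load_check(
        self,
        hooks: LoaderTestHooks,
        load_rows: Callable[[str, List[Row]], None],
        table_name: str,
    ) -> None:
        rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
        try:
            load_rows(table_name, rows)
            # The shared test only talks to the datastore through the hooks.
            assert hooks.get_row_count(table_name) == 2
            assert hooks.get_column_names(table_name) == ["id", "name"]
            assert hooks.query_rows(table_name, limit=1) == [rows[0]]
        finally:
            # Always clean up so tests do not leak state into each other.
            hooks.cleanup_table(table_name)


class InMemoryHooks(LoaderTestHooks):
    """Toy backend (a dict of lists) standing in for a real datastore."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[Row]] = {}

    def load_rows(self, table_name: str, rows: List[Row]) -> None:
        self.tables.setdefault(table_name, []).extend(rows)

    def get_row_count(self, table_name: str) -> int:
        return len(self.tables.get(table_name, []))

    def query_rows(self, table_name: str, limit: Optional[int] = None) -> List[Row]:
        rows = self.tables.get(table_name, [])
        return list(rows) if limit is None else rows[:limit]

    def cleanup_table(self, table_name: str) -> None:
        self.tables.pop(table_name, None)

    def get_column_names(self, table_name: str) -> List[str]:
        rows = self.tables.get(table_name, [])
        return list(rows[0].keys()) if rows else []


backend = InMemoryHooks()
SharedLoadChecks().run_basic_load_check(backend, backend.load_rows, "test_table")
```

In this sketch a new backend would only supply its own `LoaderTestHooks` subclass; the assertions in `SharedLoadChecks` stay untouched.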

These changes significantly reduce the overall loader integration test code, as well as the amount of code a loader developer has to write to add integration tests for a new loader.

Summary:

  • Before: 5,302 lines of test code across 6 loaders
  • After: 2,626 lines (670 shared + 1,956 backend-specific)
  • Saved: 2,676 lines (50% reduction)

Per-Loader Average

  • Before: ~883 lines per loader (all standalone, duplicated)
  • After: ~326 lines per loader (inherits 12 tests automatically)
  • Savings: ~557 lines per loader (63% less work)
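The figures above can be reproduced with a few lines of arithmetic. This is a sketch based only on the numbers quoted in this description; it assumes the per-loader "After" figure counts backend-specific lines only (1,956 / 6), which is what the quoted ~326 matches.

```python
# Totals quoted in the summary above.
before_total = 5302
shared = 670
backend_specific = 1956
loaders = 6

# Overall reduction.
after_total = shared + backend_specific
saved = before_total - after_total
reduction_pct = round(100 * saved / before_total)

# Per-loader averages (assumption: shared lines are not attributed to any loader).
before_per_loader = before_total / loaders
after_per_loader = backend_specific / loaders
per_loader_savings_pct = round(
    100 * (before_per_loader - after_per_loader) / before_per_loader
)

print(after_total, saved, reduction_pct)
print(int(before_per_loader), int(after_per_loader), per_loader_savings_pct)
```

The exact "Before" average is about 883.67, so the quoted ~883 is truncated rather than rounded.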

fordN added 4 commits January 8, 2026 18:12
Foundational work to enable all loader tests to inherit common test patterns
Adds generalized streaming test infrastructure and migrates Redis and Snowflake loader tests to use the shared base classes.
Migrates the final three loader test suites to use the shared base test infrastructure
@fordN fordN self-assigned this Jan 9, 2026
@fordN fordN added the enhancement New feature or request label Jan 9, 2026
github-actions bot commented Jan 9, 2026

🎨 Ruff Formatting & Linting Report

Run the following commands locally to fix issues:

ruff format .
ruff check . --fix

Formatting changes needed:

--- tests/integration/loaders/conftest.py
+++ tests/integration/loaders/conftest.py
@@ -141,11 +141,7 @@
             loader_config = loader_config.copy()
             # Only enable state if not explicitly configured
             if 'state' not in loader_config:
-                loader_config['state'] = {
-                    'enabled': True,
-                    'storage': 'memory',
-                    'store_batch_id': True
-                }
+                loader_config['state'] = {'enabled': True, 'storage': 'memory', 'store_batch_id': True}
 
         # Create and return the loader instance
         return self.config.loader_class(loader_config)

--- tests/integration/loaders/test_base_streaming.py
+++ tests/integration/loaders/test_base_streaming.py
@@ -138,7 +138,9 @@
             reorg_response = ResponseBatch.reorg_batch(
                 invalidation_ranges=[BlockRange(network='ethereum', start=104, end=108)]
             )
-            reorg_results = list(loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn'))
+            reorg_results = list(
+                loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn')
+            )
             assert len(reorg_results) == 1
             assert reorg_results[0].success
 
@@ -174,7 +176,9 @@
             )
 
             # Load via streaming API (with connection_name for state tracking)
-            results = list(loader.load_stream_continuous(iter([response]), test_table_name, connection_name='test_conn'))
+            results = list(
+                loader.load_stream_continuous(iter([response]), test_table_name, connection_name='test_conn')
+            )
             assert len(results) == 1
             assert results[0].success
 
@@ -187,7 +191,9 @@
             reorg_response = ResponseBatch.reorg_batch(
                 invalidation_ranges=[BlockRange(network='ethereum', start=160, end=180)]
             )
-            reorg_results = list(loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn'))
+            reorg_results = list(
+                loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn')
+            )
             assert len(reorg_results) == 1
             assert reorg_results[0].success
 
@@ -248,7 +254,9 @@
             reorg_response = ResponseBatch.reorg_batch(
                 invalidation_ranges=[BlockRange(network='ethereum', start=100, end=100)]
             )
-            reorg_results = list(loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn'))
+            reorg_results = list(
+                loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn')
+            )
             assert len(reorg_results) == 1
             assert reorg_results[0].success
 

2 files would be reformatted, 98 files already formatted

Linting issues:

E501 Line too long (133 > 120)
   --> tests/integration/loaders/test_base_streaming.py:141:121
    |
139 |                 invalidation_ranges=[BlockRange(network='ethereum', start=104, end=108)]
140 |             )
141 |             reorg_results = list(loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn'))
    |                                                                                                                         ^^^^^^^^^^^^^
142 |             assert len(reorg_results) == 1
143 |             assert reorg_results[0].success
    |

E501 Line too long (121 > 120)
   --> tests/integration/loaders/test_base_streaming.py:177:121
    |
176 |             # Load via streaming API (with connection_name for state tracking)
177 |             results = list(loader.load_stream_continuous(iter([response]), test_table_name, connection_name='test_conn'))
    |                                                                                                                         ^
178 |             assert len(results) == 1
179 |             assert results[0].success
    |

E501 Line too long (133 > 120)
   --> tests/integration/loaders/test_base_streaming.py:190:121
    |
188 |                 invalidation_ranges=[BlockRange(network='ethereum', start=160, end=180)]
189 |             )
190 |             reorg_results = list(loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn'))
    |                                                                                                                         ^^^^^^^^^^^^^
191 |             assert len(reorg_results) == 1
192 |             assert reorg_results[0].success
    |

E501 Line too long (133 > 120)
   --> tests/integration/loaders/test_base_streaming.py:251:121
    |
249 |                 invalidation_ranges=[BlockRange(network='ethereum', start=100, end=100)]
250 |             )
251 |             reorg_results = list(loader.load_stream_continuous(iter([reorg_response]), test_table_name, connection_name='test_conn'))
    |                                                                                                                         ^^^^^^^^^^^^^
252 |             assert len(reorg_results) == 1
253 |             assert reorg_results[0].success
    |

Found 4 errors.

@fordN fordN force-pushed the ford/generalize-loader-tests branch from 31d82d2 to 1010d2a Compare January 9, 2026 17:47
@fordN fordN force-pushed the ford/generalize-loader-tests branch from 1010d2a to f910b03 Compare January 9, 2026 17:51

2 participants