From 97946ad205d7c94af0effa627987152f7363de20 Mon Sep 17 00:00:00 2001 From: Aliaksei Kharlap Date: Tue, 3 Mar 2026 20:09:17 +0300 Subject: [PATCH] fix(import): crash recovery, stale detection, and InfluxDB v3 timestamps --- influxdata/import/README.md | 43 ++++- influxdata/import/import.py | 366 ++++++++++++++++++++++-------------- 2 files changed, 257 insertions(+), 152 deletions(-) diff --git a/influxdata/import/README.md b/influxdata/import/README.md index cbba803..661666f 100644 --- a/influxdata/import/README.md +++ b/influxdata/import/README.md @@ -11,6 +11,7 @@ Key features: - Automatic data sampling for optimal batch sizing - Resume interrupted imports from the last checkpoint - Pause and cancel running imports +- Crash recovery with stale import detection - Progress tracking and statistics - Tag/field conflict detection and resolution - Data type mismatch handling @@ -162,7 +163,7 @@ Pause a running import to resume later. **Request**: `POST /api/v3/engine/import?action=pause&import_id=` -> **Note**: Returns error if import is not found, already paused, or already cancelled. +> **Note**: Returns error if import is not found, already paused, already cancelled, or already completed. ### Resume Import @@ -188,7 +189,17 @@ Resume a paused or interrupted import. `POST /api/v3/engine/import?action=resume&import_id=&source_token=your_token` ``` -> **Note**: Authentication credentials are not stored for security reasons and must be provided when resuming. Returns error if import is not found, already cancelled, or already running. +> **Note**: Authentication credentials are not stored for security reasons and must be provided when resuming. Returns error if import is not found, already cancelled, already completed, or actively running. + +#### Crash recovery + +If the plugin or the server crashes during an import, the import state may be left as "running" even though nothing is actually running. The resume action handles this with **stale import detection**: + +- When the import state is "running", the plugin checks the timestamp of the last `import_state` record. +- If the last update is older than **5 minutes**, the import is considered stale (crashed) and resume is allowed. +- If no `import_state` records exist at all (the import crashed before processing any tables), the import is restarted from the beginning. + +If the plugin itself crashes (e.g., source database becomes unavailable), it writes a paused state before exiting, so the import can be resumed with a regular resume call after fixing the issue. ### Cancel Import @@ -196,7 +207,7 @@ Cancel a running import. Cancelled imports cannot be resumed. **Request**: `POST /api/v3/engine/import?action=cancel&import_id=` -> **Note**: Returns error if import is not found or already cancelled. +> **Note**: Returns error if import is not found, already cancelled, or already completed. ### Test Connection @@ -644,7 +655,7 @@ influxdb3 query --database mydb "SELECT * FROM import_state WHERE import_id = 'y ``` #### `import_pause_state` -Stores pause/cancel state for controlling running imports. +Stores pause/cancel/completed state for controlling running imports. ```bash influxdb3 query --database mydb "SELECT * FROM import_pause_state WHERE import_id = 'your-import-id' ORDER BY time DESC LIMIT 1" @@ -663,6 +674,7 @@ Starts a new import process: 2. Estimates import time based on data sampling 3. Creates import configuration and state records 4. Initiates table-by-table import +5. On any unhandled error, writes paused state so the import can be resumed after fixing the issue #### `import_table(influxdb3_local, config, import_id, measurement, start_time, end_time, task_id, ...)` @@ -677,10 +689,12 @@ Imports a single table: #### `resume_import(influxdb3_local, import_id, task_id, ...)` Resumes an interrupted import: -1. Loads saved import configuration -2. Identifies incomplete tables and their last checkpoint -3. Continues import from checkpoint positions -4. Handles tables without checkpoint (e.g., after crash) by restarting from beginning +1. Detects stale imports — if the import state is "running" but the last update is older than 5 minutes, treats it as crashed and allows resume +2. If no `import_state` records exist (crashed before processing any tables), restarts from the beginning +3. Loads saved import configuration +4. Identifies incomplete tables and their last checkpoint +5. Continues import from checkpoint positions +6. On any unhandled error, writes paused state so the import can be resumed again #### `get_import_stats(influxdb3_local, import_id, task_id)` @@ -774,6 +788,19 @@ During import, the plugin saves checkpoints: - For username/password: Provide both `source_username` AND `source_password` together - Do not mix authentication methods +#### Issue: "Import is already running" after server crash + +**Solution**: + +1. Wait at least 5 minutes after the crash — the plugin uses a 5-minute stale import threshold +2. Call the resume endpoint with credentials: + ```bash + curl -X POST "http://localhost:8181/api/v3/engine/import?action=resume&import_id=$IMPORT_ID" \ + -H "Content-Type: application/json" \ + -d '{"source_token": "my-token"}' + ``` +3. The plugin detects the stale state and resumes from the last checkpoint (or restarts from the beginning if no checkpoint exists) + #### Issue: "Import already completed" when trying to resume **Solution**: diff --git a/influxdata/import/import.py b/influxdata/import/import.py index 834a77c..cf12cc3 100644 --- a/influxdata/import/import.py +++ b/influxdata/import/import.py @@ -121,6 +121,7 @@ INITIAL_BACKOFF_SECONDS = 1 MAX_BACKOFF_SECONDS = 16 REQUEST_TIMEOUT_SECONDS = 30 +STALE_IMPORT_THRESHOLD_SECONDS = 300 # 5 minutes — if last import_state update is older, import is considered stale # Timestamp offset constants (for boundary adjustments) MICROSECOND_OFFSET = 1 @@ -133,6 +134,7 @@ class ImportPauseState(Enum): CANCELLED = "cancelled" PAUSED = "paused" RUNNING = "running" + COMPLETED = "completed" @dataclass @@ -1203,20 +1205,32 @@ def convert_influxql_to_line_protocol( time_idx = columns.index("time") if "time" in columns else 0 # Process each row + skipped = 0 for row in values: - builder = build_line_protocol_row( - influxdb3_local, - measurement, - row, - time_idx, - tags_dict, - tag_columns, - field_columns, - tag_renames, - task_id, + try: + builder = build_line_protocol_row( + influxdb3_local, + measurement, + row, + time_idx, + tags_dict, + tag_columns, + field_columns, + tag_renames, + task_id, + ) + if builder: + builders.append(builder) + except Exception as e: + skipped += 1 + influxdb3_local.warn( + f"[{task_id}] Skipping invalid row in '{measurement}': {e}" + ) + + if skipped > 0: + influxdb3_local.warn( + f"[{task_id}] Skipped {skipped} invalid rows in '{measurement}'" ) - if builder: - builders.append(builder) return builders @@ -1390,55 +1404,6 @@ def write_import_state( influxdb3_local.warn(f"[{task_id}] Failed to write import state: {e}") -def check_pause_state( - influxdb3_local, - import_id: str, - task_id: str, -) -> str: - """ - Check if import is paused or canceled by querying the import_pause_state measurement - - Returns: - 'cancelled' if import is canceled - 'paused' if import is paused - 'running' if import is running normally - """ - try: - # Use database parameter in query - query = f""" - SELECT paused, canceled, time - FROM 'import_pause_state' - WHERE import_id = '{import_id}' - ORDER BY time DESC - LIMIT 1 - """ - - result = influxdb3_local.query(query) - - # query() returns a list of dicts - if result and len(result) > 0: - # Get the first row (dict) - row = result[0] - - # Check if import is canceled (higher priority than paused) - canceled_value = row.get("canceled", False) - if str(canceled_value).lower() == "true": - influxdb3_local.info( - f"[{task_id}] Import {import_id} has been canceled" - ) - return "cancelled" - - # Check if import is paused - paused_value = row.get("paused", False) - # Convert to bool in case it's an integer or string - if str(paused_value).lower() == "true": - return "paused" - - return "running" - except Exception: - # For errors, assume import is running - return "running" - def import_table( influxdb3_local, @@ -1526,11 +1491,11 @@ def import_table( result = None while True: # Check for pause/cancel state - pause_state = check_pause_state( + pause_state = get_import_pause_state( influxdb3_local, import_id, task_id ) - if pause_state == "cancelled": + if pause_state == ImportPauseState.CANCELLED: influxdb3_local.info( f"[{task_id}] Import cancelled by user for '{measurement}'" ) @@ -1552,7 +1517,7 @@ def import_table( "errors": errors, "cancelled_at_time": current_time.isoformat(), } - elif pause_state == "paused": + elif pause_state == ImportPauseState.PAUSED: influxdb3_local.info( f"[{task_id}] Import paused by user for '{measurement}'" ) @@ -1561,7 +1526,9 @@ def import_table( current_time.isoformat() ) # default value if no valid timestamp found - # Try to extract the maximum time value (nanoseconds) from the current series + # Try to extract the maximum time value from the current series. + # InfluxDB v1/v2 returns timestamps as integer nanoseconds, + # InfluxDB v3 returns timestamps as ISO 8601 strings. if result and "results" in result: series_list = result["results"][0].get("series", []) if series_list: @@ -1572,19 +1539,22 @@ def import_table( if "time" in columns and values: time_idx = columns.index("time") try: - # Collect all valid timestamps from rows (expected as int nanoseconds) times = [ row[time_idx] for row in values if row[time_idx] is not None ] if times: - # Find the maximum timestamp (nanoseconds) - max_time_ns = max(times) - # Convert nanoseconds to seconds and produce an aware UTC ISO8601 string - paused_at_time = datetime.fromtimestamp( - max_time_ns / 1e9, tz=timezone.utc - ).isoformat() + if isinstance(times[0], (int, float)): + # v1/v2: nanoseconds → ISO 8601 + max_time_ns = max(times) + paused_at_time = datetime.fromtimestamp( + max_time_ns / 1e9, tz=timezone.utc + ).isoformat() + elif isinstance(times[0], str): + # v3: ISO 8601 strings — parse to compare correctly + parsed = [parse_timestamp(t) for t in times] + paused_at_time = max(parsed).isoformat() except Exception as e: influxdb3_local.warn( f"[{task_id}] Failed to extract paused_at_time from series: {e}" @@ -1692,14 +1662,18 @@ def import_table( influxdb3_local.error( f"[{task_id}] Error during import of '{measurement}': {e}" ) - errors.append( - {"time_range": f"{window_start} to {window_end}", "error": str(e)} + # Write paused state for this table so it can be resumed from this point + write_import_state( + influxdb3_local, + import_id, + measurement, + "paused", + rows_imported, + task_id, + current_time.isoformat(), + no_sync=True, ) - # Continue with next window - if direction > 0: - current_time = window_end - else: - current_time = window_start + raise influxdb3_local.info( f"[{task_id}] Completed import for '{measurement}': {rows_imported} rows imported" @@ -1860,6 +1834,12 @@ def resume_incomplete_import( import_duration = time.time() - import_start + # Write completed state to import_pause_state + try: + _write_import_pause_state(influxdb3_local, import_id, paused=False, canceled=False, completed=True) + except Exception as e: + influxdb3_local.warn(f"[{task_id}] Failed to write completed state: {e}") + # Generate final report report = { "import_id": import_id, @@ -2030,15 +2010,9 @@ def start_import(influxdb3_local, config: ImportConfig, task_id: str) -> Dict[st "errors": [f"Failed to save import config: {e}"], } - # Create default pause state record (not paused, not canceled) + # Create default pause state record (not paused, not canceled, not completed) try: - pause_builder = LineBuilder("import_pause_state") - pause_builder.tag("import_id", import_id) - pause_builder.bool_field("paused", False) - pause_builder.bool_field("canceled", False) - pause_builder.time_ns(int(time.time() * 1_000_000_000)) - - influxdb3_local.write_sync(pause_builder, no_sync=False) + _write_import_pause_state(influxdb3_local, import_id, paused=False, canceled=False, completed=False) influxdb3_local.info( f"[{task_id}] Created default pause state for import {import_id}" ) @@ -2051,6 +2025,60 @@ def start_import(influxdb3_local, config: ImportConfig, task_id: str) -> Dict[st ], } + # Run the import with error handling — on any unhandled error, set state to paused + try: + return _run_import(influxdb3_local, config, import_id, task_id) + except Exception as e: + influxdb3_local.error( + f"[{task_id}] Import failed with error: {e}. Setting state to paused for resumption." + ) + _write_pause_state_on_error(influxdb3_local, import_id, task_id) + return { + "import_id": import_id, + "status": "error", + "error": f"Import failed: {e}. State set to paused — resume after fixing the issue.", + } + + +def _write_import_pause_state( + influxdb3_local, + import_id: str, + paused: bool, + canceled: bool, + completed: bool, +) -> None: + """Write a record to the import_pause_state table.""" + builder = LineBuilder("import_pause_state") + builder.tag("import_id", import_id) + builder.bool_field("paused", paused) + builder.bool_field("canceled", canceled) + builder.bool_field("completed", completed) + builder.time_ns(int(time.time() * 1_000_000_000)) + influxdb3_local.write_sync(builder, no_sync=False) + + +def _write_pause_state_on_error( + influxdb3_local, import_id: str, task_id: str +) -> None: + """Write paused state to import_pause_state so the import can be resumed after fixing the issue.""" + try: + _write_import_pause_state(influxdb3_local, import_id, paused=True, canceled=False, completed=False) + influxdb3_local.info( + f"[{task_id}] Wrote paused state for import {import_id} due to error" + ) + except Exception as write_err: + influxdb3_local.error( + f"[{task_id}] Failed to write paused state after error: {write_err}" + ) + + +def _run_import( + influxdb3_local, config: ImportConfig, import_id: str, task_id: str +) -> Dict[str, Any]: + """ + Internal import execution logic. + Exceptions propagate to the caller (start_import) for state cleanup. + """ # Perform pre-flight checks success, errors, metadata = perform_preflight_checks( influxdb3_local, config, task_id @@ -2060,7 +2088,7 @@ def start_import(influxdb3_local, config: ImportConfig, task_id: str) -> Dict[st influxdb3_local.error(f"[{task_id}] Pre-flight checks failed:") for error in errors: influxdb3_local.error(f"[{task_id}] - {error}") - return {"import_id": import_id, "status": "failed", "errors": errors} + raise Exception(f"Pre-flight checks failed: {'; '.join(errors)}") measurements = metadata["measurements"] total_tables = len(measurements) @@ -2207,6 +2235,12 @@ def start_import(influxdb3_local, config: ImportConfig, task_id: str) -> Dict[st import_duration = time.time() - import_start + # Write completed state to import_pause_state + try: + _write_import_pause_state(influxdb3_local, import_id, paused=False, canceled=False, completed=True) + except Exception as e: + influxdb3_local.warn(f"[{task_id}] Failed to write completed state: {e}") + # Generate final report report = { "import_id": import_id, @@ -2251,12 +2285,13 @@ def get_import_pause_state( Returns an ImportPauseState enum value: NOT_FOUND - no record exists in import_pause_state for this import_id CANCELLED - the import has been canceled + COMPLETED - the import has completed successfully PAUSED - the import is paused - RUNNING - the import exists but is neither paused nor canceled + RUNNING - the import exists but is neither paused, canceled, nor completed """ try: status_query = f""" - SELECT paused, canceled + SELECT paused, canceled, completed FROM 'import_pause_state' WHERE import_id = '{import_id}' ORDER BY time DESC @@ -2273,6 +2308,10 @@ def get_import_pause_state( if str(canceled_value).lower() == "true": return ImportPauseState.CANCELLED + completed_value = row.get("completed", False) + if str(completed_value).lower() == "true": + return ImportPauseState.COMPLETED + paused_value = row.get("paused", False) if str(paused_value).lower() == "true": return ImportPauseState.PAUSED @@ -2295,16 +2334,13 @@ def pause_import(influxdb3_local, import_id: str, task_id: str) -> Dict[str, Any if pause_state == ImportPauseState.CANCELLED: return {"status": "error", "error": f"Import {import_id} is already cancelled and cannot be paused"} + if pause_state == ImportPauseState.COMPLETED: + return {"status": "error", "error": f"Import {import_id} is already completed and cannot be paused"} + if pause_state == ImportPauseState.PAUSED: return {"status": "error", "error": f"Import {import_id} is already paused"} - builder = LineBuilder("import_pause_state") - builder.tag("import_id", import_id) - builder.bool_field("paused", True) - builder.bool_field("canceled", False) - builder.time_ns(int(time.time() * 1_000_000_000)) - - influxdb3_local.write_sync(builder, no_sync=False) + _write_import_pause_state(influxdb3_local, import_id, paused=True, canceled=False, completed=False) influxdb3_local.info(f"[{task_id}] Import {import_id} paused") return {"status": "paused", "import_id": import_id} except Exception as e: @@ -2339,21 +2375,81 @@ def resume_import( if pause_state == ImportPauseState.CANCELLED: return {"status": "error", "error": f"Import {import_id} was cancelled and cannot be resumed"} + if pause_state == ImportPauseState.COMPLETED: + return {"status": "error", "error": f"Import {import_id} is already completed"} + if pause_state == ImportPauseState.RUNNING: - return {"status": "error", "error": f"Import {import_id} is already running"} + # Check if the import is actually running or just stale (crashed without writing paused state) + try: + stale_query = f""" + SELECT time + FROM 'import_state' + WHERE import_id = '{import_id}' + ORDER BY time DESC + LIMIT 1 + """ + stale_result = influxdb3_local.query(stale_query) + except Exception: + stale_result = None + + if stale_result: + last_update_ns = stale_result[0].get("time") + if last_update_ns is not None: + age_seconds = time.time() - (last_update_ns / 1e9) + if age_seconds > STALE_IMPORT_THRESHOLD_SECONDS: + influxdb3_local.warn( + f"[{task_id}] Import {import_id} has stale in_progress state " + f"(last update {age_seconds:.0f}s ago), treating as crashed. Allowing resume." + ) + else: + return {"status": "error", "error": f"Import {import_id} is already running"} + else: + return {"status": "error", "error": f"Import {import_id} is already running"} + else: + # No import_state records at all — import crashed before processing any tables + influxdb3_local.warn( + f"[{task_id}] Import {import_id} is in running state but has no import_state records. " + f"Treating as crashed. Allowing resume." + ) - # Check if import exists and is not completed - status_query = f""" - SELECT status, table_name - FROM 'import_state' - WHERE import_id = '{import_id}' - ORDER BY time DESC - LIMIT 100 - """ - status_result = influxdb3_local.query(status_query) + # Load import configuration with provided credentials + config = load_import_config( + influxdb3_local, + import_id, + task_id, + source_token, + source_username, + source_password, + ) + if not config: + return { + "status": "error", + "error": f"Import config not found for {import_id}. Cannot resume import.", + } + # Check if import_state table exists and has records for this import + try: + status_query = f""" + SELECT status, table_name + FROM 'import_state' + WHERE import_id = '{import_id}' + ORDER BY time DESC + LIMIT 100 + """ + status_result = influxdb3_local.query(status_query) + except Exception: + status_result = None + + # If no import_state records exist, the import failed before any tables were processed. + # Restart it from the beginning. if not status_result: - return {"status": "error", "error": f"Import {import_id} not found"} + influxdb3_local.info( + f"[{task_id}] No import_state records found for {import_id}. Restarting import from the beginning." + ) + # Write resume state (unpause the import) + _write_import_pause_state(influxdb3_local, import_id, paused=False, canceled=False, completed=False) + + return _run_import(influxdb3_local, config, import_id, task_id) # Check latest states to determine if import can be resumed latest_states = {} @@ -2376,29 +2472,10 @@ def resume_import( } # Write resume state (unpause the import) - builder = LineBuilder("import_pause_state") - builder.tag("import_id", import_id) - builder.bool_field("paused", False) - builder.bool_field("canceled", False) - influxdb3_local.write_sync(builder, no_sync=False) + _write_import_pause_state(influxdb3_local, import_id, paused=False, canceled=False, completed=False) influxdb3_local.info(f"[{task_id}] Wrote resume state for import {import_id}") - # 2. Load import configuration with provided credentials - config = load_import_config( - influxdb3_local, - import_id, - task_id, - source_token, - source_username, - source_password, - ) - if not config: - return { - "status": "error", - "error": f"Import config not found for {import_id}. Cannot resume import.", - } - - # 3. Find paused and in_progress tables for this specific import + # Find paused and in_progress tables for this specific import query = f""" SELECT import_id, table_name, status, rows_imported, time, paused_at_time FROM 'import_state' @@ -2449,7 +2526,7 @@ def resume_import( f"[{task_id}] Found {incomplete_tables} incomplete tables to resume for import {import_id}: {incomplete_tables}" ) - # 4. Resume the import using resume_incomplete_import + # Resume the import using resume_incomplete_import return resume_incomplete_import( influxdb3_local, config, @@ -2459,8 +2536,9 @@ def resume_import( ) except Exception as e: - influxdb3_local.error(f"[{task_id}] Failed to resume import: {e}") - return {"status": "error", "error": str(e)} + influxdb3_local.error(f"[{task_id}] Failed to resume import: {e}. Setting state to paused for resumption.") + _write_pause_state_on_error(influxdb3_local, import_id, task_id) + return {"import_id": import_id, "status": "error", "error": f"Resume failed: {e}. State set to paused — resume after fixing the issue."} def get_import_stats(influxdb3_local, import_id: str, task_id: str) -> Dict[str, Any]: @@ -2493,9 +2571,9 @@ def get_import_stats(influxdb3_local, import_id: str, task_id: str) -> Dict[str, "error": "No import records found", } - # 2. Get pause/cancel state + # 2. Get pause/cancel/completed state pause_query = f""" - SELECT paused, canceled, time + SELECT paused, canceled, completed, time FROM 'import_pause_state' WHERE import_id = '{import_id}' ORDER BY time DESC @@ -2584,10 +2662,12 @@ def get_import_stats(influxdb3_local, import_id: str, task_id: str) -> Dict[str, overall_status = "unknown" is_paused = False is_cancelled = False + is_completed = False if pause_result and len(pause_result) > 0: pause_state = pause_result[0] is_cancelled = str(pause_state.get("canceled", False)).lower() == "true" + is_completed = str(pause_state.get("completed", False)).lower() == "true" is_paused = str(pause_state.get("paused", False)).lower() == "true" if ( @@ -2596,10 +2676,10 @@ def get_import_stats(influxdb3_local, import_id: str, task_id: str) -> Dict[str, and latest_table_states["all"]["status"] == "cancelled" ): overall_status = "cancelled" + elif is_completed or (completed_tables == total_tables and total_tables > 0): + overall_status = "completed" elif is_paused: overall_status = "paused" - elif completed_tables == total_tables and total_tables > 0: - overall_status = "completed" elif in_progress_tables > 0 or pending_tables > 0: overall_status = "running" else: @@ -2671,7 +2751,7 @@ def get_import_stats(influxdb3_local, import_id: str, task_id: str) -> Dict[str, }, "config": config_summary, "pause_state": ( - {"is_paused": is_paused, "is_cancelled": is_cancelled} + {"is_paused": is_paused, "is_cancelled": is_cancelled, "is_completed": is_completed} if pause_result else None ), @@ -2703,13 +2783,11 @@ def cancel_import(influxdb3_local, import_id: str, task_id: str) -> Dict[str, An if pause_state == ImportPauseState.CANCELLED: return {"status": "error", "error": f"Import {import_id} is already cancelled"} - # Write pause state with canceled flag using LineBuilder - pause_builder = LineBuilder("import_pause_state") - pause_builder.tag("import_id", import_id) - pause_builder.bool_field("paused", True) - pause_builder.bool_field("canceled", True) - pause_builder.time_ns(int(time.time() * 1_000_000_000)) - influxdb3_local.write_sync(pause_builder, no_sync=False) + if pause_state == ImportPauseState.COMPLETED: + return {"status": "error", "error": f"Import {import_id} is already completed and cannot be cancelled"} + + # Write pause state with canceled flag + _write_import_pause_state(influxdb3_local, import_id, paused=True, canceled=True, completed=False) # Write cancelled status using LineBuilder status_builder = LineBuilder("import_state")