|
1 | 1 | from pathlib import Path |
2 | 2 |
|
3 | | -from sqlmesh.core.linter.rule import Range, Position |
| 3 | +from sqlmesh.core.linter.definition import AnnotatedRuleViolation |
| 4 | +from sqlmesh.core.linter.rule import Range, Position, TextEdit |
4 | 5 | from sqlmesh.utils.pydantic import PydanticModel |
5 | 6 | from sqlglot import tokenize, TokenType |
6 | 7 | import typing as t |
@@ -210,3 +211,46 @@ def get_range_of_a_key_in_model_block( |
210 | 211 | end=key_token.end, |
211 | 212 | ) |
212 | 213 | return position.to_range(sql.splitlines()) |
| 214 | + |
| 215 | + |
| 216 | +def apply_text_edits(path: Path, edits: t.Sequence[TextEdit]) -> None: |
| 217 | + """Apply a sequence of TextEdits to a file.""" |
| 218 | + if not edits: |
| 219 | + return |
| 220 | + |
| 221 | + with open(path, "r", encoding="utf-8") as file: |
| 222 | + content = file.read() |
| 223 | + |
| 224 | + lines = content.splitlines(keepends=True) |
| 225 | + offsets = [0] |
| 226 | + for line in lines: |
| 227 | + offsets.append(offsets[-1] + len(line)) |
| 228 | + |
| 229 | + def to_offset(pos: Position) -> int: |
| 230 | + line = min(pos.line, len(lines) - 1) |
| 231 | + char = min(pos.character, len(lines[line])) |
| 232 | + return offsets[line] + char |
| 233 | + |
| 234 | + sorted_edits = sorted( |
| 235 | + edits, key=lambda e: (e.range.start.line, e.range.start.character), reverse=True |
| 236 | + ) |
| 237 | + for edit in sorted_edits: |
| 238 | + start = to_offset(edit.range.start) |
| 239 | + end = to_offset(edit.range.end) |
| 240 | + content = content[:start] + edit.new_text + content[end:] |
| 241 | + |
| 242 | + with open(path, "w", encoding="utf-8") as file: |
| 243 | + file.write(content) |
| 244 | + |
| 245 | + |
| 246 | +def apply_fixes(violations: t.Iterable[AnnotatedRuleViolation]) -> None: |
| 247 | + """Apply fixes from the provided violations.""" |
| 248 | + edits_by_path: dict[Path, list[TextEdit]] = {} |
| 249 | + |
| 250 | + for violation in violations: |
| 251 | + for fix in violation.fixes: |
| 252 | + for edit in fix.edits: |
| 253 | + edits_by_path.setdefault(edit.path, []).append(edit) |
| 254 | + |
| 255 | + for path, edits in edits_by_path.items(): |
| 256 | + apply_text_edits(path, edits) |
0 commit comments