Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
name: Publish Rules

on:
  workflow_call:
  push:
    branches:
      - main

jobs:
  publish:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          # Full history so changed-files can diff against the previous push.
          fetch-depth: 0

      - name: Get changed files
        id: changed-files
        uses: tj-actions/changed-files@v44
        with:
          files_ignore: |
            .github/**
            scripts/**
            README.md
            .gitignore

      - name: Extract new rule directories
        id: newdirs
        run: |
          NEW_DIRS=""

          # Only directories that gained a rule.yaml count as new rules.
          for file in ${{ steps.changed-files.outputs.added_files }}; do
            dir=$(dirname "$file")

            if [ -f "$dir/rule.yaml" ]; then
              NEW_DIRS="$NEW_DIRS $dir"
            fi
          done

          # Deduplicate: several added files may live in one directory.
          NEW_DIRS=$(echo $NEW_DIRS | xargs -n1 | sort -u | xargs)

          echo "NEW_DIRS=$NEW_DIRS"
          echo "NEW_DIRS=$NEW_DIRS" >> $GITHUB_OUTPUT

      # NOTE: a step running `exit 0` does NOT stop the job, so every
      # remaining step is gated on NEW_DIRS being non-empty instead.
      - name: Report when nothing to publish
        if: steps.newdirs.outputs.NEW_DIRS == ''
        run: echo "No new rules found"

      - name: Setup Python
        if: steps.newdirs.outputs.NEW_DIRS != ''
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        if: steps.newdirs.outputs.NEW_DIRS != ''
        run: |
          pip install pyyaml

      # The previous version also passed `--existing` from a "coreids"
      # step that does not exist in this workflow; the script derives
      # existing CORE-IDs from the mapping CSVs itself.
      - name: Run publish script
        if: steps.newdirs.outputs.NEW_DIRS != ''
        run: |
          python scripts/publish.py \
            --new-dirs "${{ steps.newdirs.outputs.NEW_DIRS }}" \
            --algorithm min

      - name: Commit changes
        if: steps.newdirs.outputs.NEW_DIRS != ''
        run: |
          git config user.name "github-actions"
          git config user.email "github-actions@github.com"
          git add .
          # Commit only when something is staged (the earlier duplicate
          # `git commit` could never fire after the first one succeeded).
          git diff --cached --quiet || git commit -m "Auto-publish new rules"

      - name: Push
        if: steps.newdirs.outputs.NEW_DIRS != ''
        run: |
          git push
154 changes: 154 additions & 0 deletions scripts/publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import csv
import re
from pathlib import Path
import yaml
import argparse

# Canonical CORE identifier, e.g. "CORE-000123"; group 1 is the 6-digit number.
CORE_PATTERN = re.compile(r"^CORE-(\d{6})$")


def parse_rule(rule_path: Path):
    """Read a rule.yaml file and flatten its Authorities/Standards tree.

    Returns a list of dicts, one per Standard entry, each carrying:
    ``name`` (the Standard's Name), ``version`` (the Standard's Version,
    stringified — a missing Version becomes the string "None"), and
    ``rule_id`` / ``rule_version`` taken from the first Reference's
    "Rule Identifier" mapping (both None when absent).
    """
    with open(rule_path, encoding="utf-8") as fh:
        doc = yaml.safe_load(fh)

    flattened = []
    for authority in doc.get("Authorities", []):
        for standard in authority.get("Standards", []):
            identifier = None
            identifier_version = None

            references = standard.get("References", [])
            if references:
                # Only the first reference's identifier is honoured.
                info = references[0].get("Rule Identifier")
                if info:
                    identifier = info.get("Id")
                    identifier_version = info.get("Version")

            flattened.append(
                {
                    "name": standard.get("Name"),
                    "version": str(standard.get("Version")),
                    "rule_id": identifier,
                    "rule_version": identifier_version,
                }
            )

    return flattened


def get_next_core_id(mappings_dir: Path, algorithm="max"):
    """Compute the next CORE-ID from the existing mapping CSVs.

    Scans every ``*_mappings.csv`` in *mappings_dir* for "CORE-ID"
    column values matching ``CORE-NNNNNN``.

    With ``algorithm="min"`` the smallest positive unused ID is returned
    (gaps are filled); otherwise (default "max") one past the largest
    existing ID.
    """
    # A set, not a list: the same CORE-ID legitimately appears in several
    # mapping files (one per standard the rule maps to). With duplicates,
    # the "first gap" walk below would stop early and return an ID that
    # is already taken (e.g. ids [1, 1, 2] used to yield CORE-000002).
    existing_ids = set()

    for file in mappings_dir.glob("*_mappings.csv"):
        with open(file, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                core = row.get("CORE-ID", "").strip()
                match = CORE_PATTERN.match(core)
                if match:
                    existing_ids.add(int(match.group(1)))

    if algorithm == "min":
        # Walk the taken IDs in order until the first hole.
        next_id = 1
        for eid in sorted(existing_ids):
            if eid != next_id:
                break
            next_id += 1
    else:
        next_id = max(existing_ids, default=0) + 1

    return f"CORE-{next_id:06d}"


def _get_field_names(grouped_standard):
all_versions = set(grouped_standard.keys()) - {'rule_id', 'name'}
version_columns = sorted(all_versions)
fieldnames = ["Rule ID"] + version_columns + ["Status", "CORE-ID"]
return fieldnames


def update_csv(mapping_file: Path, grouped_standard: dict, core_id: str) -> str:
    """Insert or update one rule's row in a standard's mapping CSV.

    *grouped_standard* carries ``rule_id``, ``name`` and one key per
    applicable version. The row keyed by "Rule ID" is created when
    missing; an existing row keeps its CORE-ID, so *core_id* is only
    used for brand-new rows. "Status" is always forced to "PUBLISHED".

    Returns the CORE-ID actually recorded in the row.
    """
    rows = []
    fieldnames = []
    if mapping_file.exists():
        with open(mapping_file, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            fieldnames = reader.fieldnames or []
            rows = list(reader)
    if not fieldnames:
        fieldnames = _get_field_names(grouped_standard)
    else:
        # The incoming rule may reference versions this CSV has never
        # seen. Previously those were silently dropped because only
        # existing columns were updated; add them (before "Status") so
        # the mapping stays complete.
        new_versions = sorted(
            set(grouped_standard) - {"rule_id", "name"} - set(fieldnames)
        )
        if new_versions:
            insert_at = (
                fieldnames.index("Status")
                if "Status" in fieldnames
                else len(fieldnames)
            )
            fieldnames = fieldnames[:insert_at] + new_versions + fieldnames[insert_at:]

    row = next((x for x in rows if x.get("Rule ID") == grouped_standard["rule_id"]), {})
    if not row:
        # New rule: the fresh dict is appended and then filled in place.
        rows.append(row)
    # NOTE(review): version columns absent from this grouped_standard are
    # overwritten with blanks on an existing row — confirm that is intended.
    row.update({col: grouped_standard.get(col) for col in set(fieldnames) - {"Status", "CORE-ID"}})
    row["Rule ID"] = grouped_standard["rule_id"]
    row["CORE-ID"] = row.get("CORE-ID") or core_id
    row["Status"] = "PUBLISHED"

    with open(mapping_file, "w", newline="", encoding="utf-8") as f:
        # restval fills columns that pre-existing rows never had.
        writer = csv.DictWriter(f, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(rows)
    return row.get("CORE-ID")


def main():
    """Publish every newly added rule directory.

    For each directory passed via --new-dirs that contains a rule.yaml:
    parse its standards, allocate a CORE-ID, record the rule in the
    per-standard mapping CSVs, stamp the CORE-ID back into rule.yaml,
    and rename the rule directory to the CORE-ID.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--new-dirs", required=True, help="Папки правил через пробел")
    parser.add_argument(
        "--algorithm", choices=["min", "max"], default="max", help="Алгоритм CORE-ID"
    )
    # Accepted for CI compatibility only: the publish workflow passes
    # --existing, but existing IDs are re-derived from the mapping CSVs
    # by get_next_core_id, so the value is ignored. Without this option
    # argparse aborted with "unrecognized arguments" (exit code 2).
    parser.add_argument("--existing", default="", help=argparse.SUPPRESS)
    args = parser.parse_args()

    mappings_dir = Path("mappings")

    for rule_dir in args.new_dirs.split():
        rule_path = Path(rule_dir) / "rule.yaml"
        if not rule_path.exists():
            continue
        standards = parse_rule(rule_path)

        core_id = get_next_core_id(mappings_dir, args.algorithm)

        # Group the flat standard entries by (standard name, rule id);
        # each group collects the applicable versions keyed by their own
        # version string, matching the CSV column layout.
        grouped = {}
        for item in standards:
            key = (item["name"], item["rule_id"])
            if key not in grouped:
                grouped[key] = {
                    "name": item["name"],
                    "rule_id": item["rule_id"],
                }

            grouped[key][item["version"]] = item["version"]

        # update_csv returns the CORE-ID actually recorded (an existing
        # row keeps its previous ID), so track the last one written.
        actual_core_id = core_id
        for (std, _rule_id), versions in grouped.items():
            mapping_file = mappings_dir / f"{std}_mappings.csv"
            actual_core_id = update_csv(mapping_file, versions, core_id)

        update_rule_yaml(actual_core_id, rule_path)

        # Publish the rule under its CORE-ID directory name.
        Path(rule_dir).rename(Path(actual_core_id))


def update_rule_yaml(actual_core_id: str, rule_path: Path):
    """Stamp the allocated CORE-ID and publish status into rule.yaml.

    Rewrites *rule_path* in place, setting ``Core.Id`` to
    *actual_core_id* and ``Core.Status`` to "Published"; key order is
    preserved (``sort_keys=False``).
    """
    with open(rule_path, encoding="utf-8") as f:
        doc = yaml.safe_load(f)
    if not isinstance(doc, dict):
        # An empty rule.yaml parses to None; start from a fresh mapping
        # rather than crashing on item assignment.
        doc = {}
    if not isinstance(doc.get("Core"), dict):
        # Also covers an existing null/scalar "Core" entry, which the
        # plain `"Core" not in doc` check used to let through.
        doc["Core"] = {}
    doc["Core"]["Id"] = actual_core_id
    doc["Core"]["Status"] = "Published"
    with open(rule_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(doc, f, sort_keys=False)


# Allow importing this module without side effects; run only as a script.
if __name__ == "__main__":
    main()