diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index b44af2a..b6f49ef 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -19,7 +19,7 @@ env: IMAGE_NAME: ${{ github.repository }} jobs: - build-and-push: + build-and-publish: runs-on: ubuntu-latest permissions: contents: read @@ -65,13 +65,15 @@ jobs: type=semver,pattern={{major}}.{{minor}} type=semver,pattern={{major}} type=raw,value=latest,enable={{is_default_branch}} - # add commit sha for unique identification - type=sha,prefix={{branch}}- + type=sha,prefix=sha- labels: | org.opencontainers.image.title=Naminter org.opencontainers.image.description=The most powerful and fast username availability checker org.opencontainers.image.vendor=3xp0rt org.opencontainers.image.licenses=MIT + org.opencontainers.image.source=${{ github.server_url }}/${{ github.repository }} + org.opencontainers.image.revision=${{ github.sha }} + org.opencontainers.image.created=${{ steps.meta.outputs.created }} - name: Build and push Docker image id: build @@ -80,14 +82,16 @@ jobs: context: . file: ./Dockerfile platforms: linux/amd64,linux/arm64 - push: ${{ github.event_name != 'pull_request' }} + push: ${{ github.event_name == 'release' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} cache-from: type=gha cache-to: type=gha,mode=max + provenance: true + sbom: true - name: Generate artifact attestation - if: github.event_name != 'pull_request' + if: github.event_name == 'release' uses: actions/attest-build-provenance@v1 with: subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} diff --git a/naminter/__init__.py b/naminter/__init__.py index 8241a21..6b27a86 100644 --- a/naminter/__init__.py +++ b/naminter/__init__.py @@ -1,4 +1,9 @@ from .core.main import Naminter __version__ = "1.0.5" +__author__ = "3xp0rt" +__description__ = "WhatsMyName Enumeration Tool" +__license__ = "MIT" +__email__ = "contact@3xp0rt.com" +__url__ = "https://github.com/3xp0rt/Naminter" __all__ = ['Naminter'] \ No newline at end of file diff --git a/naminter/cli/config.py b/naminter/cli/config.py index ead4829..e4cb94b 100644 --- a/naminter/cli/config.py +++ b/naminter/cli/config.py @@ -36,6 +36,7 @@ class NaminterConfig: filter_errors: bool = False filter_not_found: bool = False filter_unknown: bool = False + filter_ambiguous: bool = False # Network and concurrency max_tasks: int = MAX_CONCURRENT_TASKS @@ -77,9 +78,7 @@ def __post_init__(self) -> None: "using known usernames from site configurations instead." ) if not self.self_check and not self.usernames: - error_msg = "No usernames provided and self-check not enabled." - display_error(error_msg) - raise ValueError(error_msg) + raise ValueError("No usernames provided and self-check not enabled.") try: if self.local_list_paths: self.local_list_paths = [str(p) for p in self.local_list_paths] @@ -88,9 +87,7 @@ def __post_init__(self) -> None: if not self.local_list_paths and not self.remote_list_urls: self.remote_list_urls = [WMN_REMOTE_URL] except Exception as e: - error_msg = f"Configuration validation failed: {e}" - display_error(error_msg) - raise ValueError(error_msg) from e + raise ValueError(f"Configuration validation failed: {e}") from e self.impersonate = self.get_impersonation() def get_impersonation(self) -> Optional[str]: @@ -159,5 +156,6 @@ def to_dict(self) -> Dict[str, Any]: "filter_errors": self.filter_errors, "filter_not_found": self.filter_not_found, "filter_unknown": self.filter_unknown, + "filter_ambiguous": self.filter_ambiguous, "no_progressbar": self.no_progressbar, } diff --git a/naminter/cli/console.py b/naminter/cli/console.py index 293be5c..5aba788 100644 --- a/naminter/cli/console.py +++ b/naminter/cli/console.py @@ -10,6 +10,7 @@ from rich.tree import Tree from ..core.models import ResultStatus, SiteResult, SelfCheckResult +from .. import __description__, __version__, __author__, __license__, __email__, __url__ console: Console = Console() @@ -28,6 +29,7 @@ ResultStatus.UNKNOWN: "?", ResultStatus.ERROR: "!", ResultStatus.NOT_VALID: "X", + ResultStatus.AMBIGUOUS: "*", } _STATUS_STYLES: Dict[ResultStatus, Style] = { @@ -36,6 +38,7 @@ ResultStatus.UNKNOWN: Style(color=THEME['warning']), ResultStatus.ERROR: Style(color=THEME['error'], bold=True), ResultStatus.NOT_VALID: Style(color=THEME['error']), + ResultStatus.AMBIGUOUS: Style(color=THEME['warning'], bold=True), } class ResultFormatter: @@ -142,19 +145,18 @@ def _add_debug_info(self, node: Tree, response_code: Optional[int] = None, elaps if error: node.add(Text(f"Error: {error}", style=THEME['error'])) -def display_version(version: str, author: str, description: str) -> None: +def display_version() -> None: """Display version and metadata of the application.""" - - if not all([version and version.strip(), author and author.strip(), description and description.strip()]): - raise ValueError("Version, author, and description must be non-empty strings") - version_table = Table.grid(padding=(0, 2)) version_table.add_column(style=THEME['info']) version_table.add_column(style="bold") - version_table.add_row("Version:", version) - version_table.add_row("Author:", author) - version_table.add_row("Description:", description) + version_table.add_row("Version:", __version__) + version_table.add_row("Author:", __author__) + version_table.add_row("Description:", __description__) + version_table.add_row("License:", __license__) + version_table.add_row("Email:", __email__) + version_table.add_row("GitHub:", __url__) panel = Panel( version_table, diff --git a/naminter/cli/main.py b/naminter/cli/main.py index fe3da91..8f3e987 100644 --- a/naminter/cli/main.py +++ b/naminter/cli/main.py @@ -3,18 +3,20 @@ import logging import webbrowser from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Annotated, Any, Dict, List, Optional, Tuple, Union import typer from curl_cffi import requests +from rich import box +from rich.panel import Panel +from rich.table import Table from ..cli.config import BrowserImpersonation, NaminterConfig from ..cli.console import ( console, - THEME, display_error, - display_version, display_warning, + display_version, ResultFormatter, ) from ..cli.exporters import Exporter @@ -23,10 +25,7 @@ from ..core.main import Naminter from ..core.constants import MAX_CONCURRENT_TASKS, HTTP_REQUEST_TIMEOUT_SECONDS, HTTP_ALLOW_REDIRECTS, HTTP_SSL_VERIFY, WMN_REMOTE_URL, WMN_SCHEMA_URL from ..core.exceptions import DataError, ConfigurationError - -__version__ = "1.0.5" -__author__ = "3xp0rt" -__description__ = "WhatsMyName Enumeration Tool" +from .. import __description__, __version__ app = typer.Typer( help=__description__, @@ -136,42 +135,32 @@ def _merge_data(data: Dict[str, Any]) -> None: async def run(self) -> None: """Main execution method with progress tracking.""" - try: - wmn_data, wmn_schema = self._load_wmn_lists( - local_list_paths=self.config.local_list_paths, - remote_list_urls=self.config.remote_list_urls, - skip_validation=self.config.skip_validation - ) + wmn_data, wmn_schema = self._load_wmn_lists( + local_list_paths=self.config.local_list_paths, + remote_list_urls=self.config.remote_list_urls, + skip_validation=self.config.skip_validation + ) + + async with Naminter( + wmn_data=wmn_data, + wmn_schema=wmn_schema, + max_tasks=self.config.max_tasks, + timeout=self.config.timeout, + impersonate=self.config.impersonate, + verify_ssl=self.config.verify_ssl, + allow_redirects=self.config.allow_redirects, + proxy=self.config.proxy, + ) as naminter: + if self.config.self_check: + results = await self._run_self_check(naminter) + else: + results = await self._run_check(naminter) - async with Naminter( - wmn_data=wmn_data, - wmn_schema=wmn_schema, - max_tasks=self.config.max_tasks, - timeout=self.config.timeout, - impersonate=self.config.impersonate, - verify_ssl=self.config.verify_ssl, - allow_redirects=self.config.allow_redirects, - proxy=self.config.proxy, - ) as naminter: - if self.config.self_check: - results = await self._run_self_check(naminter) - else: - results = await self._run_check(naminter) - - filtered_results = [r for r in results if self._should_include_result(r)] - - if self.config.export_formats: - export_manager = Exporter(self.config.usernames or [], __version__) - export_manager.export(filtered_results, self.config.export_formats) - except KeyboardInterrupt: - display_warning("Operation interrupted") - raise typer.Exit(1) - except asyncio.TimeoutError: - display_error("Operation timed out") - raise typer.Exit(1) - except Exception as e: - display_error(f"Unexpected error: {e}") - raise typer.Exit(1) + filtered_results = [r for r in results if self._should_include_result(r)] + + if self.config.export_formats: + export_manager = Exporter(self.config.usernames or [], __version__) + export_manager.export(filtered_results, self.config.export_formats) async def _run_check(self, naminter: Naminter) -> List[SiteResult]: """Run the username check functionality.""" @@ -273,7 +262,9 @@ def _should_include_result(self, result: Union[SiteResult, SelfCheckResult]) -> return True elif self.config.filter_unknown and status == ResultStatus.UNKNOWN: return True - elif not any([self.config.filter_errors, self.config.filter_not_found, self.config.filter_unknown]): + elif self.config.filter_ambiguous and status == ResultStatus.AMBIGUOUS: + return True + elif not any([self.config.filter_errors, self.config.filter_not_found, self.config.filter_unknown, self.config.filter_ambiguous]): return status == ResultStatus.FOUND return False @@ -313,14 +304,18 @@ async def _process_result(self, result: SiteResult) -> Optional[Path]: return response_file -@app.callback(invoke_without_command=True) +def version_callback(value: bool): + """Callback to handle version display.""" + if value: + display_version() + raise typer.Exit() + def main( usernames: Optional[List[str]] = typer.Option(None, "--username", "-u", help="Username(s) to search for across social media platforms", show_default=False), site_names: Optional[List[str]] = typer.Option(None, "--site", "-s", help="Specific site name(s) to check (e.g., 'GitHub', 'Twitter')", show_default=False), - version: bool = typer.Option(False, "--version", help="Display version information and exit"), + version: Annotated[Optional[bool], typer.Option("--version", help="Show version information and exit", callback=version_callback, is_eager=True)] = None, no_color: bool = typer.Option(False, "--no-color", help="Disable colored console output"), no_progressbar: bool = typer.Option(False, "--no-progressbar", help="Disable progress bar during execution"), - ctx: typer.Context = None, # Input lists local_list: Optional[List[Path]] = typer.Option( @@ -388,19 +383,13 @@ def main( filter_errors: bool = typer.Option(False, "--filter-errors", help="Show only error results in console output and exports"), filter_not_found: bool = typer.Option(False, "--filter-not-found", help="Show only not found results in console output and exports"), filter_unknown: bool = typer.Option(False, "--filter-unknown", help="Show only unknown results in console output and exports"), + filter_ambiguous: bool = typer.Option(False, "--filter-ambiguous", help="Show only ambiguous results in console output and exports"), ) -> None: """Main CLI entry point.""" - if ctx and ctx.invoked_subcommand: - return - if no_color: console.no_color = True - if version: - display_version(__version__, __author__, __description__) - raise typer.Exit() - try: config = NaminterConfig( usernames=usernames, @@ -439,6 +428,7 @@ def main( filter_errors=filter_errors, filter_not_found=filter_not_found, filter_unknown=filter_unknown, + filter_ambiguous=filter_ambiguous, no_progressbar=no_progressbar, ) @@ -472,7 +462,7 @@ def main( def entry_point() -> None: """Entry point for the application.""" - app() + typer.run(main) if __name__ == "__main__": entry_point() \ No newline at end of file diff --git a/naminter/cli/progress.py b/naminter/cli/progress.py index c3a167a..71d648c 100644 --- a/naminter/cli/progress.py +++ b/naminter/cli/progress.py @@ -51,6 +51,7 @@ def get_progress_text(self) -> str: unknown = self.status_counts[ResultStatus.UNKNOWN] errors = self.status_counts[ResultStatus.ERROR] not_valid = self.status_counts[ResultStatus.NOT_VALID] + ambiguous = self.status_counts[ResultStatus.AMBIGUOUS] sections = [ f"[{THEME['primary']}]{rate:.1f} req/s[/]", @@ -60,6 +61,8 @@ def get_progress_text(self) -> str: if unknown > 0: sections.append(f"[{THEME['warning']}]? {unknown}[/]") + if ambiguous > 0: + sections.append(f"[{THEME['warning']}]* {ambiguous}[/]") if errors > 0: sections.append(f"[{THEME['error']}]! {errors}[/]") if not_valid > 0: diff --git a/naminter/core/main.py b/naminter/core/main.py index a62a2de..0beb1e9 100644 --- a/naminter/core/main.py +++ b/naminter/core/main.py @@ -17,6 +17,13 @@ ValidationError, ConcurrencyError, ) +from ..core.utils import ( + validate_wmn_data, + validate_numeric_values, + configure_proxy, + validate_usernames, + filter_sites, +) from ..core.constants import ( HTTP_REQUEST_TIMEOUT_SECONDS, HTTP_SSL_VERIFY, @@ -55,7 +62,7 @@ def __init__( self._logger.addHandler(logging.NullHandler()) self._logger.info( - "Naminter initializing: max_tasks=%d, timeout=%ds, browser=%s, ssl_verify=%s, allow_redirects=%s, proxy=%s", + "Initializing Naminter with configuration: max_tasks=%d, timeout=%ds, browser=%s, ssl_verify=%s, allow_redirects=%s, proxy=%s", max_tasks, timeout, impersonate, verify_ssl, allow_redirects, bool(proxy) ) @@ -64,195 +71,46 @@ def __init__( self.impersonate = impersonate if impersonate is not None else BROWSER_IMPERSONATE_AGENT self.verify_ssl = verify_ssl if verify_ssl is not None else HTTP_SSL_VERIFY self.allow_redirects = allow_redirects if allow_redirects is not None else HTTP_ALLOW_REDIRECTS - self.proxy = self._configure_proxy(proxy) + self.proxy = configure_proxy(proxy) - self._validate_numeric_values(self.max_tasks, self.timeout) - self._validate_schema(wmn_data, wmn_schema) + validate_numeric_values(self.max_tasks, self.timeout) + validate_wmn_data(wmn_data, wmn_schema) self._wmn_data = wmn_data self._wmn_schema = wmn_schema - - try: - self._semaphore = asyncio.Semaphore(self.max_tasks) - self._logger.debug("Semaphore created with max_tasks=%d", self.max_tasks) - except Exception as e: - self._logger.critical("Failed to create semaphore: %s", e) - raise ConcurrencyError(f"Failed to create semaphore with max_tasks={self.max_tasks}: {e}") from e - + self._semaphore = asyncio.Semaphore(self.max_tasks) self._session: Optional[AsyncSession] = None sites_count = len(self._wmn_data.get("sites", [])) if self._wmn_data else 0 self._logger.info( - "Naminter ready. Sites: %d, Max tasks: %d, Timeout: %ds, Browser: %s, SSL verify: %s, Proxy: %s", + "Naminter initialized successfully: loaded %d sites, max_tasks=%d, timeout=%ds, browser=%s, ssl_verify=%s, proxy=%s", sites_count, self.max_tasks, self.timeout, self.impersonate, self.verify_ssl, bool(self.proxy) ) - def _validate_schema(self, data: Dict[str, Any], schema: Optional[Dict[str, Any]]) -> None: - """Validate WMN data against schema.""" - if not data: - self._logger.error("No WMN data provided during initialization.") - raise DataError("No WMN data provided during initialization.") - - if schema: - try: - self._logger.debug("Validating WMN data against schema.") - jsonschema.validate(instance=data, schema=schema) - self._logger.info("WMN data validated successfully against schema.") - except jsonschema.ValidationError as e: - self._logger.error("WMN data does not match schema: %s", e.message) - raise SchemaValidationError(f"WMN data does not match schema: {e.message}") from e - except jsonschema.SchemaError as e: - self._logger.error("Invalid WMN schema: %s", e.message) - raise SchemaValidationError(f"Invalid WMN schema: {e.message}") from e - else: - self._logger.warning("WMN data provided without schema. Skipping validation.") - - def _validate_numeric_values(self, max_tasks: int, timeout: int) -> None: - """Validate numeric configuration values.""" - self._logger.debug("Validating numeric values: max_tasks=%d, timeout=%d", max_tasks, timeout) - if not (MIN_TASKS <= max_tasks <= MAX_TASKS_LIMIT): - self._logger.error("max_tasks out of range: %d not in [%d-%d]", max_tasks, MIN_TASKS, MAX_TASKS_LIMIT) - raise ConfigurationError(f"Invalid max_tasks: {max_tasks} must be between {MIN_TASKS} and {MAX_TASKS_LIMIT}") - if not (MIN_TIMEOUT <= timeout <= MAX_TIMEOUT): - self._logger.error("timeout out of range: %d not in [%d-%d]", timeout, MIN_TIMEOUT, MAX_TIMEOUT) - raise ConfigurationError(f"Invalid timeout: {timeout} must be between {MIN_TIMEOUT} and {MAX_TIMEOUT} seconds") - - if max_tasks > HIGH_CONCURRENCY_THRESHOLD and timeout < HIGH_CONCURRENCY_MIN_TIMEOUT: - self._logger.warning( - "High concurrency (%d tasks) with low timeout (%ds) may cause failures. Increase timeout or reduce max_tasks.", - max_tasks, timeout - ) - elif max_tasks > VERY_HIGH_CONCURRENCY_THRESHOLD and timeout < VERY_HIGH_CONCURRENCY_MIN_TIMEOUT: - self._logger.warning( - "Very high concurrency (%d tasks) with very low timeout (%ds) may cause connection issues. Recommend timeout >= %ds for max_tasks > %d.", - max_tasks, timeout, HIGH_CONCURRENCY_MIN_TIMEOUT, VERY_HIGH_CONCURRENCY_THRESHOLD - ) - - if max_tasks > EXTREME_CONCURRENCY_THRESHOLD: - self._logger.warning( - "Extremely high concurrency (%d tasks) may overwhelm servers or cause rate limiting. Lower value recommended.", - max_tasks - ) - - if timeout < LOW_TIMEOUT_WARNING_THRESHOLD: - self._logger.warning( - "Very low timeout (%ds) may cause legitimate requests to fail. Increase timeout for better accuracy.", - timeout - ) - - self._logger.debug("Numeric configuration validation successful.") - - def _configure_proxy(self, proxy: Optional[Union[str, Dict[str, str]]]) -> Optional[Dict[str, str]]: - """Validate and configure proxy settings.""" - if proxy is None: - self._logger.debug("No proxy configuration provided.") - return None - - self._logger.debug("Validating and configuring proxy: %s", type(proxy).__name__) - if isinstance(proxy, str): - if not proxy.strip(): - self._logger.error("Proxy validation failed: empty string.") - raise ConfigurationError("Invalid proxy: proxy string cannot be empty") - if not (proxy.startswith('http://') or proxy.startswith('https://') or proxy.startswith('socks5://')): - self._logger.error("Proxy validation failed: invalid protocol in '%s'", proxy) - raise ConfigurationError("Invalid proxy: must be http://, https://, or socks5:// URL") - self._logger.info("Proxy string validated and configured successfully.") - return {"http": proxy, "https": proxy} - elif isinstance(proxy, dict): - for protocol, proxy_url in proxy.items(): - if protocol not in ['http', 'https']: - self._logger.error("Proxy validation failed: invalid protocol '%s' in dict.", protocol) - raise ConfigurationError(f"Invalid proxy protocol: {protocol}") - if not isinstance(proxy_url, str) or not proxy_url.strip(): - self._logger.error("Proxy validation failed: empty or invalid URL for protocol '%s'.", protocol) - raise ConfigurationError(f"Invalid proxy URL for {protocol}: must be non-empty string") - self._logger.info("Proxy dict validated and configured successfully.") - return proxy - else: - self._logger.error("Proxy validation failed: not a string or dict. Value: %r", proxy) - raise ConfigurationError("Invalid proxy: must be string or dict") - - def _validate_usernames(self, usernames: List[str]) -> List[str]: - """Validate and deduplicate usernames.""" - self._logger.debug("Validating and deduplicating usernames: %r", usernames) - unique_usernames = list({u.strip() for u in usernames if isinstance(u, str) and u.strip()}) - if not unique_usernames: - self._logger.error("No valid usernames provided after validation.") - raise ValidationError("No valid usernames provided") - self._logger.info("Validated usernames: %r", unique_usernames) - return unique_usernames - - async def __aenter__(self) -> 'Naminter': - """Async context manager entry.""" - self._logger.debug("Entering async context manager for Naminter.") - try: - self._session = await self._create_session() - if self._session is None: - self._logger.critical("Session creation failed: No session returned in __aenter__.") - raise SessionError("Session creation failed: No session returned") - self._logger.info("Async context manager entry successful.") - return self - except Exception as e: - self._logger.error("Failed to enter async context: %s", e, exc_info=True) - if self._session: - try: - await self._session.close() - except Exception: - pass - self._session = None - raise SessionError(f"Failed to initialize session: {str(e)}") from e + async def __aenter__(self) -> "Naminter": + self._session = AsyncSession( + impersonate=self.impersonate, + verify=self.verify_ssl, + timeout=self.timeout, + allow_redirects=self.allow_redirects, + proxies=self.proxy, + ) + return self async def __aexit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None: """Async context manager exit.""" - self._logger.debug("Exiting async context manager for Naminter.") if self._session: try: await self._session.close() - self._logger.info("Session closed successfully during context exit.") + self._logger.info("HTTP session closed successfully.") except Exception as e: self._logger.warning("Error closing session during cleanup: %s", e, exc_info=True) finally: self._session = None - else: - self._logger.debug("No session to close on context exit.") - - async def _create_session(self) -> AsyncSession: - """Create and configure an asynchronous HTTP session.""" - try: - self._logger.debug("Creating AsyncSession (impersonate=%s, verify_ssl=%s, timeout=%d, allow_redirects=%s, proxies=%s)", - self.impersonate, self.verify_ssl, self.timeout, self.allow_redirects, self.proxy) - session = AsyncSession( - impersonate=self.impersonate, - verify=self.verify_ssl, - timeout=self.timeout, - allow_redirects=self.allow_redirects, - proxies=self.proxy, - ) - self._logger.info("AsyncSession created successfully.") - return session - except Exception as e: - self._logger.critical("Failed to create session: %s", e, exc_info=True) - raise SessionError(f"Failed to create session: {e}") from e - - async def _close_session(self) -> None: - """Helper method to close the session safely.""" - if self._session: - try: - self._logger.debug("Closing session.") - await self._session.close() - self._session = None - self._logger.info("Session closed successfully.") - except Exception as e: - self._logger.error("Error closing session: %s", e, exc_info=True) - self._session = None - raise NetworkError(f"Failed to close session: {e}") from e - else: - self._logger.debug("No session to close.") async def get_wmn_info(self) -> Dict[str, Any]: """Get WMN metadata information.""" - self._logger.debug("Retrieving WMN metadata information.") try: info = { "license": self._wmn_data.get("license", []), @@ -260,7 +118,7 @@ async def get_wmn_info(self) -> Dict[str, Any]: "categories": list(set(site.get("cat", "") for site in self._wmn_data.get("sites", []))), "sites_count": len(self._wmn_data.get("sites", [])) } - self._logger.info("WMN metadata retrieved: %d sites, %d categories.", + self._logger.info("Retrieved WMN metadata: %d sites across %d categories", info["sites_count"], len(info["categories"])) return info except Exception as e: @@ -269,16 +127,14 @@ async def get_wmn_info(self) -> Dict[str, Any]: def list_sites(self) -> List[str]: """List all site names.""" - self._logger.debug("Listing all site names.") sites = [site.get("name", "") for site in self._wmn_data.get("sites", [])] - self._logger.info("Found %d sites.", len(sites)) + self._logger.info("Retrieved %d site names from WMN data", len(sites)) return sites def list_categories(self) -> List[str]: """List all unique categories.""" - self._logger.debug("Listing all unique categories.") category_list = sorted({site.get("cat") for site in self._wmn_data.get("sites", []) if site.get("cat")}) - self._logger.info("Found %d unique categories.", len(category_list)) + self._logger.info("Retrieved %d unique categories from WMN data", len(category_list)) return category_list async def check_site( @@ -296,7 +152,7 @@ async def check_site( m_code, m_string = site.get("m_code"), site.get("m_string") if not site_name: - self._logger.error("Site missing required field: name. Site: %r", site) + self._logger.error("Site configuration missing required 'name' field: %r", site) return SiteResult( site_name="", category=category, @@ -306,7 +162,7 @@ async def check_site( ) if not category: - self._logger.error("Site '%s' missing required field: cat", site_name) + self._logger.error("Site '%s' missing required 'cat' field", site_name) return SiteResult( site_name=site_name, category=category, @@ -316,7 +172,7 @@ async def check_site( ) if not uri_check_template: - self._logger.error("Site '%s' missing required field: uri_check", site_name) + self._logger.error("Site '%s' missing required 'uri_check' field", site_name) return SiteResult( site_name=site_name, category=category, @@ -327,9 +183,7 @@ async def check_site( has_placeholder = ACCOUNT_PLACEHOLDER in uri_check_template or (post_body_template and ACCOUNT_PLACEHOLDER in post_body_template) if not has_placeholder: - error_msg = f"Site '{site_name}' missing {ACCOUNT_PLACEHOLDER} placeholder" - self._logger.error(error_msg) - return SiteResult(site_name, category, username, ResultStatus.ERROR, error=error_msg) + return SiteResult(site_name, category, username, ResultStatus.ERROR, error=f"Site '{site_name}' missing {ACCOUNT_PLACEHOLDER} placeholder") matchers = { 'e_code': e_code, @@ -341,7 +195,7 @@ async def check_site( if fuzzy_mode: if all(val is None for val in matchers.values()): self._logger.error( - "Site '%s' must define at least one of e_code, e_string, m_code, or m_string in fuzzy mode", + "Site '%s' must define at least one matcher (e_code, e_string, m_code, or m_string) for fuzzy mode", site_name ) return SiteResult( @@ -355,7 +209,7 @@ async def check_site( missing = [name for name, val in matchers.items() if val is None] if missing: self._logger.error( - "Site '%s' missing required matchers in strict mode: %s", + "Site '%s' missing required matchers for strict mode: %s", site_name, missing ) return SiteResult( @@ -368,14 +222,13 @@ async def check_site( clean_username = username.translate(str.maketrans("", "", site.get("strip_bad_char", ""))) if not clean_username: - error_msg = f"Username '{username}' became empty after character stripping" - self._logger.error(error_msg) - return SiteResult(site_name, category, username, ResultStatus.ERROR, error=error_msg) + return SiteResult(site_name, category, username, ResultStatus.ERROR, error=f"Username '{username}' became empty after character stripping") uri_check = uri_check_template.replace(ACCOUNT_PLACEHOLDER, clean_username) uri_pretty = site.get("uri_pretty", uri_check_template).replace(ACCOUNT_PLACEHOLDER, clean_username) - self._logger.info("Checking site '%s' for username '%s' (fuzzy_mode=%s)", site_name, username, fuzzy_mode) + self._logger.info("Checking site '%s' (category: %s) for username '%s' in %s mode", + site_name, category, username, "fuzzy" if fuzzy_mode else "strict") try: async with self._semaphore: @@ -385,19 +238,19 @@ async def check_site( if post_body: post_body = post_body.replace(ACCOUNT_PLACEHOLDER, clean_username) - self._logger.debug("POST %s, body: %.100s, headers: %r", uri_check, post_body, headers) + self._logger.debug("Making POST request to %s with body: %.100s", uri_check, post_body) response = await self._session.post(uri_check, headers=headers, data=post_body) else: - self._logger.debug("GET %s, headers: %r", uri_check, headers) + self._logger.debug("Making GET request to %s", uri_check) response = await self._session.get(uri_check, headers=headers) elapsed = time.monotonic() - start_time - self._logger.info("Request to '%s' completed in %.2fs (status %d)", site_name, elapsed, response.status_code) + self._logger.info("Request to '%s' completed in %.2fs with status %d", site_name, elapsed, response.status_code) except asyncio.CancelledError: - self._logger.warning("Request to '%s' was cancelled.", site_name) + self._logger.warning("Request to '%s' was cancelled", site_name) raise except RequestsError as e: - self._logger.warning("Network error checking '%s': %s", site_name, e, exc_info=True) + self._logger.warning("Network error while checking '%s': %s", site_name, e, exc_info=True) return SiteResult( site_name=site_name, category=category, @@ -407,7 +260,7 @@ async def check_site( error=f"Network error: {e}", ) except Exception as e: - self._logger.error("Unexpected error checking '%s': %s", site_name, e, exc_info=True) + self._logger.error("Unexpected error while checking '%s': %s", site_name, e, exc_info=True) return SiteResult( site_name=site_name, category=category, @@ -420,7 +273,7 @@ async def check_site( response_text = response.text response_code = response.status_code - result_status = SiteResult.determine_result_status( + result_status = SiteResult.get_result_status( response_code=response_code, response_text=response_text, e_code=e_code, @@ -431,12 +284,12 @@ async def check_site( ) self._logger.debug( - "[%s] Result: %s, Code: %s, Time: %.2fs, Mode: %s", + "Site '%s' result: %s (HTTP %d) in %.2fs (%s mode)", site_name, result_status.name, response_code, elapsed, - "fuzzy" if fuzzy_mode else "full", + "fuzzy" if fuzzy_mode else "strict", ) return SiteResult( @@ -458,39 +311,25 @@ async def check_usernames( as_generator: bool = False, ) -> Union[List[SiteResult], AsyncGenerator[SiteResult, None]]: """Check one or multiple usernames across all loaded sites.""" - usernames = self._validate_usernames(usernames) - self._logger.info("Checking %d username(s): %s", len(usernames), usernames) + usernames = validate_usernames(usernames) + self._logger.info("Starting username enumeration for %d username(s): %s", len(usernames), usernames) - sites = self._wmn_data.get("sites") - - if site_names: - site_names_set = set(site_names) - available_sites_set = set(site.get("name", "") for site in sites) - missing_sites = [name for name in site_names if name not in available_sites_set] - if missing_sites: - self._logger.error("Site names not found in WMN data: %s", missing_sites) - raise DataError(f"Site names not found in WMN data: {missing_sites}") - sites = [site for site in sites if site.get("name", "") in site_names_set] - self._logger.info("Filtered sites to only include: %s", site_names) - - self._logger.info("Checking against %d sites in %s mode.", len(sites), "fuzzy" if fuzzy_mode else "full") + sites = await filter_sites(site_names, self._wmn_data.get("sites", [])) + self._logger.info("Will check against %d sites in %s mode", len(sites), "fuzzy" if fuzzy_mode else "strict") tasks: List[Coroutine[Any, Any, SiteResult]] = [ self.check_site(site, username, fuzzy_mode) for site in sites for username in usernames ] - self._logger.debug("Created %d check tasks.", len(tasks)) async def generate_results() -> AsyncGenerator[SiteResult, None]: for task in asyncio.as_completed(tasks): yield await task if as_generator: - self._logger.info("Returning username check results as async generator.") return generate_results() results = await asyncio.gather(*tasks) - self._logger.info("Username check complete. Generated %d results.", len(results)) return results async def self_check( @@ -500,19 +339,9 @@ async def self_check( as_generator: bool = False, ) -> Union[List[SelfCheckResult], AsyncGenerator[SelfCheckResult, None]]: """Run self-checks using known accounts for each site.""" - sites = self._wmn_data.get("sites", []) if isinstance(self._wmn_data, dict) else [] - - if site_names: - site_names_set = set(site_names) - available_sites_set = set(site.get("name", "") for site in sites) - missing_sites = [name for name in site_names if name not in available_sites_set] - if missing_sites: - self._logger.error("Site names not found in WMN data: %s", missing_sites) - raise DataError(f"Site names not found in WMN data: {missing_sites}") - sites = [site for site in sites if site.get("name", "") in site_names_set] - self._logger.info("Filtered sites to only include: %s", site_names) + sites = await filter_sites(site_names, self._wmn_data.get("sites", [])) - self._logger.info("Starting self-check for %d sites (fuzzy_mode=%s)", len(sites), fuzzy_mode) + self._logger.info("Starting self-check validation for %d sites in %s mode", len(sites), "fuzzy" if fuzzy_mode else "strict") async def _check_known(site: Dict[str, Any]) -> SelfCheckResult: """Helper function to check a site with all its known users.""" @@ -521,7 +350,7 @@ async def _check_known(site: Dict[str, Any]) -> SelfCheckResult: known = site.get("known") if not site_name: - self._logger.error("Site missing required field: name. Site: %r", site) + self._logger.error("Site configuration missing required 'name' field for self-check: %r", site) return SelfCheckResult( site_name=site_name, category=category, @@ -530,7 +359,7 @@ async def _check_known(site: Dict[str, Any]) -> SelfCheckResult: ) if not category: - self._logger.error("Site '%s' missing required field: cat", site_name) + self._logger.error("Site '%s' missing required 'cat' field for self-check", site_name) return SelfCheckResult( site_name=site_name, category=category, @@ -539,7 +368,7 @@ async def _check_known(site: Dict[str, Any]) -> SelfCheckResult: ) if known is None: - self._logger.error("Site '%s' missing required field: known.", site_name) + self._logger.error("Site '%s' missing required 'known' field for self-check", site_name) return SelfCheckResult( site_name=site_name, category=category, @@ -547,14 +376,12 @@ async def _check_known(site: Dict[str, Any]) -> SelfCheckResult: error=f"Site '{site_name}' missing required field: known" ) - self._logger.info("Self-checking site '%s' (category: %s) with %d known accounts.", site_name, category, len(known)) + self._logger.info("Self-checking site '%s' (category: %s) with %d known accounts", site_name, category, len(known)) try: tasks = [self.check_site(site, username, fuzzy_mode) for username in known] - self._logger.debug("Created %d self-check tasks for site '%s'", len(tasks), site_name) site_results = await asyncio.gather(*tasks) - self._logger.info("Self-check completed for site '%s': %d test results.", site_name, len(site_results)) return SelfCheckResult( site_name=site_name, category=category, @@ -572,16 +399,13 @@ async def _check_known(site: Dict[str, Any]) -> SelfCheckResult: tasks: List[Coroutine[Any, Any, SelfCheckResult]] = [ _check_known(site) for site in sites if isinstance(site, dict) ] - self._logger.debug("Created %d self-check tasks for all sites.", len(tasks)) async def generate_results() -> AsyncGenerator[SelfCheckResult, None]: for task in asyncio.as_completed(tasks): yield await task if as_generator: - self._logger.info("Returning self-check results as async generator.") return generate_results() results = await asyncio.gather(*tasks) - self._logger.info("Self-check complete. Results generated for %d sites.", len(results)) return results \ No newline at end of file diff --git a/naminter/core/models.py b/naminter/core/models.py index 6dc8c08..a347027 100644 --- a/naminter/core/models.py +++ b/naminter/core/models.py @@ -3,53 +3,14 @@ from typing import Optional, Dict, Any, List, Union, Set from datetime import datetime -def serialize_datetime(dt: Optional[datetime]) -> Optional[str]: - """Convert datetime to ISO format string.""" - if dt is None: - return None - if not isinstance(dt, datetime): - raise ValueError(f"Input must be a datetime object, got {type(dt)}") - return dt.isoformat() - -def deserialize_datetime(dt_str: Union[str, datetime, None]) -> Optional[datetime]: - """Convert ISO string to datetime.""" - if dt_str is None: - return None - - if isinstance(dt_str, datetime): - return dt_str - - if not isinstance(dt_str, str): - raise ValueError(f"Datetime input must be string, datetime, or None, got {type(dt_str)}") - - dt_str = dt_str.strip() - if not dt_str: - return None - - try: - return datetime.fromisoformat(dt_str) - except ValueError as e: - raise ValueError(f"Invalid datetime format '{dt_str}': {e}") from e - class ResultStatus(Enum): """Status of username search results.""" FOUND = "found" NOT_FOUND = "not_found" ERROR = "error" UNKNOWN = "unknown" + AMBIGUOUS = "ambiguous" NOT_VALID = "not_valid" - - def __str__(self) -> str: - """Return status as string.""" - return self.value - - @classmethod - def from_string(cls, value: str) -> "ResultStatus": - """Create ResultStatus from string.""" - try: - return cls(value.lower().strip()) - except ValueError as e: - raise ValueError(f"Invalid result status: '{value}'") from e class BrowserImpersonation(str, Enum): """Browser impersonation options.""" @@ -61,10 +22,6 @@ class BrowserImpersonation(str, Enum): EDGE = "edge" FIREFOX = "firefox" - def __str__(self) -> str: - """Return browser impersonation as string.""" - return self.value - @dataclass class SiteResult: """Result of testing a username on a site.""" @@ -80,24 +37,15 @@ class SiteResult: created_at: datetime = field(default_factory=datetime.now) def __post_init__(self) -> None: - """Validate fields after initialization.""" - self._validate_string_field('site_name', self.site_name) - self._validate_string_field('category', self.category) - self._validate_string_field('username', self.username) - if self.result_url is not None: - self._validate_string_field('result_url', self.result_url) - - if not isinstance(self.result_status, ResultStatus): - raise ValueError("result_status must be a valid ResultStatus") - - if self.response_code is not None and (not isinstance(self.response_code, int) or self.response_code < 0): - raise ValueError("response_code must be a non-negative integer") + """Validate numeric fields after initialization.""" + if self.response_code is not None and self.response_code < 0: + raise ValueError("response_code must be non-negative") - if self.elapsed is not None and (not isinstance(self.elapsed, (int, float)) or self.elapsed < 0): - raise ValueError("elapsed must be a non-negative number") + if self.elapsed is not None and self.elapsed < 0: + raise ValueError("elapsed must be non-negative") @classmethod - def determine_result_status( + def get_result_status( cls, response_code: int, response_text: str, @@ -107,51 +55,42 @@ def determine_result_status( m_string: Optional[str] = None, fuzzy_mode: bool = False, ) -> ResultStatus: - """Determine result status from response data.""" - exists_status_matches = e_code is not None and response_code == e_code - exists_string_matches = bool(e_string and e_string in response_text) - not_exists_status_matches = m_code is not None and response_code == m_code - not_exists_string_matches = bool(m_string and m_string in response_text) + condition_found = False + condition_not_found = False if fuzzy_mode: - condition_found = exists_status_matches or exists_string_matches - condition_not_found = not_exists_status_matches or not_exists_string_matches + condition_found = (e_code is not None and response_code == e_code) or (e_string and e_string in response_text) + condition_not_found = (m_code is not None and response_code == m_code) or (m_string and m_string in response_text) else: condition_found = ( - (e_code is not None and e_string and exists_status_matches and exists_string_matches) or - (e_code is not None and not e_string and exists_status_matches) or - (e_code is None and e_string and exists_string_matches) + (e_code is None or response_code == e_code) and + (e_string is None or e_string in response_text) and + (e_code is not None or e_string is not None) ) + condition_not_found = ( - (m_code is not None and m_string and not_exists_status_matches and not_exists_string_matches) or - (m_code is not None and not m_string and not_exists_status_matches) or - (m_code is None and m_string and not_exists_string_matches) + (m_code is None or response_code == m_code) and + (m_string is None or m_string in response_text) and + (m_code is not None or m_string is not None) ) - return ( - ResultStatus.FOUND if condition_found else - ResultStatus.NOT_FOUND if condition_not_found else - ResultStatus.UNKNOWN - ) - - def _validate_string_field(self, field_name: str, value: Any) -> None: - """Validate a non-empty string field.""" - if not isinstance(value, str) or not value.strip(): - raise ValueError(f"{field_name} must be a non-empty string") + if condition_found and condition_not_found: + return ResultStatus.AMBIGUOUS + elif condition_found: + return ResultStatus.FOUND + elif condition_not_found: + return ResultStatus.NOT_FOUND + else: + return ResultStatus.UNKNOWN def to_dict(self, exclude_response_text: bool = False) -> Dict[str, Any]: """Convert SiteResult to dict.""" - try: - result = asdict(self) - result['result_status'] = self.result_status.value - result['created_at'] = serialize_datetime(self.created_at) - - if exclude_response_text: - result.pop('response_text', None) - - return result - except Exception as e: - raise ValueError(f"Failed to serialize SiteResult: {e}") from e + result = asdict(self) + result['result_status'] = self.result_status.value + result['created_at'] = self.created_at.isoformat() + if exclude_response_text: + result.pop('response_text', None) + return result @dataclass class SelfCheckResult: @@ -164,29 +103,14 @@ class SelfCheckResult: created_at: datetime = field(default_factory=datetime.now) def __post_init__(self) -> None: - """Validate fields after initialization.""" - self._validate_string_field('site_name', self.site_name) - self._validate_string_field('category', self.category) - - if not isinstance(self.results, list): - raise ValueError("results must be a list") - - for i, result in enumerate(self.results): - if not isinstance(result, SiteResult): - raise ValueError(f"results[{i}] must be a SiteResult instance") - - self.overall_status = self._determine_overall_status() - - if self.error: - self.overall_status = ResultStatus.ERROR - - def _validate_string_field(self, field_name: str, value: Any) -> None: - """Validate a non-empty string field.""" - if not isinstance(value, str) or not value.strip(): - raise ValueError(f"{field_name} must be a non-empty string") + """Calculate overall status from results.""" + self.overall_status = self._get_overall_status() - def _determine_overall_status(self) -> ResultStatus: + def _get_overall_status(self) -> ResultStatus: """Determine overall status from results.""" + if self.error: + return ResultStatus.ERROR + if not self.results: return ResultStatus.UNKNOWN @@ -202,18 +126,15 @@ def _determine_overall_status(self) -> ResultStatus: return ResultStatus.UNKNOWN return next(iter(statuses)) - + def to_dict(self, exclude_response_text: bool = False) -> Dict[str, Any]: """Convert SelfCheckResult to dict.""" - try: - return { - 'site_name': self.site_name, - 'category': self.category, - 'overall_status': self.overall_status.value, - 'results': [result.to_dict(exclude_response_text=exclude_response_text) for result in self.results], - 'created_at': serialize_datetime(self.created_at), - 'error': self.error, - } - except Exception as e: - raise ValueError(f"Failed to serialize SelfCheckResult: {e}") from e + return { + 'site_name': self.site_name, + 'category': self.category, + 'overall_status': self.overall_status.value, + 'results': [result.to_dict(exclude_response_text=exclude_response_text) for result in self.results], + 'created_at': self.created_at.isoformat(), + 'error': self.error, + } diff --git a/naminter/core/utils.py b/naminter/core/utils.py new file mode 100644 index 0000000..2f32241 --- /dev/null +++ b/naminter/core/utils.py @@ -0,0 +1,155 @@ +import logging +from typing import Any, Dict, List, Optional, Union, Set + +import jsonschema + +from .exceptions import ( + ConfigurationError, + DataError, + SchemaValidationError, + ValidationError, +) +from .constants import ( + MIN_TASKS, + MAX_TASKS_LIMIT, + MIN_TIMEOUT, + MAX_TIMEOUT, + HIGH_CONCURRENCY_THRESHOLD, + HIGH_CONCURRENCY_MIN_TIMEOUT, + VERY_HIGH_CONCURRENCY_THRESHOLD, + VERY_HIGH_CONCURRENCY_MIN_TIMEOUT, + EXTREME_CONCURRENCY_THRESHOLD, + LOW_TIMEOUT_WARNING_THRESHOLD, +) + +logger = logging.getLogger(__name__) + + +def validate_wmn_data(data: Dict[str, Any], schema: Optional[Dict[str, Any]]) -> None: + """Validate WMN data against schema.""" + if not data: + logger.error("No WMN data provided during initialization.") + raise DataError("No WMN data provided during initialization.") + + if schema: + try: + jsonschema.validate(instance=data, schema=schema) + logger.info("WMN data validation successful") + except jsonschema.ValidationError as e: + logger.error(f"WMN data does not match schema: {e.message}") + raise SchemaValidationError(f"WMN data does not match schema: {e.message}") from e + except jsonschema.SchemaError as e: + logger.error(f"Invalid WMN schema: {e.message}") + raise SchemaValidationError(f"Invalid WMN schema: {e.message}") from e + else: + logger.warning("No schema provided - skipping WMN data validation") + + +def validate_numeric_values(max_tasks: int, timeout: int) -> None: + """Validate numeric configuration values for max_tasks and timeout.""" + logger.debug(f"Validating numeric values: max_tasks={max_tasks}, timeout={timeout}") + + if not (MIN_TASKS <= max_tasks <= MAX_TASKS_LIMIT): + logger.error(f"max_tasks out of range: {max_tasks} not in [{MIN_TASKS}-{MAX_TASKS_LIMIT}]") + raise ConfigurationError(f"Invalid max_tasks: {max_tasks} must be between {MIN_TASKS} and {MAX_TASKS_LIMIT}") + + if not (MIN_TIMEOUT <= timeout <= MAX_TIMEOUT): + logger.error(f"timeout out of range: {timeout} not in [{MIN_TIMEOUT}-{MAX_TIMEOUT}]") + raise ConfigurationError(f"Invalid timeout: {timeout} must be between {MIN_TIMEOUT} and {MAX_TIMEOUT} seconds") + + if max_tasks > HIGH_CONCURRENCY_THRESHOLD and timeout < HIGH_CONCURRENCY_MIN_TIMEOUT: + logger.warning( + f"High concurrency ({max_tasks} tasks) with low timeout ({timeout}s) may cause failures - consider increasing timeout or reducing max_tasks" + ) + elif max_tasks > VERY_HIGH_CONCURRENCY_THRESHOLD and timeout < VERY_HIGH_CONCURRENCY_MIN_TIMEOUT: + logger.warning( + f"Very high concurrency ({max_tasks} tasks) with very low timeout ({timeout}s) may cause connection issues - recommend timeout >= {HIGH_CONCURRENCY_MIN_TIMEOUT}s for max_tasks > {VERY_HIGH_CONCURRENCY_THRESHOLD}" + ) + + if max_tasks > EXTREME_CONCURRENCY_THRESHOLD: + logger.warning( + f"Extremely high concurrency ({max_tasks} tasks) may overwhelm servers or cause rate limiting - lower value recommended" + ) + + if timeout < LOW_TIMEOUT_WARNING_THRESHOLD: + logger.warning( + f"Very low timeout ({timeout}s) may cause legitimate requests to fail - increase timeout for better accuracy" + ) + + +def configure_proxy(proxy: Optional[Union[str, Dict[str, str]]]) -> Optional[Dict[str, str]]: + """Validate and configure proxy settings.""" + if proxy is None: + return None + + if isinstance(proxy, str): + if not proxy.strip(): + logger.error("Proxy validation failed: empty string.") + raise ConfigurationError("Invalid proxy: proxy string cannot be empty") + + if not (proxy.startswith('http://') or proxy.startswith('https://') or proxy.startswith('socks5://')): + logger.error(f"Proxy validation failed: invalid protocol in '{proxy}'") + raise ConfigurationError("Invalid proxy: must be http://, https://, or socks5:// URL") + + logger.info("Proxy configuration validated successfully") + return {"http": proxy, "https": proxy} + + elif isinstance(proxy, dict): + for protocol, proxy_url in proxy.items(): + if protocol not in ['http', 'https']: + logger.error(f"Proxy validation failed: invalid protocol '{protocol}' in dict.") + raise ConfigurationError(f"Invalid proxy protocol: {protocol}") + + if not isinstance(proxy_url, str) or not proxy_url.strip(): + logger.error(f"Proxy validation failed: empty or invalid URL for protocol '{protocol}'.") + raise ConfigurationError(f"Invalid proxy URL for {protocol}: must be non-empty string") + + logger.info("Proxy dictionary configuration validated successfully") + return proxy + + else: + logger.error(f"Proxy validation failed: not a string or dict. Value: {proxy!r}") + raise ConfigurationError("Invalid proxy: must be string or dict") + + +def validate_usernames(usernames: List[str]) -> List[str]: + """Validate and deduplicate usernames, preserving order.""" + logger.debug(f"Validating and deduplicating usernames: {usernames!r}") + + seen: Set[str] = set() + unique_usernames: List[str] = [] + + for u in usernames: + if isinstance(u, str): + name = u.strip() + if name and name not in seen: + seen.add(name) + unique_usernames.append(name) + + if not unique_usernames: + logger.error("No valid usernames provided after validation.") + raise ValidationError("No valid usernames provided") + + logger.info(f"Validated {len(unique_usernames)} unique usernames") + return unique_usernames + + +async def filter_sites( + site_names: Optional[List[str]], + sites: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Filter the list of sites by the provided site names.""" + if not site_names: + return sites + + # Convert to set for O(1) lookup performance + site_names_set = set(site_names) + available = {site.get("name") for site in sites} + missing = site_names_set - available + + if missing: + raise DataError(f"Unknown site names: {missing}") + + filtered_sites = [site for site in sites if site.get("name") in site_names_set] + logger.info(f"Filtered to {len(filtered_sites)} sites from {len(sites)} total") + return filtered_sites \ No newline at end of file