diff --git a/auth_execute_as/README.rst b/auth_execute_as/README.rst new file mode 100644 index 0000000000..40188b0bbc --- /dev/null +++ b/auth_execute_as/README.rst @@ -0,0 +1,125 @@ +=============== +Auth Execute As +=============== + +.. + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! This file is generated by oca-gen-addon-readme !! + !! changes will be overwritten. !! + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! source digest: sha256:d397c0e122f92e416e9078b2e733e40e9dcbb37feb6205c7d7e60e175ca77972 + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png + :target: https://odoo-community.org/page/development-status + :alt: Beta +.. |badge2| image:: https://img.shields.io/badge/licence-AGPL--3-blue.png + :target: http://www.gnu.org/licenses/agpl-3.0-standalone.html + :alt: License: AGPL-3 +.. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fserver--auth-lightgray.png?logo=github + :target: https://github.com/OCA/server-auth/tree/16.0/auth_execute_as + :alt: OCA/server-auth +.. |badge4| image:: https://img.shields.io/badge/weblate-Translate%20me-F47D42.png + :target: https://translation.odoo-community.org/projects/server-auth-16-0/server-auth-16-0-auth_execute_as + :alt: Translate me on Weblate +.. |badge5| image:: https://img.shields.io/badge/runboat-Try%20me-875A7B.png + :target: https://runboat.odoo-community.org/builds?repo=OCA/server-auth&target_branch=16.0 + :alt: Try me on Runboat + +|badge1| |badge2| |badge3| |badge4| |badge5| + +This module provides a secure API endpoint that allows external systems to +execute Odoo methods as a specific user. + +The key feature is **User Impersonation** - executing actions under a specific +user's identity so that all Odoo access controls (ACLs & Record Rules) are +automatically applied. + +**Security Architecture** + +The module manages access through 3 layers: + +* **API Client**: Identifies the connecting application/service with a secret token +* **API Whitelist**: Groups permissions by purpose (e.g., "Sales Agent Group") +* **API Whitelist Line**: Defines allowed Model + Method combinations and field restrictions + +**Features** + +* Token-based authentication via ``X-API-Key`` header +* IP address whitelist (supports CIDR notation) +* Token expiration dates +* User whitelist per client +* Field-level access control +* Request/response logging with execution time metrics +* LLM-friendly response formatting (simplified Many2one fields, ISO dates) + +**API Endpoint** + +``POST /execute_as`` + +Request body:: + + { + "login": "user@example.com", + "model": "sale.order", + "method": "search_read", + "args": [[["state", "=", "sale"]]], + "kwargs": { + "fields": ["name", "amount_total"], + "limit": 10 + } + } + +**HTTP Status Codes** + +* 200 - Success +* 401 - Invalid or missing API key, expired token +* 403 - Method not whitelisted, IP not allowed, user not allowed +* 404 - User or record not found +* 422 - Validation error (Odoo UserError/ValidationError) +* 500 - Internal server error + +**Table of contents** + +.. contents:: + :local: + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback `_. + +Do not contact contributors directly about support or help with technical issues. + +Credits +======= + +Authors +~~~~~~~ + +* Kencove + +Contributors +~~~~~~~~~~~~ + +* Thien Vo + +Maintainers +~~~~~~~~~~~ + +This module is maintained by the OCA. + +.. image:: https://odoo-community.org/logo.png + :alt: Odoo Community Association + :target: https://odoo-community.org + +OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use. + +This module is part of the `OCA/server-auth `_ project on GitHub. + +You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute. diff --git a/auth_execute_as/__init__.py b/auth_execute_as/__init__.py new file mode 100644 index 0000000000..cef4ba4974 --- /dev/null +++ b/auth_execute_as/__init__.py @@ -0,0 +1,5 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from . import controllers +from . import models diff --git a/auth_execute_as/__manifest__.py b/auth_execute_as/__manifest__.py new file mode 100644 index 0000000000..ae8c1fb74c --- /dev/null +++ b/auth_execute_as/__manifest__.py @@ -0,0 +1,22 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +{ + "name": "Auth Execute As", + "summary": "Execute API calls as a specific user with whitelist-based access control", + "version": "16.0.1.0.0", + "license": "AGPL-3", + "author": "Kencove, Odoo Community Association (OCA)", + "website": "https://github.com/OCA/server-auth", + "category": "Tools", + "depends": ["base"], + "data": [ + "security/ir.model.access.csv", + "views/auth_api_whitelist_views.xml", + "views/auth_api_whitelist_line_views.xml", + "views/auth_api_client_views.xml", + "views/auth_api_log_views.xml", + "data/ir_cron_data.xml", + ], + "installable": True, +} diff --git a/auth_execute_as/controllers/__init__.py b/auth_execute_as/controllers/__init__.py new file mode 100644 index 0000000000..3d67870c32 --- /dev/null +++ b/auth_execute_as/controllers/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from . import main diff --git a/auth_execute_as/controllers/main.py b/auth_execute_as/controllers/main.py new file mode 100644 index 0000000000..ef43164f24 --- /dev/null +++ b/auth_execute_as/controllers/main.py @@ -0,0 +1,268 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +import json +import logging +import time + +from odoo import api, fields, http +from odoo.exceptions import AccessError, MissingError, UserError, ValidationError +from odoo.http import request +from odoo.service.model import get_public_method + +_logger = logging.getLogger(__name__) + + +class AuthExecuteAsController(http.Controller): + @http.route("/execute_as", type="json", auth="none", methods=["POST"], csrf=False) + def execute_as(self, **kwargs): + """ + Execute an API call as a specific user with whitelist-based access control. + """ + start_time = time.time() + + # Get request data + params = ( + request.get_json_data() if hasattr(request, "get_json_data") else kwargs + ) + login = params.get("login") + model_name = params.get("model") + method_name = params.get("method") + args = params.get("args", []) + method_kwargs = params.get("kwargs", {}) + + # Calculate request size + request_payload = json.dumps(params, default=str) + request_size = len(request_payload.encode("utf-8")) + + # Initialize log data + log_data = { + "model_name": model_name, + "method": method_name, + "request_payload": request_payload, + "request_size_bytes": request_size, + } + + whitelist = None + truncate = True + try: + # 1. Authentication & Authorization + client, whitelist, user, error = self._validate_request( + login, model_name, method_name, log_data + ) + if error: + return error + + log_data["client_id"] = client.id + log_data["user_id"] = user.id + truncate = whitelist.truncate_response + + # 2. Field Filtering + if method_name in ("read", "search_read") and "fields" in method_kwargs: + method_kwargs["fields"] = self._filter_fields( + whitelist, method_kwargs.get("fields", []) + ) + + # 3. Execution - Execute method as the target user + env = request.env(user=user.id) + model = env[model_name] + + # Validate method exists and is public (checks _ prefix and @api.private) + try: + get_public_method(model, method_name) + except AttributeError: + return self._error_response( + 403, + f"Method '{method_name}' not found or not allowed on model '{model_name}'", + log_data, + truncate, + ) + + result = api.call_kw(model, method_name, args, method_kwargs) + + # 4. Finalize and return + return self._finalize_response(result, whitelist, log_data, start_time) + + except AccessError as e: + return self._error_response(403, str(e), log_data, truncate) + except MissingError as e: + return self._error_response(404, str(e), log_data, truncate) + except (ValidationError, UserError) as e: + return self._error_response(422, str(e), log_data, truncate) + except Exception as e: + _logger.exception("Error in /execute_as endpoint") + return self._error_response(500, str(e), log_data, truncate) + + # ==================== Helper Methods ==================== + + def _validation_error(self, status_code, message, log_data, truncate=False): + """Return a validation failure tuple.""" + return ( + None, + None, + None, + self._error_response(status_code, message, log_data, truncate), + ) + + def _validate_request(self, login, model_name, method_name, log_data): + """Validate authentication, authorization, and return client/whitelist/user.""" + # Authentication - Validate API Key + api_key = request.httprequest.headers.get("X-API-Key") + if not api_key: + return self._validation_error(401, "Missing API Key", log_data) + + client = self._authenticate_client(api_key) + if not client: + return self._validation_error(401, "Invalid API Key", log_data) + + # Check token expiry + if client.is_token_expired(): + return self._validation_error(401, "Token has expired", log_data) + + # Check IP whitelist + client_ip = request.httprequest.remote_addr + if not client.is_ip_allowed(client_ip): + return self._validation_error( + 403, f"IP address '{client_ip}' is not allowed", log_data + ) + + # Authorization - Check Whitelist + whitelist = self._check_whitelist( + client.whitelist_id.id, model_name, method_name + ) + if not whitelist: + return self._validation_error( + 403, + f"Method '{method_name}' on model '{model_name}' is not allowed", + log_data, + ) + + # Impersonation - Find and switch to target user + user = self._find_user(login) + if not user: + return self._validation_error( + 404, f"User with login '{login}' not found", log_data + ) + + # Check user whitelist + if not client.is_user_allowed(user): + return self._validation_error( + 403, f"User '{login}' is not allowed for this client", log_data + ) + + return client, whitelist, user, None + + def _authenticate_client(self, api_key): + """Find and return active client by API key.""" + return ( + request.env["auth.api.client"] + .sudo() + .search([("secret_token", "=", api_key), ("active", "=", True)], limit=1) + ) + + def _check_whitelist(self, whitelist_id, model_name, method_name): + """Check if model/method is in whitelist.""" + if not whitelist_id: + return False + + whitelist = request.env["auth.api.whitelist"].sudo().browse(whitelist_id) + if not whitelist.exists(): + return False + + whitelist_line = whitelist.line_ids.filtered( + lambda w: w.model_id.model == model_name and w.method == method_name + ) + return whitelist_line[:1] if whitelist_line else False + + def _find_user(self, login): + """Find user by login.""" + return ( + request.env["res.users"] + .sudo() + .search([("login", "=", login), ("active", "=", True)], limit=1) + ) + + def _filter_fields(self, whitelist, requested_fields): + """Filter requested fields based on whitelist configuration.""" + if not whitelist.field_ids: + return requested_fields + + allowed_fields = whitelist.field_ids.mapped("name") + return [f for f in requested_fields if f in allowed_fields] + + def _clean_data(self, data): + """Clean data: convert dates to ISO format, simplify Many2one (id, name) to name.""" + if isinstance(data, (list, tuple)) and not self._is_many2one_tuple(data): + return [self._clean_data(item) for item in data] + elif isinstance(data, dict): + cleaned = {} + for key, value in data.items(): + if isinstance(value, (fields.Date, fields.Datetime)): + cleaned[key] = value.isoformat() + elif self._is_many2one_tuple(value): + # Many2one field: (id, name) -> name + cleaned[key] = value[1] + else: + cleaned[key] = self._clean_data(value) + return cleaned + elif isinstance(data, (fields.Date, fields.Datetime)): + return data.isoformat() + return data + + def _is_many2one_tuple(self, value): + """Check if value is a Many2one tuple (id, name).""" + return ( + isinstance(value, tuple) + and len(value) == 2 + and isinstance(value[0], int) + and isinstance(value[1], str) + ) + + def _finalize_response(self, result, whitelist, log_data, start_time): + """Clean data, log, and return response.""" + # Apply clean_response setting from whitelist + clean = whitelist.clean_response if whitelist else True + if clean: + result = self._clean_data(result) + + # Get logging settings from whitelist + log_call = whitelist.log_call if whitelist else True + log_response = whitelist.log_response if whitelist else True + truncate_response = whitelist.truncate_response if whitelist else True + + if log_call: + # Calculate sizes and timing + response_json = json.dumps(result, default=str) + log_data["status_code"] = 200 + log_data["execution_time_ms"] = int((time.time() - start_time) * 1000) + log_data["response_size_bytes"] = len(response_json.encode("utf-8")) + + if log_response: + if truncate_response: + log_data["response_payload"] = response_json[:1000] + else: + log_data["response_payload"] = response_json + + self._create_log(log_data) + + return result + + def _error_response(self, status_code, message, log_data, truncate=True): + """Create error response and log it.""" + response_json = json.dumps({"error": message}) + log_data["status_code"] = status_code + log_data["response_size_bytes"] = len(response_json.encode("utf-8")) + if truncate: + log_data["response_payload"] = response_json[:1000] + else: + log_data["response_payload"] = response_json + self._create_log(log_data) + + return {"error": message, "status_code": status_code} + + def _create_log(self, log_data): + """Create API log entry using sudo to ensure it's always recorded.""" + try: + request.env["auth.api.log"].sudo().create(log_data) + except Exception: + _logger.exception("Failed to create API log") diff --git a/auth_execute_as/data/ir_cron_data.xml b/auth_execute_as/data/ir_cron_data.xml new file mode 100644 index 0000000000..0a331ff868 --- /dev/null +++ b/auth_execute_as/data/ir_cron_data.xml @@ -0,0 +1,13 @@ + + + + API Logs: Cleanup Old Entries + + code + model._cron_cleanup_old_logs(days=30) + 1 + days + -1 + False + + diff --git a/auth_execute_as/models/__init__.py b/auth_execute_as/models/__init__.py new file mode 100644 index 0000000000..4dbb35c91a --- /dev/null +++ b/auth_execute_as/models/__init__.py @@ -0,0 +1,7 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from . import auth_api_whitelist +from . import auth_api_whitelist_line +from . import auth_api_client +from . import auth_api_log diff --git a/auth_execute_as/models/auth_api_client.py b/auth_execute_as/models/auth_api_client.py new file mode 100644 index 0000000000..3daa8d7d6f --- /dev/null +++ b/auth_execute_as/models/auth_api_client.py @@ -0,0 +1,112 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +import ipaddress +import secrets + +from odoo import api, fields, models + + +class AuthApiClient(models.Model): + _name = "auth.api.client" + _description = "API Client" + + name = fields.Char(required=True) + whitelist_id = fields.Many2one( + "auth.api.whitelist", + string="Whitelist", + required=True, + ondelete="restrict", + ) + secret_token = fields.Char( + required=True, + copy=False, + default=lambda self: secrets.token_urlsafe(32), + ) + active = fields.Boolean(default=True) + token_expires_at = fields.Datetime( + help="Leave empty for no expiration", + ) + last_rotated = fields.Datetime() + allowed_ips = fields.Text( + help="Comma-separated list of allowed IPs or CIDR ranges. Empty = all allowed.", + ) + allowed_user_ids = fields.Many2many( + "res.users", + string="Allowed Users", + help="Specific users that can be impersonated.", + ) + allowed_group_ids = fields.Many2many( + "res.groups", + string="Allowed Groups", + help="User groups that can be impersonated (e.g., Portal Users).", + ) + + _sql_constraints = [ + ( + "secret_token_unique", + "UNIQUE(secret_token)", + "The secret token must be unique!", + ), + ] + + @api.model_create_multi + def create(self, vals_list): + for vals in vals_list: + if not vals.get("secret_token"): + vals["secret_token"] = secrets.token_urlsafe(32) + vals["last_rotated"] = fields.Datetime.now() + return super().create(vals_list) + + def action_regenerate_token(self): + for record in self: + record.secret_token = secrets.token_urlsafe(32) + record.last_rotated = fields.Datetime.now() + + def is_token_expired(self): + """Check if token has expired. Returns False if no expiry set.""" + self.ensure_one() + if not self.token_expires_at: + return False + return fields.Datetime.now() > self.token_expires_at + + def is_ip_allowed(self, ip_address): + """Check if IP address is allowed. Returns True if no restriction set.""" + self.ensure_one() + if not self.allowed_ips: + return True + + allowed_list = [ip.strip() for ip in self.allowed_ips.split(",") if ip.strip()] + if not allowed_list: + return True + + try: + client_ip = ipaddress.ip_address(ip_address) + for allowed in allowed_list: + try: + if "/" in allowed: + network = ipaddress.ip_network(allowed, strict=False) + if client_ip in network: + return True + else: + if client_ip == ipaddress.ip_address(allowed): + return True + except ValueError: + continue + return False + except ValueError: + return False + + def is_user_allowed(self, user): + """Check if user can be impersonated. Returns True if no restriction set.""" + self.ensure_one() + # No restrictions = all users allowed + if not self.allowed_user_ids and not self.allowed_group_ids: + return True + # Check if user is in allowed users list + if self.allowed_user_ids and user in self.allowed_user_ids: + return True + # Check if user belongs to any allowed group + if self.allowed_group_ids and (user.groups_id & self.allowed_group_ids): + return True + return False diff --git a/auth_execute_as/models/auth_api_log.py b/auth_execute_as/models/auth_api_log.py new file mode 100644 index 0000000000..87ce9f3ca6 --- /dev/null +++ b/auth_execute_as/models/auth_api_log.py @@ -0,0 +1,39 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from datetime import timedelta + +from odoo import api, fields, models + + +class AuthApiLog(models.Model): + _name = "auth.api.log" + _description = "API Log" + _order = "create_date desc" + + client_id = fields.Many2one( + "auth.api.client", + string="Client", + index=True, + ) + user_id = fields.Many2one( + "res.users", + string="Impersonated User", + index=True, + ) + model_name = fields.Char(string="Model") + method = fields.Char() + request_payload = fields.Text() + response_payload = fields.Text() + status_code = fields.Integer() + execution_time_ms = fields.Integer(string="Execution Time (ms)") + request_size_bytes = fields.Integer(string="Request Size (bytes)") + response_size_bytes = fields.Integer(string="Response Size (bytes)") + + @api.model + def _cron_cleanup_old_logs(self, days=30): + """Scheduled action to clean up old logs.""" + cutoff = fields.Datetime.now() - timedelta(days=days) + old_logs = self.search([("create_date", "<=", cutoff)]) + old_logs.unlink() + return True diff --git a/auth_execute_as/models/auth_api_whitelist.py b/auth_execute_as/models/auth_api_whitelist.py new file mode 100644 index 0000000000..0a4fd0f83f --- /dev/null +++ b/auth_execute_as/models/auth_api_whitelist.py @@ -0,0 +1,16 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +from odoo import fields, models + + +class AuthApiWhitelist(models.Model): + _name = "auth.api.whitelist" + _description = "API Whitelist" + + name = fields.Char(required=True) + description = fields.Text() + line_ids = fields.One2many( + comodel_name="auth.api.whitelist.line", + inverse_name="whitelist_id", + string="Whitelist Lines", + ) diff --git a/auth_execute_as/models/auth_api_whitelist_line.py b/auth_execute_as/models/auth_api_whitelist_line.py new file mode 100644 index 0000000000..35a2386c82 --- /dev/null +++ b/auth_execute_as/models/auth_api_whitelist_line.py @@ -0,0 +1,62 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from odoo import api, fields, models + + +class AuthApiWhitelistLine(models.Model): + _name = "auth.api.whitelist.line" + _description = "API Whitelist Line" + _rec_name = "display_name" + + whitelist_id = fields.Many2one( + comodel_name="auth.api.whitelist", + string="Whitelist", + required=True, + ondelete="cascade", + index=True, + ) + model_id = fields.Many2one( + comodel_name="ir.model", + string="Model", + required=True, + ondelete="cascade", + ) + method = fields.Char(required=True) + field_ids = fields.Many2many( + comodel_name="ir.model.fields", + relation="auth_api_whitelist_line_field_rel", + column1="line_id", + column2="field_id", + string="Allowed Fields", + domain="[('model_id', '=', model_id)]", + ) + clean_response = fields.Boolean( + default=True, + help=""" + Clean response data: + convert dates to ISO format, simplify Many2one (id, name) to just name + """, + ) + log_call = fields.Boolean( + default=True, + help="Log API calls for this method", + ) + log_response = fields.Boolean( + default=True, + help="Include response payload in logs", + ) + truncate_response = fields.Boolean( + default=True, + help="Truncate response payload to 10KB in logs", + ) + display_name = fields.Char( + compute="_compute_display_name", + store=True, + ) + + @api.depends("model_id", "method") + def _compute_display_name(self): + for record in self: + model_name = record.model_id.model if record.model_id else "" + record.display_name = f"{model_name}.{record.method or ''}" diff --git a/auth_execute_as/readme/CONTRIBUTORS.rst b/auth_execute_as/readme/CONTRIBUTORS.rst new file mode 100644 index 0000000000..b05a5d70c7 --- /dev/null +++ b/auth_execute_as/readme/CONTRIBUTORS.rst @@ -0,0 +1 @@ +* Thien Vo diff --git a/auth_execute_as/readme/DESCRIPTION.rst b/auth_execute_as/readme/DESCRIPTION.rst new file mode 100644 index 0000000000..f04ea6081f --- /dev/null +++ b/auth_execute_as/readme/DESCRIPTION.rst @@ -0,0 +1,50 @@ +This module provides a secure API endpoint that allows external systems to +execute Odoo methods as a specific user. + +The key feature is **User Impersonation** - executing actions under a specific +user's identity so that all Odoo access controls (ACLs & Record Rules) are +automatically applied. + +**Security Architecture** + +The module manages access through 3 layers: + +* **API Client**: Identifies the connecting application/service with a secret token +* **API Whitelist**: Groups permissions by purpose (e.g., "Sales Agent Group") +* **API Whitelist Line**: Defines allowed Model + Method combinations and field restrictions + +**Features** + +* Token-based authentication via ``X-API-Key`` header +* IP address whitelist (supports CIDR notation) +* Token expiration dates +* User whitelist per client +* Field-level access control +* Request/response logging with execution time metrics +* LLM-friendly response formatting (simplified Many2one fields, ISO dates) + +**API Endpoint** + +``POST /execute_as`` + +Request body:: + + { + "login": "user@example.com", + "model": "sale.order", + "method": "search_read", + "args": [[["state", "=", "sale"]]], + "kwargs": { + "fields": ["name", "amount_total"], + "limit": 10 + } + } + +**HTTP Status Codes** + +* 200 - Success +* 401 - Invalid or missing API key, expired token +* 403 - Method not whitelisted, IP not allowed, user not allowed +* 404 - User or record not found +* 422 - Validation error (Odoo UserError/ValidationError) +* 500 - Internal server error diff --git a/auth_execute_as/security/ir.model.access.csv b/auth_execute_as/security/ir.model.access.csv new file mode 100644 index 0000000000..9dfed3f817 --- /dev/null +++ b/auth_execute_as/security/ir.model.access.csv @@ -0,0 +1,5 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink +access_auth_api_whitelist_admin,auth.api.whitelist admin,model_auth_api_whitelist,base.group_system,1,1,1,1 +access_auth_api_whitelist_line_admin,auth.api.whitelist.line admin,model_auth_api_whitelist_line,base.group_system,1,1,1,1 +access_auth_api_client_admin,auth.api.client admin,model_auth_api_client,base.group_system,1,1,1,1 +access_auth_api_log_admin,auth.api.log admin,model_auth_api_log,base.group_system,1,1,1,1 diff --git a/auth_execute_as/static/description/index.html b/auth_execute_as/static/description/index.html new file mode 100644 index 0000000000..df192e1e92 --- /dev/null +++ b/auth_execute_as/static/description/index.html @@ -0,0 +1,468 @@ + + + + + +Auth Execute As + + + +
+

Auth Execute As

+ + +

Beta License: AGPL-3 OCA/server-auth Translate me on Weblate Try me on Runboat

+

This module provides a secure API endpoint that allows external systems to +execute Odoo methods as a specific user.

+

The key feature is User Impersonation - executing actions under a specific +user’s identity so that all Odoo access controls (ACLs & Record Rules) are +automatically applied.

+

Security Architecture

+

The module manages access through 3 layers:

+
    +
  • API Client: Identifies the connecting application/service with a secret token
  • +
  • API Whitelist: Groups permissions by purpose (e.g., “Sales Agent Group”)
  • +
  • API Whitelist Line: Defines allowed Model + Method combinations and field restrictions
  • +
+

Features

+
    +
  • Token-based authentication via X-API-Key header
  • +
  • IP address whitelist (supports CIDR notation)
  • +
  • Token expiration dates
  • +
  • User whitelist per client
  • +
  • Field-level access control
  • +
  • Request/response logging with execution time metrics
  • +
  • LLM-friendly response formatting (simplified Many2one fields, ISO dates)
  • +
+

API Endpoint

+

POST /execute_as

+

Request body:

+
+{
+    "login": "user@example.com",
+    "model": "sale.order",
+    "method": "search_read",
+    "args": [[["state", "=", "sale"]]],
+    "kwargs": {
+        "fields": ["name", "amount_total"],
+        "limit": 10
+    }
+}
+
+

HTTP Status Codes

+
    +
  • 200 - Success
  • +
  • 401 - Invalid or missing API key, expired token
  • +
  • 403 - Method not whitelisted, IP not allowed, user not allowed
  • +
  • 404 - User or record not found
  • +
  • 422 - Validation error (Odoo UserError/ValidationError)
  • +
  • 500 - Internal server error
  • +
+

Table of contents

+ +
+

Bug Tracker

+

Bugs are tracked on GitHub Issues. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +feedback.

+

Do not contact contributors directly about support or help with technical issues.

+
+
+

Credits

+
+

Authors

+
    +
  • Kencove
  • +
+
+
+

Contributors

+ +
+
+

Maintainers

+

This module is maintained by the OCA.

+ +Odoo Community Association + +

OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use.

+

This module is part of the OCA/server-auth project on GitHub.

+

You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

+
+
+
+ + diff --git a/auth_execute_as/tests/__init__.py b/auth_execute_as/tests/__init__.py new file mode 100644 index 0000000000..92fb0ab1fb --- /dev/null +++ b/auth_execute_as/tests/__init__.py @@ -0,0 +1,5 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from . import test_auth_api_controller +from . import test_auth_api_log_cleanup diff --git a/auth_execute_as/tests/common.py b/auth_execute_as/tests/common.py new file mode 100644 index 0000000000..8e30f0829c --- /dev/null +++ b/auth_execute_as/tests/common.py @@ -0,0 +1,44 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from odoo.tests.common import TransactionCase + + +class AuthExecuteAsTestCommon(TransactionCase): + def setUp(self): + super().setUp() + + self.test_model = self.env["ir.model"].search( + [("model", "=", "res.partner")], limit=1 + ) + + self.whitelist = self.env["auth.api.whitelist"].create( + { + "name": "Test Whitelist", + "description": "Whitelist for testing", + } + ) + + self.whitelist_line = self.env["auth.api.whitelist.line"].create( + { + "whitelist_id": self.whitelist.id, + "model_id": self.test_model.id, + "method": "search_read", + "clean_response": True, + } + ) + + self.client = self.env["auth.api.client"].create( + { + "name": "Test Client", + "whitelist_id": self.whitelist.id, + } + ) + + self.test_user = self.env["res.users"].create( + { + "name": "Test API User", + "login": "test_api_user2@example.com", + "email": "test_api_user2@example.com", + } + ) diff --git a/auth_execute_as/tests/test_auth_api_controller.py b/auth_execute_as/tests/test_auth_api_controller.py new file mode 100644 index 0000000000..1eefcd3641 --- /dev/null +++ b/auth_execute_as/tests/test_auth_api_controller.py @@ -0,0 +1,407 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from datetime import timedelta +from unittest.mock import MagicMock, patch + +from odoo import fields +from odoo.exceptions import AccessError + +from odoo.addons.auth_execute_as.controllers.main import AuthExecuteAsController + +from .common import AuthExecuteAsTestCommon + + +class TestAuthExecuteAsController(AuthExecuteAsTestCommon): + """Test cases for AuthExecuteAsController. + + Test flow: + 1. Authentication: API key validation, token expiry, client active + 2. Authorization: IP whitelist, method whitelist, user whitelist + 3. Execution: api.call_kw with @api.model and record methods + 4. Response: clean_response, field filtering, logging + """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.controller = AuthExecuteAsController() + cls.request_path = "odoo.addons.auth_execute_as.controllers.main.request" + + def _mock_request(self, api_key=None, remote_addr="127.0.0.1"): + """Create mock request with env and headers.""" + mock = MagicMock() + mock.env = self.env + mock.httprequest.headers.get.return_value = api_key + mock.httprequest.remote_addr = remote_addr + mock.httprequest.environ = {} + del mock.get_json_data # Force using kwargs + return mock + + def _execute( + self, + api_key, + login=None, + model="res.partner", + method="search_read", + args=None, + kwargs=None, + remote_addr="127.0.0.1", + ): + """Execute controller method with mock request.""" + mock = self._mock_request(api_key=api_key, remote_addr=remote_addr) + with patch(self.request_path, mock): + return self.controller.execute_as( + login=login or self.test_user.login, + model=model, + method=method, + args=args if args is not None else [[]], + kwargs=kwargs if kwargs is not None else {"limit": 1}, + ) + + def _add_whitelist_method(self, method, **options): + """Add method to whitelist with optional settings.""" + vals = { + "whitelist_id": self.whitelist.id, + "model_id": self.test_model.id, + "method": method, + } + vals.update(options) + return self.env["auth.api.whitelist.line"].create(vals) + + # ==================== 1. Authentication Tests (401) ==================== + + def test_auth_01_missing_api_key(self): + """Request without X-API-Key header returns 401.""" + result = self._execute(api_key=None) + self.assertEqual(result["status_code"], 401) + self.assertIn("Missing API Key", result["error"]) + + def test_auth_02_invalid_api_key(self): + """Request with non-existent API key returns 401.""" + result = self._execute(api_key="invalid_token_12345") + self.assertEqual(result["status_code"], 401) + self.assertIn("Invalid API Key", result["error"]) + + def test_auth_03_expired_token(self): + """Request with expired token returns 401.""" + self.client.token_expires_at = fields.Datetime.now() - timedelta(hours=1) + result = self._execute(api_key=self.client.secret_token) + self.assertEqual(result["status_code"], 401) + self.assertIn("expired", result["error"]) + + def test_auth_04_inactive_client(self): + """Request with inactive client returns 401.""" + self.client.active = False + result = self._execute(api_key=self.client.secret_token) + self.assertEqual(result["status_code"], 401) + self.assertIn("Invalid API Key", result["error"]) + + # ==================== 2. Authorization Tests (403) ==================== + + def test_authz_01_ip_not_allowed(self): + """Request from non-allowed IP returns 403.""" + self.client.allowed_ips = "10.0.0.1, 192.168.0.0/24" + result = self._execute( + api_key=self.client.secret_token, remote_addr="172.16.0.1" + ) + self.assertEqual(result["status_code"], 403) + self.assertIn("not allowed", result["error"]) + + def test_authz_02_ip_allowed_single(self): + """Request from allowed single IP succeeds.""" + self.client.allowed_ips = "192.168.1.100" + result = self._execute( + api_key=self.client.secret_token, remote_addr="192.168.1.100" + ) + self.assertIsInstance(result, list) + + def test_authz_03_ip_allowed_cidr(self): + """Request from IP in CIDR range succeeds.""" + self.client.allowed_ips = "192.168.1.0/24" + result = self._execute( + api_key=self.client.secret_token, remote_addr="192.168.1.55" + ) + self.assertIsInstance(result, list) + + def test_authz_04_method_not_whitelisted(self): + """Request for non-whitelisted method returns 403.""" + result = self._execute(api_key=self.client.secret_token, method="unlink") + self.assertEqual(result["status_code"], 403) + self.assertIn("not allowed", result["error"]) + + def test_authz_05_user_not_in_allowed_list(self): + """Request to impersonate non-allowed user returns 403.""" + other_user = self.env["res.users"].create( + { + "name": "Other User", + "login": "other@example.com", + } + ) + self.client.allowed_user_ids = [(6, 0, [self.test_user.id])] + result = self._execute(api_key=self.client.secret_token, login=other_user.login) + self.assertEqual(result["status_code"], 403) + self.assertIn("not allowed", result["error"]) + + def test_authz_06_user_in_allowed_list(self): + """Request to impersonate allowed user succeeds.""" + self.client.allowed_user_ids = [(6, 0, [self.test_user.id])] + result = self._execute(api_key=self.client.secret_token) + self.assertIsInstance(result, list) + + def test_authz_07_user_not_in_allowed_group(self): + """Request for user not in allowed group returns 403.""" + group = self.env["res.groups"].create({"name": "Restricted Group"}) + self.client.allowed_group_ids = [(6, 0, [group.id])] + result = self._execute(api_key=self.client.secret_token) + self.assertEqual(result["status_code"], 403) + + def test_authz_08_user_in_allowed_group(self): + """Request for user in allowed group succeeds.""" + group = self.env["res.groups"].create({"name": "API Group"}) + self.test_user.groups_id = [(4, group.id)] + self.client.allowed_group_ids = [(6, 0, [group.id])] + result = self._execute(api_key=self.client.secret_token) + self.assertIsInstance(result, list) + + def test_authz_09_private_method_rejected(self): + """Private method (starting with _) is rejected.""" + self._add_whitelist_method("_compute_display_name") + result = self._execute( + api_key=self.client.secret_token, + method="_compute_display_name", + args=[], + kwargs={}, + ) + # Private methods rejected by get_public_method (403) or api.call_kw (500) + self.assertIn(result["status_code"], [403, 500]) + self.assertTrue( + "Private" in result["error"] or "not allowed" in result["error"] + ) + + # ==================== 3. Not Found Tests (404) ==================== + + def test_notfound_01_user_not_found(self): + """Request with non-existent login returns 404.""" + result = self._execute( + api_key=self.client.secret_token, login="nonexistent@example.com" + ) + self.assertEqual(result["status_code"], 404) + self.assertIn("not found", result["error"]) + + def test_notfound_02_method_not_found(self): + """Request for non-existent method on model returns error.""" + self._add_whitelist_method("nonexistent_method_xyz") + result = self._execute( + api_key=self.client.secret_token, + method="nonexistent_method_xyz", + args=[], + kwargs={}, + ) + self.assertIn(result["status_code"], [403, 404]) + + # ==================== 4. api.call_kw Tests ==================== + + def test_callkw_01_search_read(self): + """Test search_read method (setup in common).""" + result = self._execute(api_key=self.client.secret_token) + self.assertIsInstance(result, list) + + def test_callkw_02_create(self): + """Test create method (@api.model_create_multi).""" + self._add_whitelist_method("create") + result = self._execute( + api_key=self.client.secret_token, + method="create", + args=[{"name": "Created Partner"}], + kwargs={}, + ) + self.assertIsInstance(result, int) + partner = self.env["res.partner"].browse(result) + self.assertEqual(partner.name, "Created Partner") + + def test_callkw_03_read_with_ids(self): + """Test read method with specific IDs.""" + self._add_whitelist_method("read") + partner = self.env["res.partner"].create( + {"name": "Read Test", "email": "read@test.com"} + ) + result = self._execute( + api_key=self.client.secret_token, + method="read", + args=[[partner.id]], + kwargs={"fields": ["name", "email"]}, + ) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["name"], "Read Test") + self.assertEqual(result[0]["email"], "read@test.com") + + def test_callkw_04_write_with_ids(self): + """Test write method on specific records.""" + self._add_whitelist_method("write") + partner = self.env["res.partner"].create({"name": "Old Name"}) + result = self._execute( + api_key=self.client.secret_token, + method="write", + args=[[partner.id], {"name": "New Name"}], + kwargs={}, + ) + self.assertTrue(result) + partner.invalidate_recordset() + self.assertEqual(partner.name, "New Name") + + def test_callkw_05_action_on_record(self): + """Test action method (action_archive) on record.""" + self._add_whitelist_method("action_archive") + partner = self.env["res.partner"].create({"name": "To Archive"}) + self.assertTrue(partner.active) + + self._execute( + api_key=self.client.secret_token, + method="action_archive", + args=[[partner.id]], + kwargs={}, + ) + partner.invalidate_recordset() + self.assertFalse(partner.active) + + def test_callkw_06_multiple_ids(self): + """Test method on multiple records.""" + self._add_whitelist_method("write") + p1 = self.env["res.partner"].create({"name": "P1"}) + p2 = self.env["res.partner"].create({"name": "P2"}) + + self._execute( + api_key=self.client.secret_token, + method="write", + args=[[p1.id, p2.id], {"ref": "BULK001"}], + kwargs={}, + ) + p1.invalidate_recordset() + p2.invalidate_recordset() + self.assertEqual(p1.ref, "BULK001") + self.assertEqual(p2.ref, "BULK001") + + def test_callkw_07_invalid_id_returns_empty(self): + """Test read with non-existent ID returns empty list.""" + self._add_whitelist_method("read") + result = self._execute( + api_key=self.client.secret_token, + method="read", + args=[[999999999]], + kwargs={"fields": ["name"]}, + ) + # Odoo read() with non-existent IDs returns empty list, not MissingError + self.assertIsInstance(result, list) + self.assertEqual(len(result), 0) + + # ==================== 5. Response Processing Tests ==================== + + def test_response_01_clean_response_true(self): + """clean_response=True: Many2one (id, name) becomes name string.""" + self.whitelist_line.clean_response = True + parent = self.env["res.partner"].create({"name": "Parent Co"}) + child = self.env["res.partner"].create( + {"name": "Child", "parent_id": parent.id} + ) + + result = self._execute( + api_key=self.client.secret_token, + args=[[["id", "=", child.id]]], + kwargs={"fields": ["name", "parent_id"]}, + ) + self.assertEqual(result[0]["parent_id"], "Parent Co") + + def test_response_02_clean_response_false(self): + """clean_response=False: Many2one keeps tuple (id, name).""" + self.whitelist_line.clean_response = False + parent = self.env["res.partner"].create({"name": "Parent Raw"}) + child = self.env["res.partner"].create( + {"name": "Child Raw", "parent_id": parent.id} + ) + + result = self._execute( + api_key=self.client.secret_token, + args=[[["id", "=", child.id]]], + kwargs={"fields": ["name", "parent_id"]}, + ) + self.assertIsInstance(result[0]["parent_id"], (list, tuple)) + self.assertEqual(result[0]["parent_id"][0], parent.id) + self.assertEqual(result[0]["parent_id"][1], "Parent Raw") + + def test_response_03_field_filtering(self): + """field_ids filter: only allowed fields returned.""" + name_field = self.env["ir.model.fields"].search( + [("model_id", "=", self.test_model.id), ("name", "=", "name")], limit=1 + ) + self.whitelist_line.field_ids = [(6, 0, [name_field.id])] + + result = self._execute( + api_key=self.client.secret_token, + kwargs={"fields": ["name", "email", "phone"], "limit": 1}, + ) + if result: + self.assertIn("name", result[0]) + self.assertNotIn("email", result[0]) + self.assertNotIn("phone", result[0]) + + # ==================== 6. Logging Tests ==================== + + def test_logging_01_log_created(self): + """log_call=True: creates log entry with full info.""" + self.whitelist_line.log_call = True + self.whitelist_line.log_response = True + + count_before = self.env["auth.api.log"].search_count([]) + self._execute(api_key=self.client.secret_token) + count_after = self.env["auth.api.log"].search_count([]) + + self.assertEqual(count_after, count_before + 1) + + log = self.env["auth.api.log"].search([], order="id desc", limit=1) + self.assertEqual(log.client_id, self.client) + self.assertEqual(log.user_id, self.test_user) + self.assertEqual(log.model_name, "res.partner") + self.assertEqual(log.method, "search_read") + self.assertEqual(log.status_code, 200) + self.assertTrue(log.response_payload) + + def test_logging_02_log_call_disabled(self): + """log_call=False: no log entry created.""" + self.whitelist_line.log_call = False + + count_before = self.env["auth.api.log"].search_count([]) + self._execute(api_key=self.client.secret_token) + count_after = self.env["auth.api.log"].search_count([]) + + self.assertEqual(count_after, count_before) + + def test_logging_03_log_response_disabled(self): + """log_response=False: log has no response_payload.""" + self.whitelist_line.log_call = True + self.whitelist_line.log_response = False + + self._execute(api_key=self.client.secret_token) + + log = self.env["auth.api.log"].search([], order="id desc", limit=1) + self.assertFalse(log.response_payload) + + # ==================== 7. Error Handling Tests ==================== + + def test_error_01_access_error_returns_403(self): + """AccessError during execution returns 403.""" + self._add_whitelist_method("read") + + mock = self._mock_request(api_key=self.client.secret_token) + with patch(self.request_path, mock): + with patch("odoo.api.call_kw", side_effect=AccessError("Access Denied")): + result = self.controller.execute_as( + login=self.test_user.login, + model="res.partner", + method="read", + args=[[1]], + kwargs={}, + ) + + self.assertEqual(result["status_code"], 403) + self.assertIn("Access Denied", result["error"]) diff --git a/auth_execute_as/tests/test_auth_api_log_cleanup.py b/auth_execute_as/tests/test_auth_api_log_cleanup.py new file mode 100644 index 0000000000..c3a2b281cd --- /dev/null +++ b/auth_execute_as/tests/test_auth_api_log_cleanup.py @@ -0,0 +1,42 @@ +# Copyright 2026 Kencove (https://www.kencove.com). +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from odoo import fields + +from .common import AuthExecuteAsTestCommon + + +class TestLogCleanup(AuthExecuteAsTestCommon): + def test_log_cleanup_cron(self): + """Test log cleanup cron job.""" + log = self.env["auth.api.log"].create( + { + "client_id": self.client.id, + "model_name": "test", + "method": "test", + "status_code": 200, + } + ) + log.create_date = fields.Datetime.to_string( + fields.Datetime.subtract(fields.Datetime.now(), days=31) + ) + # Run cleanup with 30 days + self.env["auth.api.log"]._cron_cleanup_old_logs(days=30) + + self.assertFalse(log.exists()) + + def test_log_cleanup_keeps_recent(self): + """Test that cleanup keeps recent logs.""" + log = self.env["auth.api.log"].create( + { + "client_id": self.client.id, + "model_name": "test", + "method": "test", + "status_code": 200, + } + ) + + # Run cleanup with 30 days + self.env["auth.api.log"]._cron_cleanup_old_logs(days=30) + + self.assertTrue(log.exists()) diff --git a/auth_execute_as/views/auth_api_client_views.xml b/auth_execute_as/views/auth_api_client_views.xml new file mode 100644 index 0000000000..44d399def0 --- /dev/null +++ b/auth_execute_as/views/auth_api_client_views.xml @@ -0,0 +1,92 @@ + + + + auth.api.client.form + auth.api.client + +
+ +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+
+
+
+ + + auth.api.client.tree + auth.api.client + + + + + + + + + + + + Auth API Clients + auth.api.client + tree,form + + + +
diff --git a/auth_execute_as/views/auth_api_log_views.xml b/auth_execute_as/views/auth_api_log_views.xml new file mode 100644 index 0000000000..d6c4038691 --- /dev/null +++ b/auth_execute_as/views/auth_api_log_views.xml @@ -0,0 +1,115 @@ + + + + auth.api.log.form + auth.api.log + +
+ + + + + + + + + + + + + + + + + + + + + + + + + +
+
+
+ + + auth.api.log.tree + auth.api.log + + + + + + + + + + + + + + + auth.api.log.search + auth.api.log + + + + + + + + + + + + + + + + + + + + Logs + auth.api.log + tree,form + + + +
diff --git a/auth_execute_as/views/auth_api_whitelist_line_views.xml b/auth_execute_as/views/auth_api_whitelist_line_views.xml new file mode 100644 index 0000000000..15b0d8863c --- /dev/null +++ b/auth_execute_as/views/auth_api_whitelist_line_views.xml @@ -0,0 +1,54 @@ + + + + auth.api.whitelist.line.form + auth.api.whitelist.line + +
+ + + + + + + + + + + + + + + + + + + + + +
+
+
+ + + auth.api.whitelist.line.tree + auth.api.whitelist.line + + + + + + + + + + + + +
diff --git a/auth_execute_as/views/auth_api_whitelist_views.xml b/auth_execute_as/views/auth_api_whitelist_views.xml new file mode 100644 index 0000000000..07cbdb2140 --- /dev/null +++ b/auth_execute_as/views/auth_api_whitelist_views.xml @@ -0,0 +1,55 @@ + + + + auth.api.whitelist.form + auth.api.whitelist + +
+ + + + + + + + + + + +
+
+
+ + + auth.api.whitelist.tree + auth.api.whitelist + + + + + + + + + + Whitelist + auth.api.whitelist + tree,form + + + + + +
diff --git a/setup/auth_execute_as/odoo/addons/auth_execute_as b/setup/auth_execute_as/odoo/addons/auth_execute_as new file mode 120000 index 0000000000..ee54076b49 --- /dev/null +++ b/setup/auth_execute_as/odoo/addons/auth_execute_as @@ -0,0 +1 @@ +../../../../auth_execute_as \ No newline at end of file diff --git a/setup/auth_execute_as/setup.py b/setup/auth_execute_as/setup.py new file mode 100644 index 0000000000..28c57bb640 --- /dev/null +++ b/setup/auth_execute_as/setup.py @@ -0,0 +1,6 @@ +import setuptools + +setuptools.setup( + setup_requires=['setuptools-odoo'], + odoo_addon=True, +)