From 93e87c1cd0ae60e5319cd489edce676edce6c4d8 Mon Sep 17 00:00:00 2001 From: "bishoy.sameh@progressiosolutions.com" Date: Thu, 30 Oct 2025 19:49:59 +0300 Subject: [PATCH] feat: add role management helper methods Add convenient methods for managing user role assignments: - assign_role_to_user(username, role_name): Assign a role to a user - remove_role_from_user(username, role_name): Remove a role from a user - get_user_roles(username): Get all roles assigned to a user - get_roles(): Get all roles in the organization - get_role(role_name): Get a specific role - update_role(role): Update a role Both sync and async versions included. These methods simplify the common task of role assignment by handling the workflow of fetching the role, modifying the users list, and updating the role in a single call. --- src/casdoor/async_main.py | 104 ++++++++++++++++++++++++++++++++++++++ src/casdoor/role.py | 60 ++++++++++++++++++++++ 2 files changed, 164 insertions(+) diff --git a/src/casdoor/async_main.py b/src/casdoor/async_main.py index e40f40e..27c070c 100644 --- a/src/casdoor/async_main.py +++ b/src/casdoor/async_main.py @@ -462,3 +462,107 @@ async def update_user(self, user: User) -> Dict: async def delete_user(self, user: User) -> Dict: response = await self.modify_user("delete-user", user) return response + + async def get_roles(self) -> List[Dict]: + """ + Get all roles from Casdoor for the configured organization. + + :return: list of role dicts + """ + path = "/api/get-roles" + params = {"owner": self.org_name} + async with self._session as session: + response = await session.get(path, headers=self.headers, params=params) + if response.get("status") != "ok": + raise Exception(response.get("msg", "Failed to get roles")) + return response.get("data", []) + + async def get_role(self, role_name: str) -> Dict: + """ + Get a specific role from Casdoor. + + :param role_name: the name of the role + :return: role dict + """ + path = "/api/get-role" + params = {"id": f"{self.org_name}/{role_name}"} + async with self._session as session: + response = await session.get(path, headers=self.headers, params=params) + if response.get("status") != "ok": + raise Exception(response.get("msg", f"Role {role_name} not found")) + return response.get("data") + + async def update_role(self, role: Dict) -> Dict: + """ + Update a role in Casdoor. + + :param role: role dict with updated data + :return: response dict with status + """ + path = "/api/update-role" + params = {"id": f"{role['owner']}/{role['name']}"} + async with self._session as session: + response = await session.post(path, params=params, headers=self.headers, json=role) + if response.get("status") != "ok": + raise Exception(response.get("msg", "Failed to update role")) + return response + + async def assign_role_to_user(self, username: str, role_name: str) -> Dict: + """ + Assign a role to a user by adding the user to the role's users list. + + :param username: the username to assign the role to + :param role_name: the name of the role to assign + :return: response dict with status + """ + user_id = f"{self.org_name}/{username}" + + role = await self.get_role(role_name) + if not role: + raise Exception(f"Role {role_name} not found") + + if not role.get("users"): + role["users"] = [] + + if user_id not in role["users"]: + role["users"].append(user_id) + return await self.update_role(role) + + return {"status": "ok", "msg": "User already has this role"} + + async def remove_role_from_user(self, username: str, role_name: str) -> Dict: + """ + Remove a role from a user by removing the user from the role's users list. + + :param username: the username to remove the role from + :param role_name: the name of the role to remove + :return: response dict with status + """ + user_id = f"{self.org_name}/{username}" + + role = await self.get_role(role_name) + if not role: + raise Exception(f"Role {role_name} not found") + + if role.get("users") and user_id in role["users"]: + role["users"].remove(user_id) + return await self.update_role(role) + + return {"status": "ok", "msg": "User does not have this role"} + + async def get_user_roles(self, username: str) -> List[Dict]: + """ + Get all roles assigned to a user. + + :param username: the username to get roles for + :return: list of role dicts assigned to the user + """ + user_id = f"{self.org_name}/{username}" + all_roles = await self.get_roles() + + user_roles = [] + for role in all_roles: + if role.get("users") and user_id in role["users"]: + user_roles.append(role) + + return user_roles diff --git a/src/casdoor/role.py b/src/casdoor/role.py index c2238b1..76e51bb 100644 --- a/src/casdoor/role.py +++ b/src/casdoor/role.py @@ -125,3 +125,63 @@ def update_role(self, role: Role) -> Dict: def delete_role(self, role: Role) -> Dict: response = self.modify_role("delete-role", role) return response + + def assign_role_to_user(self, username: str, role_name: str) -> Dict: + """ + Assign a role to a user by adding the user to the role's users list. + + :param username: the username to assign the role to + :param role_name: the name of the role to assign + :return: response dict with status + """ + user_id = f"{self.org_name}/{username}" + + role = self.get_role(role_name) + if role is None: + raise Exception(f"Role {role_name} not found") + + if not hasattr(role, "users") or role.users is None: + role.users = [] + + if user_id not in role.users: + role.users.append(user_id) + return self.update_role(role) + + return {"status": "ok", "msg": "User already has this role"} + + def remove_role_from_user(self, username: str, role_name: str) -> Dict: + """ + Remove a role from a user by removing the user from the role's users list. + + :param username: the username to remove the role from + :param role_name: the name of the role to remove + :return: response dict with status + """ + user_id = f"{self.org_name}/{username}" + + role = self.get_role(role_name) + if role is None: + raise Exception(f"Role {role_name} not found") + + if hasattr(role, "users") and role.users and user_id in role.users: + role.users.remove(user_id) + return self.update_role(role) + + return {"status": "ok", "msg": "User does not have this role"} + + def get_user_roles(self, username: str) -> List[Role]: + """ + Get all roles assigned to a user. + + :param username: the username to get roles for + :return: list of Role objects assigned to the user + """ + user_id = f"{self.org_name}/{username}" + all_roles = self.get_roles() + + user_roles = [] + for role in all_roles: + if hasattr(role, "users") and role.users and user_id in role.users: + user_roles.append(role) + + return user_roles