diff --git a/src/casdoor/async_main.py b/src/casdoor/async_main.py index e40f40e..27c070c 100644 --- a/src/casdoor/async_main.py +++ b/src/casdoor/async_main.py @@ -462,3 +462,107 @@ async def update_user(self, user: User) -> Dict: async def delete_user(self, user: User) -> Dict: response = await self.modify_user("delete-user", user) return response + + async def get_roles(self) -> List[Dict]: + """ + Get all roles from Casdoor for the configured organization. + + :return: list of role dicts + """ + path = "/api/get-roles" + params = {"owner": self.org_name} + async with self._session as session: + response = await session.get(path, headers=self.headers, params=params) + if response.get("status") != "ok": + raise Exception(response.get("msg", "Failed to get roles")) + return response.get("data", []) + + async def get_role(self, role_name: str) -> Dict: + """ + Get a specific role from Casdoor. + + :param role_name: the name of the role + :return: role dict + """ + path = "/api/get-role" + params = {"id": f"{self.org_name}/{role_name}"} + async with self._session as session: + response = await session.get(path, headers=self.headers, params=params) + if response.get("status") != "ok": + raise Exception(response.get("msg", f"Role {role_name} not found")) + return response.get("data") + + async def update_role(self, role: Dict) -> Dict: + """ + Update a role in Casdoor. + + :param role: role dict with updated data + :return: response dict with status + """ + path = "/api/update-role" + params = {"id": f"{role['owner']}/{role['name']}"} + async with self._session as session: + response = await session.post(path, params=params, headers=self.headers, json=role) + if response.get("status") != "ok": + raise Exception(response.get("msg", "Failed to update role")) + return response + + async def assign_role_to_user(self, username: str, role_name: str) -> Dict: + """ + Assign a role to a user by adding the user to the role's users list. + + :param username: the username to assign the role to + :param role_name: the name of the role to assign + :return: response dict with status + """ + user_id = f"{self.org_name}/{username}" + + role = await self.get_role(role_name) + if not role: + raise Exception(f"Role {role_name} not found") + + if not role.get("users"): + role["users"] = [] + + if user_id not in role["users"]: + role["users"].append(user_id) + return await self.update_role(role) + + return {"status": "ok", "msg": "User already has this role"} + + async def remove_role_from_user(self, username: str, role_name: str) -> Dict: + """ + Remove a role from a user by removing the user from the role's users list. + + :param username: the username to remove the role from + :param role_name: the name of the role to remove + :return: response dict with status + """ + user_id = f"{self.org_name}/{username}" + + role = await self.get_role(role_name) + if not role: + raise Exception(f"Role {role_name} not found") + + if role.get("users") and user_id in role["users"]: + role["users"].remove(user_id) + return await self.update_role(role) + + return {"status": "ok", "msg": "User does not have this role"} + + async def get_user_roles(self, username: str) -> List[Dict]: + """ + Get all roles assigned to a user. + + :param username: the username to get roles for + :return: list of role dicts assigned to the user + """ + user_id = f"{self.org_name}/{username}" + all_roles = await self.get_roles() + + user_roles = [] + for role in all_roles: + if role.get("users") and user_id in role["users"]: + user_roles.append(role) + + return user_roles diff --git a/src/casdoor/role.py b/src/casdoor/role.py index c2238b1..76e51bb 100644 --- a/src/casdoor/role.py +++ b/src/casdoor/role.py @@ -125,3 +125,63 @@ def update_role(self, role: Role) -> Dict: def delete_role(self, role: Role) -> Dict: response = self.modify_role("delete-role", role) return response + + def assign_role_to_user(self, username: str, role_name: str) -> Dict: + """ + Assign a role to a user by adding the user to the role's users list. + + :param username: the username to assign the role to + :param role_name: the name of the role to assign + :return: response dict with status + """ + user_id = f"{self.org_name}/{username}" + + role = self.get_role(role_name) + if role is None: + raise Exception(f"Role {role_name} not found") + + if not hasattr(role, "users") or role.users is None: + role.users = [] + + if user_id not in role.users: + role.users.append(user_id) + return self.update_role(role) + + return {"status": "ok", "msg": "User already has this role"} + + def remove_role_from_user(self, username: str, role_name: str) -> Dict: + """ + Remove a role from a user by removing the user from the role's users list. + + :param username: the username to remove the role from + :param role_name: the name of the role to remove + :return: response dict with status + """ + user_id = f"{self.org_name}/{username}" + + role = self.get_role(role_name) + if role is None: + raise Exception(f"Role {role_name} not found") + + if hasattr(role, "users") and role.users and user_id in role.users: + role.users.remove(user_id) + return self.update_role(role) + + return {"status": "ok", "msg": "User does not have this role"} + + def get_user_roles(self, username: str) -> List[Role]: + """ + Get all roles assigned to a user. + + :param username: the username to get roles for + :return: list of Role objects assigned to the user + """ + user_id = f"{self.org_name}/{username}" + all_roles = self.get_roles() + + user_roles = [] + for role in all_roles: + if hasattr(role, "users") and role.users and user_id in role.users: + user_roles.append(role) + + return user_roles