Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions src/casdoor/async_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,3 +462,107 @@ async def update_user(self, user: User) -> Dict:
async def delete_user(self, user: User) -> Dict:
response = await self.modify_user("delete-user", user)
return response

async def get_roles(self) -> List[Dict]:
"""
Get all roles from Casdoor for the configured organization.

:return: list of role dicts
"""
path = "/api/get-roles"
params = {"owner": self.org_name}
async with self._session as session:
response = await session.get(path, headers=self.headers, params=params)
if response.get("status") != "ok":
raise Exception(response.get("msg", "Failed to get roles"))
return response.get("data", [])

async def get_role(self, role_name: str) -> Dict:
"""
Get a specific role from Casdoor.

:param role_name: the name of the role
:return: role dict
"""
path = "/api/get-role"
params = {"id": f"{self.org_name}/{role_name}"}
async with self._session as session:
response = await session.get(path, headers=self.headers, params=params)
if response.get("status") != "ok":
raise Exception(response.get("msg", f"Role {role_name} not found"))
return response.get("data")

async def update_role(self, role: Dict) -> Dict:
"""
Update a role in Casdoor.

:param role: role dict with updated data
:return: response dict with status
"""
path = "/api/update-role"
params = {"id": f"{role['owner']}/{role['name']}"}
async with self._session as session:
response = await session.post(path, params=params, headers=self.headers, json=role)
if response.get("status") != "ok":
raise Exception(response.get("msg", "Failed to update role"))
return response

async def assign_role_to_user(self, username: str, role_name: str) -> Dict:
"""
Assign a role to a user by adding the user to the role's users list.

:param username: the username to assign the role to
:param role_name: the name of the role to assign
:return: response dict with status
"""
user_id = f"{self.org_name}/{username}"

role = await self.get_role(role_name)
if not role:
raise Exception(f"Role {role_name} not found")

if not role.get("users"):
role["users"] = []

if user_id not in role["users"]:
role["users"].append(user_id)
return await self.update_role(role)

return {"status": "ok", "msg": "User already has this role"}

async def remove_role_from_user(self, username: str, role_name: str) -> Dict:
"""
Remove a role from a user by removing the user from the role's users list.

:param username: the username to remove the role from
:param role_name: the name of the role to remove
:return: response dict with status
"""
user_id = f"{self.org_name}/{username}"

role = await self.get_role(role_name)
if not role:
raise Exception(f"Role {role_name} not found")

if role.get("users") and user_id in role["users"]:
role["users"].remove(user_id)
return await self.update_role(role)

return {"status": "ok", "msg": "User does not have this role"}

async def get_user_roles(self, username: str) -> List[Dict]:
"""
Get all roles assigned to a user.

:param username: the username to get roles for
:return: list of role dicts assigned to the user
"""
user_id = f"{self.org_name}/{username}"
all_roles = await self.get_roles()

user_roles = []
for role in all_roles:
if role.get("users") and user_id in role["users"]:
user_roles.append(role)

return user_roles
60 changes: 60 additions & 0 deletions src/casdoor/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,63 @@ def update_role(self, role: Role) -> Dict:
def delete_role(self, role: Role) -> Dict:
response = self.modify_role("delete-role", role)
return response

def assign_role_to_user(self, username: str, role_name: str) -> Dict:
"""
Assign a role to a user by adding the user to the role's users list.

:param username: the username to assign the role to
:param role_name: the name of the role to assign
:return: response dict with status
"""
user_id = f"{self.org_name}/{username}"

role = self.get_role(role_name)
if role is None:
raise Exception(f"Role {role_name} not found")

if not hasattr(role, "users") or role.users is None:
role.users = []

if user_id not in role.users:
role.users.append(user_id)
return self.update_role(role)

return {"status": "ok", "msg": "User already has this role"}

def remove_role_from_user(self, username: str, role_name: str) -> Dict:
"""
Remove a role from a user by removing the user from the role's users list.

:param username: the username to remove the role from
:param role_name: the name of the role to remove
:return: response dict with status
"""
user_id = f"{self.org_name}/{username}"

role = self.get_role(role_name)
if role is None:
raise Exception(f"Role {role_name} not found")

if hasattr(role, "users") and role.users and user_id in role.users:
role.users.remove(user_id)
return self.update_role(role)

return {"status": "ok", "msg": "User does not have this role"}

def get_user_roles(self, username: str) -> List[Role]:
"""
Get all roles assigned to a user.

:param username: the username to get roles for
:return: list of Role objects assigned to the user
"""
user_id = f"{self.org_name}/{username}"
all_roles = self.get_roles()

user_roles = []
for role in all_roles:
if hasattr(role, "users") and role.users and user_id in role.users:
user_roles.append(role)

return user_roles