diff --git a/mcserver/urls.py b/mcserver/urls.py index beb3478..9faf436 100644 --- a/mcserver/urls.py +++ b/mcserver/urls.py @@ -71,7 +71,7 @@ router.register(r'sessions', SessionViewSet, "session") router.register(r'videos', VideoViewSet) -router.register(r'trials', TrialViewSet) +router.register(r'trials', TrialViewSet, "trial") router.register(r'results', ResultViewSet) router.register(r'subjects', SubjectViewSet, "subject") router.register(r'subject-tags', SubjectTagViewSet, "subject-tags") diff --git a/mcserver/views.py b/mcserver/views.py index 2d51fc6..85d06c8 100644 --- a/mcserver/views.py +++ b/mcserver/views.py @@ -14,7 +14,6 @@ from django.shortcuts import get_object_or_404 from django.contrib.auth import login from django.core.files.base import ContentFile -from django.utils.timezone import now from django.utils import timezone from django.http import Http404 from django.db.models import Q @@ -169,6 +168,17 @@ class SessionViewSet(viewsets.ModelViewSet): serializer_class = SessionSerializer permission_classes = [IsPublic | ((IsOwner | IsAdmin | IsBackend))] + def get_permissions(self): + custom_permission_classes = None + if self.action == 'status' or self.action == 'get_status': + return [AllowAny(), ] + elif self.action in ['update', 'partial_update', 'destroy', + 'new', 'rename', 'set_metadata', 'set_subject', + 'stop', 'cancel_trial']: + custom_permission_classes = [IsOwner | IsAdmin | IsBackend] + return [permission() for permission in custom_permission_classes] + return super(SessionViewSet, self).get_permissions() + @action(detail=False, methods=['get'], permission_classes=[IsAdmin | IsBackend | IsAuthenticated]) def search_sessions(self, request): """ @@ -215,7 +225,10 @@ def calibration(self, request, pk): if pk == 'undefined': raise ValueError(_("undefined_uuid")) - session = Session.objects.get(pk=pk) + if request.user.groups.filter(name__in=["admin", "backend"]).exists(): + session = get_object_or_404(Session, pk=pk) + else: + session = get_object_or_404(Session, pk=pk, user=request.user) trial = session.trial_set.filter(name="calibration").order_by("-created_at")[0] trial.meta = { @@ -249,7 +262,10 @@ def get_n_calibrated_cameras(self, request, pk): raise ValueError(_("undefined_uuid")) error_message = '' - session = get_object_or_404(Session, pk=pk) + if request.user.groups.filter(name__in=["admin", "backend"]).exists(): + session = get_object_or_404(Session, pk=pk) + else: + session = get_object_or_404(Session, pk=pk, user=request.user) calibration_trials = session.trial_set.filter(name="calibration") last_calibration_trial_num_videos = 0 @@ -307,7 +323,7 @@ def get_n_calibrated_cameras(self, request, pk): @action(detail=True, methods=['post']) def rename(self, request, pk): # Get session. - session = get_object_or_404(Session.objects.all(), pk=pk) + session = get_object_or_404(Session, pk=pk) try: if pk == 'undefined': @@ -315,10 +331,12 @@ def rename(self, request, pk): error_message = "" # Update session name and save. + if not session.meta: + session.meta = {} session.meta["sessionName"] = request.data['sessionNewName'] self.check_object_permissions(self.request, session) session.save() - + serializer = SessionSerializer(session) except Http404: if settings.DEBUG: @@ -504,7 +522,7 @@ def trash(self, request, pk): session = get_object_or_404(Session, pk=pk, user=request.user) session.trashed = True - session.trashed_at = now() + session.trashed_at = timezone.now() session.save() serializer = SessionSerializer(session) @@ -560,8 +578,6 @@ def new(self, request): user = request.user - if not user.is_authenticated: - user = User.objects.get(id=1) session.user = user session.save() @@ -600,7 +616,7 @@ def get_qr(self, request, pk): raise ValueError(_("undefined_uuid")) session = get_object_or_404(Session, pk=pk, user=request.user) - + # get the QR code from the database if session.qrcode: qr = session.qrcode @@ -626,6 +642,14 @@ def get_qr(self, request, pk): res = {'qr': url} + except Http404: + if settings.DEBUG: + raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) + raise NotFound(_("session_uuid_not_found") % {"uuid": str(pk)}) + except ValueError: + if settings.DEBUG: + raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) + raise NotFound(_("session_uuid_not_valid") % {"uuid": str(pk)}) except Exception: if settings.DEBUG: raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) @@ -699,11 +723,6 @@ def new_subject(self, request, pk): return Response(serializer.data) - - def get_permissions(self): - if self.action == 'status' or self.action == 'get_status': - return [AllowAny(), ] - return super(SessionViewSet, self).get_permissions() def get_status(self, request, pk): if pk == 'undefined': @@ -892,6 +911,8 @@ def get_count_from_name(name, base_name): @action(detail=True) def download(self, request, pk): try: + session = get_object_or_404(Session, pk=pk) + self.check_object_permissions(request, session) # Extract protocol and host. if request.is_secure(): host = "https://" + request.get_host() @@ -900,6 +921,10 @@ def download(self, request, pk): session_zip = downloadAndZipSession(pk, host=host) + except PermissionDenied: + if settings.DEBUG: + raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) + raise NotFound(_('permission_denied')) except Exception: if settings.DEBUG: raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) @@ -926,6 +951,11 @@ def async_download(self, request, pk): task = download_session_archive.delay(session.id, request.user.id) else: task = download_session_archive.delay(session.id) + + except PermissionDenied: + if settings.DEBUG: + raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) + raise NotFound(_('permission_denied')) except Exception: if settings.DEBUG: raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) @@ -1029,8 +1059,7 @@ def set_metadata(self, request, pk): raise ValueError(_("undefined_uuid")) session = get_object_or_404(Session, pk=pk) - - + self.check_object_permissions(self.request, session) if not session.meta: session.meta = {} @@ -1108,6 +1137,10 @@ def set_metadata(self, request, pk): if settings.DEBUG: raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) raise NotFound(_("session_uuid_not_valid") % {"uuid": str(pk)}) + except PermissionDenied: + if settings.DEBUG: + raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) + raise NotFound(_('permission_denied')) except Exception: if settings.DEBUG: raise APIException("Error: " + traceback.format_exc()) @@ -1170,6 +1203,7 @@ def stop(self, request, pk): raise ValueError(_("undefined_uuid")) session = get_object_or_404(Session, pk=pk) + self.check_object_permissions(self.request, session) trials = session.trial_set.order_by("-created_at") # name = request.GET.get("name",None) @@ -1222,6 +1256,10 @@ def stop(self, request, pk): if settings.DEBUG: raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) raise NotFound(_("session_uuid_not_valid") % {"uuid": str(pk)}) + except PermissionDenied: + if settings.DEBUG: + raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) + raise NotFound(_('permission_denied')) except Exception: if settings.DEBUG: raise APIException("Error: " + traceback.format_exc()) @@ -1240,6 +1278,7 @@ def cancel_trial(self, request, pk): raise ValueError(_("undefined_uuid")) session = get_object_or_404(Session, pk=pk) + self.check_object_permissions(self.request, session) trials = session.trial_set.order_by("-created_at") # If there is at least one trial, check its status @@ -1258,6 +1297,10 @@ def cancel_trial(self, request, pk): if settings.DEBUG: raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) raise NotFound(_("session_uuid_not_valid") % {"uuid": str(pk)}) + except PermissionDenied: + if settings.DEBUG: + raise APIException(_("error") % {"error_message": str(traceback.format_exc())}) + raise NotFound(_('permission_denied')) except Exception: if settings.DEBUG: raise APIException("Error: " + traceback.format_exc()) @@ -1483,12 +1526,36 @@ def set_session_status(self, request, pk): # - if no it asks again in 5 sec # - if yes it runs processing and sends back the results class TrialViewSet(viewsets.ModelViewSet): - queryset = Trial.objects.all().order_by("created_at") serializer_class = TrialSerializer - permission_classes = [IsPublic | (IsOwner | IsAdmin | IsBackend)] + + def get_queryset(self): + user = self.request.user + # Admins and backend users see all trials + if user.is_authenticated and user.groups.filter(name__in=["admin", "backend"]).exists(): + return Trial.objects.all().order_by("created_at") + # Authenticated users see their own trials and public trials + elif user.is_authenticated: + return Trial.objects.filter( + Q(session__user=user) | Q(session__public=True) + ).order_by("created_at") + # Unauthenticated users see only public trials + else: + return Trial.objects.filter(session__public=True).order_by("created_at") + + def get_permissions(self): + custom_permission_classes = None + if self.action in ['dequeue', 'get_trials_with_status']: + custom_permission_classes = [IsAdmin | IsBackend] + elif self.request.method in ['POST', 'PUT', 'PATCH', 'DELETE']: + custom_permission_classes = [IsOwner | IsAdmin | IsBackend] + + if custom_permission_classes is not None: + return [permission() for permission in custom_permission_classes] + + return super().get_permissions() - @action(detail=False, permission_classes=[((IsAdmin | IsBackend))]) + @action(detail=False) def dequeue(self, request): try: ip = get_client_ip(request) @@ -1500,14 +1567,14 @@ def dequeue(self, request): # find trials with some videos not uploaded not_uploaded = Video.objects.filter(video='', - updated_at__gte=datetime.now() + timedelta(minutes=-15)).values_list("trial__id", flat=True) + updated_at__gte=timezone.now() + timedelta(minutes=-15)).values_list("trial__id", flat=True) if isMonoQuery == 'False': - uploaded_trials = Trial.objects.filter(updated_at__gte=datetime.now() + timedelta(days=-7)).exclude( + uploaded_trials = Trial.objects.filter(updated_at__gte=timezone.now() + timedelta(days=-7)).exclude( id__in=not_uploaded).exclude(session__isMono=True) else: - uploaded_trials = Trial.objects.filter(updated_at__gte=datetime.now() + timedelta(days=-7)).exclude( + uploaded_trials = Trial.objects.filter(updated_at__gte=timezone.now() + timedelta(days=-7)).exclude( id__in=not_uploaded).filter(session__isMono=True) if workerType != 'dynamic': @@ -1538,7 +1605,7 @@ def dequeue(self, request): if trials.count() == 0 and trialsReprocess.count() == 0: raise Http404 - + # prioritize admin and priority group trials (priority group doesn't exist yet, but should have same priv. as user) trialsPrioritized = trials.filter(session__user__groups__name__in=["admin"]) # if no admin trials, go to priority group trials @@ -1557,8 +1624,6 @@ def dequeue(self, request): trial.processed_count += 1 trial.save() - print(ip) - print(trial.session.server) if (not trial.session.server) or len(trial.session.server) < 1: session = Session.objects.get(id=trial.session.id) session.server = ip @@ -1575,7 +1640,7 @@ def dequeue(self, request): return Response(serializer.data) - @action(detail=False, permission_classes=[((IsAdmin | IsBackend))]) + @action(detail=False) def get_trials_with_status(self, request): """ This view returns a list of all the trials with the specified status @@ -1587,7 +1652,7 @@ def get_trials_with_status(self, request): status = self.request.query_params.get('status') # trials with given status and updated_at more than n hours ago trials = Trial.objects.filter(status=status, - updated_at__lte=(datetime.now() - timedelta(hours=hours_since_update))).order_by("-created_at") + updated_at__lte=(timezone.now() - timedelta(hours=hours_since_update))).order_by("-created_at") serializer = TrialSerializer(trials, many=True) @@ -1600,7 +1665,10 @@ def rename(self, request, pk): raise ValueError(_("undefined_uuid")) # Get trial. - trial = get_object_or_404(Trial, pk=pk, session__user=request.user) + if request.user.groups.filter(name__in=["admin", "backend"]).exists(): + trial = get_object_or_404(Trial, pk=pk) + else: + trial = get_object_or_404(Trial, pk=pk, session__user=request.user) # Update trial name and save. trial.name = request.data['trialNewName'] @@ -1633,7 +1701,10 @@ def permanent_remove(self, request, pk): if pk == 'undefined': raise ValueError(_("undefined_uuid")) - trial = get_object_or_404(Trial, pk=pk, session__user=request.user) + if request.user.groups.filter(name__in=["admin", "backend"]).exists(): + trial = get_object_or_404(Trial, pk=pk) + else: + trial = get_object_or_404(Trial, pk=pk, session__user=request.user) trial.delete() except Http404: @@ -1657,9 +1728,12 @@ def trash(self, request, pk): if pk == 'undefined': raise ValueError(_("undefined_uuid")) - trial = get_object_or_404(Trial, pk=pk, session__user=request.user) + if request.user.groups.filter(name__in=["admin", "backend"]).exists(): + trial = get_object_or_404(Trial, pk=pk) + else: + trial = get_object_or_404(Trial, pk=pk, session__user=request.user) trial.trashed = True - trial.trashed_at = now() + trial.trashed_at = timezone.now() trial.save() serializer = TrialSerializer(trial) @@ -1685,7 +1759,10 @@ def restore(self, request, pk): if pk == 'undefined': raise ValueError(_("undefined_uuid")) - trial = get_object_or_404(Trial, pk=pk, session__user=request.user) + if request.user.groups.filter(name__in=["admin", "backend"]).exists(): + trial = get_object_or_404(Trial, pk=pk) + else: + trial = get_object_or_404(Trial, pk=pk, session__user=request.user) trial.trashed = False trial.trashed_at = None trial.save() @@ -1716,7 +1793,10 @@ def modifyTags(self, request, pk): tags = request.data['trialNewTags'] # Get trial. - trial = get_object_or_404(Trial, pk=pk, session__user=request.user) + if request.user.groups.filter(name__in=["admin", "backend"]).exists(): + trial = get_object_or_404(Trial, pk=pk) + else: + trial = get_object_or_404(Trial, pk=pk, session__user=request.user) # Remove previous tags. if TrialTags.objects.filter(trial=trial).exists(): @@ -1798,7 +1878,7 @@ def get_queryset(self): for the currently authenticated user. """ user = self.request.user - if (user.is_authenticated and user.id == 1) or (user.is_authenticated and user.id == 2): + if user.is_authenticated and user.groups.filter(name__in=["admin", "backend"]).exists(): return Subject.objects.all().prefetch_related('subjecttags_set') # public_subject_ids = Session.objects.filter(public=True).values_list('subject_id', flat=True).distinct() # return Subject.objects.filter(Q(user=user) | Q(id__in=public_subject_ids)).prefetch_related('subjecttags_set') @@ -1859,7 +1939,7 @@ def trash(self, request, pk): subject = get_object_or_404(Subject, pk=pk, user=request.user) subject.trashed = True - subject.trashed_at = now() + subject.trashed_at = timezone.now() subject.save() serializer = SubjectSerializer(subject) diff --git a/tests/test_permissions.py b/tests/test_permissions.py new file mode 100644 index 0000000..3ebfc63 --- /dev/null +++ b/tests/test_permissions.py @@ -0,0 +1,1099 @@ +import tempfile +import zipfile +from unittest import mock +from django.contrib.auth.models import Group +from rest_framework.reverse import reverse +from rest_framework.test import APITestCase + +from mcserver.models import ( + User, Session, Trial, Result, Subject +) + +# Helper classes +class UserSetupMixin: + def setUpUsers(self): + self.owner = User.objects.create_user(username='owner', password='pw') + self.owner.otp_verified = True + self.owner.save() + + self.admin = User.objects.create_user(username='admin', password='pw') + admin_group = Group.objects.create(name='admin') + self.admin.groups.add(admin_group) + + self.backend = User.objects.create_user(username='backend', password='pw') + backend_group = Group.objects.create(name='backend') + self.backend.groups.add(backend_group) + + self.other_user = User.objects.create_user(username='other_user', password='pw') + self.other_user.otp_verified = True + self.other_user.save() + + self.unverified_user = User.objects.create_user(username='unverified', password='pw') + self.unverified_user.otp_verified = False + self.unverified_user.save() + + self.users = { + 'owner': self.owner, + 'admin': self.admin, + 'backend': self.backend, + 'other': self.other_user, + 'unverified': self.unverified_user, + } + +# Tests +class SessionsPermissionsTests(UserSetupMixin, APITestCase): + def setUp(self): + self.setUpUsers() + self.list_url = '/sessions/' + + def _setup_session(self, public): + self.session = Session.objects.create(user=self.owner, public=public) + self.detail_url = f'/sessions/{self.session.pk}/' + + def test_get_list(self): + # Test GET /sessions/ (list) + public_session = Session.objects.create(user=self.owner, public=True) + private_session = Session.objects.create(user=self.owner, public=False) + + for role in self.users: + with self.subTest(role=role): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(self.list_url) + + self.assertEqual(resp.status_code, 200) + session_ids = [s['id'] for s in resp.data] + # Owners should see public and private sessions + if role == 'owner': + self.assertIn(str(public_session.pk), session_ids) + self.assertIn(str(private_session.pk), session_ids) + else: + self.assertIn(str(public_session.pk), session_ids) + self.assertNotIn(str(private_session.pk), session_ids) + + def test_get_detail(self): + # Test GET /sessions// (retrieve) + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(self.detail_url) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + else: + if public is True: + expected = 200 + else: + expected = 404 + self.assertEqual(resp.status_code, expected) + + def test_post(self): + # Test POST /sessions/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + data = { "user": self.owner.pk, + "server": '1.1.1.1', + "public": public } + resp = self.client.post(self.list_url, data) + expected = 201 if role in ['owner', 'admin', 'backend', 'other'] else 403 + self.assertEqual(resp.status_code, expected) + + def test_put(self): + # Test PUT /sessions// (update) + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + data = { "user": self.owner.pk, + "server": '1.1.1.1', + "public": public } + resp = self.client.put(self.detail_url, data) + + if public is True: + expected = 200 if role in ['owner', 'admin', 'backend'] else 403 + else: + if role == 'owner': + expected = 200 + elif role in ['admin', 'backend', 'other']: + expected = 404 + else: + expected = 403 + self.assertEqual(resp.status_code, expected) + + def test_patch(self): + # Test PATCH /sessions// (partial_update) + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + data = { "server": '1.1.1.1' } + resp = self.client.patch(self.detail_url, data) + + if public is True: + expected = 200 if role in ['owner', 'admin', 'backend'] else 403 + else: + if role == 'owner': + expected = 200 + elif role in ['admin', 'backend', 'other']: + expected = 404 + else: + expected = 403 + self.assertEqual(resp.status_code, expected) + + def test_delete(self): + # Test DELETE /sessions// (destroy) + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + # Re-create the session for delete, to make sure the object exists + self._setup_session(public=public) + self.client.force_authenticate(user=self.users[role]) + resp = self.client.delete(self.detail_url) + + if public is True: + expected = 204 if role in ['owner', 'admin', 'backend'] else 403 + else: + if role == 'owner': + expected = 204 + elif role in ['admin', 'backend', 'other']: + expected = 404 + else: + expected = 403 + self.assertEqual(resp.status_code, expected) + + def test_search_sessions(self): + # Test GET /sessions/search_sessions/?text= + for public in [False, True]: + self._setup_session(public=public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(f'/sessions/search_sessions/?text={str(self.session.id)[:8]}') + + self.assertEqual(resp.status_code, 200) + if role == 'owner': + self.assertTrue(any(str(self.session.id) in s['id'] for s in resp.data)) + else: + self.assertFalse(any(str(self.session.id) in s['id'] for s in resp.data)) + + def test_calibration(self): + # Test GET and POST /sessions//calibration/ + for public in [False, True]: + self._setup_session(public) + trial = Trial.objects.create(session=self.session, name='calibration') + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + get_resp = self.client.get(f'/sessions/{self.session.pk}/calibration/') + post_resp = self.client.post(f'/sessions/{self.session.pk}/calibration/', data={ 'calibration_data': 'data' }) + + if role in ['owner', 'admin', 'backend']: + self.assertEqual(get_resp.status_code, 200) + self.assertEqual(post_resp.status_code, 200) + else: + self.assertIn(get_resp.status_code, [403, 404]) + self.assertIn(post_resp.status_code, [403, 404]) + + def test_get_n_calibrated_cameras(self): + # Test GET /sessions//get_n_calibrated_cameras/ + for public in [False, True]: + self._setup_session(public) + trial = Trial.objects.create(session=self.session, name='calibration') + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(f'/sessions/{self.session.pk}/get_n_calibrated_cameras/') + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + else: + self.assertIn(resp.status_code, [403, 404]) + + def test_rename(self): + # Test POST /sessions//rename/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + session = Session.objects.create(user=self.owner, public=public) + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{session.pk}/rename/' + data = { "sessionNewName": "session_new_name" } + resp = self.client.post(url, data) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + self.assertEqual(resp.status_code, expected) + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_valid(self): + # Test GET and POST /sessions/valid + for role in self.users: + with self.subTest(role=role): + session_public = Session.objects.create(user=self.owner, public=True) + trial_public = Trial.objects.create(session=session_public, + name='neutral', + status='done') + session_private = Session.objects.create(user=self.owner, public=False) + trial_private = Trial.objects.create(session=session_private, + name='neutral', + status='done') + + self.client.force_authenticate(user=self.users[role]) + resp_get = self.client.get('/sessions/valid/') + resp_post = self.client.post('/sessions/valid/') + + if role in ['owner', 'admin', 'backend', 'other']: + self.assertEqual(resp_get.status_code, 200) + self.assertEqual(resp_post.status_code, 200) + + # Check that the response contains the expected trials for owner + session_ids_get = [t['id'] for t in resp_get.data] + session_ids_post = [t['id'] for t in resp_post.data] + if role == 'owner': + self.assertIn(str(session_public.pk), session_ids_get) + self.assertIn(str(session_private.pk), session_ids_get) + self.assertIn(str(session_public.pk), session_ids_post) + self.assertIn(str(session_private.pk), session_ids_post) + else: + self.assertNotIn(str(session_public.pk), session_ids_get) + self.assertNotIn(str(session_private.pk), session_ids_get) + self.assertNotIn(str(session_public.pk), session_ids_post) + self.assertNotIn(str(session_private.pk), session_ids_post) + + else: + self.assertEqual(resp_get.status_code, 200) + self.assertNotIn(str(session_public.pk), session_ids_get) + self.assertNotIn(str(session_private.pk), session_ids_get) + + self.assertEqual(resp_post.status_code, 403) + + def test_permanent_remove(self): + # Test POST /sessions//permanent_remove/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_session(public=public) + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/permanent_remove/' + resp = self.client.post(url) + + if role == 'owner': + expected = 200 + self.assertEqual(resp.status_code, expected) + else: + expected = 404 if role in ['admin', 'backend', 'other'] else 403 + self.assertEqual(resp.status_code, expected) + + def test_trash(self): + # Test POST /sessions//trash/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_session(public=public) + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/trash/' + resp = self.client.post(url) + + if role == 'owner': + expected = 200 + self.assertEqual(resp.status_code, expected) + else: + expected = 404 if role in ['admin', 'backend', 'other'] else 403 + self.assertEqual(resp.status_code, expected) + + def test_restore(self): + # Test POST /sessions//restore/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_session(public=public) + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/restore/' + resp = self.client.post(url) + + if role == 'owner': + expected = 200 + self.assertEqual(resp.status_code, 200) + else: + expected = 404 if role in ['admin', 'backend', 'other'] else 403 + self.assertEqual(resp.status_code, expected) + + def test_new(self): + # Test GET /sessions/new/ + for role in self.users: + with self.subTest(role=role): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get('/sessions/new/') + + if role in ['owner', 'admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 200) + else: + self.assertEqual(resp.status_code, 403) + + @mock.patch('mcserver.views.boto3.client') + def test_get_qr(self, mock_boto_client): + # Setup mock + mock_s3 = mock.Mock() + mock_boto_client.return_value = mock_s3 + mock_s3.generate_presigned_url.return_value = 'http://fake-url.com/qr-code' + + # Test GET /sessions//get_qr/ + for public in [False, True]: + session = Session.objects.create(user=self.owner, + public=public, + qrcode='fake-qr-code-path.png') + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(f'/sessions/{session.pk}/get_qr/') + + if role == 'owner': + self.assertEqual(resp.status_code, 200) + else: + self.assertIn(resp.status_code, [403, 404]) + + def test_new_subject(self): + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/new_subject/' + resp = self.client.get(url) + + if role == 'owner': + self.assertEqual(resp.status_code, 200) + else: + self.assertEqual(resp.status_code, 404) + + def test_status(self): + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/status/' + resp = self.client.get(url) + self.assertEqual(resp.status_code, 200) + + def test_record(self): + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/record/?name=new_trial_name' + resp = self.client.get(url) + + if role == 'owner': + self.assertEqual(resp.status_code, 200) + else: + self.assertEqual(resp.status_code, 404) + + @mock.patch('mcserver.views.downloadAndZipSession') + def test_download(self, mock_download_and_zip): + # Test GET /sessions//download/ + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + # Create a temporary zip file + with tempfile.NamedTemporaryFile(suffix='.zip') as tmp_zip: + with zipfile.ZipFile(tmp_zip.name, 'w') as zf: + zf.writestr('dummy.txt', 'dummy content') + tmp_zip.flush() + mock_download_and_zip.return_value = tmp_zip.name + + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(f'/sessions/{self.session.pk}/download/') + + if role in ['other', 'unverified'] and public is False: + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp['Content-Type'], 'application/zip') + + @mock.patch('mcserver.tasks.download_session_archive.delay') + def test_async_download(self, mock_download_task): + mock_task = mock.Mock() + mock_task.id = 'fake-download_session_archive-id' + mock_download_task.return_value = mock_task + + # Test GET /sessions//async_download/ + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/async-download/' + resp = self.client.get(url) + + if public is False: + if role == 'owner': + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data, {'task_id': 'fake-download_session_archive-id'}) + else: + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data, {'task_id': 'fake-download_session_archive-id'}) + + def test_get_session_permission(self): + # Test GET /sessions//get_session_permission/ + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/get_session_permission/' + resp = self.client.get(url) + self.assertEqual(resp.status_code, 200) + + def test_get_session_settings(self): + # Test GET /sessions//get_session_settings/ + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/get_session_settings/' + resp = self.client.get(url) + + if role in ['other', 'unverified'] and public is False: + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 200) + + def test_set_metadata(self): + # Test GET /sessions//set_metadata/ + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/set_metadata/' + data = {} + resp = self.client.get(url, data) + + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_set_subject(self): + # Test GET /sessions//set_subject/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_session(public) + subject = Subject.objects.create(name='test_subject', + user=self.users['owner']) + + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/set_subject/' + data = { "subject_id": subject.pk } + resp = self.client.get(url, data) + + if role == 'owner': + self.assertEqual(resp.status_code, 200) + else: + expected = 404 if role in ['admin', 'backend', 'other'] else 403 + self.assertEqual(resp.status_code, expected) + + def test_stop(self): + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_session(public) + trial = Trial.objects.create(session=self.session, name='test_trial') + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/stop/' + resp = self.client.get(url) + + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_cancel_trial(self): + for public in [False, True]: + for role in self.users: + self._setup_session(public) + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/cancel_trial/' + resp = self.client.get(url) + + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_calibration_img(self): + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/calibration_img/' + resp = self.client.get(url) + + if public is True: + self.assertEqual(resp.status_code, 200) + else: + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + else: + self.assertEqual(resp.status_code, 404) + + def test_neutral_img(self): + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/neutral_img/' + resp = self.client.get(url) + + if public is True: + self.assertEqual(resp.status_code, 200) + else: + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + else: + self.assertEqual(resp.status_code, 404) + + def test_get_session_statuses(self): + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/get_session_statuses/' + data = {'status': 'done'} + resp = self.client.post(url, data) + if role in ['admin', 'backend', 'owner', 'other']: + self.assertEqual(resp.status_code, 200) + else: + self.assertEqual(resp.status_code, 403) + + def test_set_session_status(self): + for public in [False, True]: + self._setup_session(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = f'/sessions/{self.session.pk}/set_session_status/' + data = { "status": "archived" } + resp = self.client.post(url, data) + + if role in ['admin', 'backend']: + self.assertEqual(resp.status_code, 200) + else: + expected = 403 + self.assertEqual(resp.status_code, expected) + + +class TrialsPermissionsTests(UserSetupMixin, APITestCase): + def setUp(self): + self.setUpUsers() + self.list_url = '/trials/' + + def _setup_trial(self, public): + self.session = Session.objects.create(user=self.owner, public=public) + self.trial = Trial.objects.create(session=self.session) + self.detail_url = f'/trials/{self.trial.pk}/' + + def test_get_list(self): + # Test GET /trials/ (list) + public_session = Session.objects.create(user=self.owner, public=True) + private_session = Session.objects.create(user=self.owner, public=False) + public_trial = Trial.objects.create(session=public_session) + private_trial = Trial.objects.create(session=private_session) + + for role in self.users: + with self.subTest(role=role): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(self.list_url) + # Unverified users should get 200 but only see public trials + if role == 'unverified': + self.assertEqual(resp.status_code, 200) + trial_ids = [t['id'] for t in resp.data] + self.assertIn(str(public_trial.pk), trial_ids) + self.assertNotIn(str(private_trial.pk), trial_ids) + else: + self.assertEqual(resp.status_code, 200) + + def test_get_detail(self): + # Test GET /trials// (retrieve) + for public in [False, True]: + self._setup_trial(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(self.detail_url) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + else: + if public is True: + expected = 200 + else: + expected = 404 + self.assertEqual(resp.status_code, expected) + + def test_patch(self): + # Test PATCH /trials// (partial_update) + for public in [False, True]: + self._setup_trial(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + data = { "status": "processing" } + resp = self.client.patch(self.detail_url, data) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + else: + if public is True: + expected = 403 + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_delete(self): + # Test DELETE /trials// (destroy) + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + # Re-create the trial for delete, to make sure the object exists + self._setup_trial(public=public) + self.client.force_authenticate(user=self.users[role]) + resp = self.client.delete(self.detail_url) + + if role in ['owner', 'admin', 'backend']: + expected = 204 + else: + if public is True: + expected = 403 + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_dequeue(self): + # Test GET /trials/dequeue/ + # Custom permissions + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_trial(public) + self.trial.status = 'stopped' + self.trial.save() + self.client.force_authenticate(user=self.users[role]) + url = f'/trials/dequeue/' + resp = self.client.get(url) + if role in ['admin', 'backend']: + expected = 200 + self.assertEqual(resp.status_code, expected) + else: + expected = 403 + self.assertEqual(resp.status_code, expected) + + def test_get_trials_with_status(self): + # Test GET /trials/get_trials_with_status/?status=stopped + for public in [False, True]: + self._setup_trial(public) + self.trial.status = 'stopped' + self.trial.save() + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + url = '/trials/get_trials_with_status/?status=stopped' + resp = self.client.get(url) + if role in ['admin', 'backend']: + expected = 200 + self.assertEqual(resp.status_code, expected) + trial_ids = [t['id'] for t in resp.data] + self.assertIn(str(self.trial.pk), trial_ids) + else: + expected = 403 + self.assertEqual(resp.status_code, expected) + + def test_rename(self): + # Test POST /trials//rename/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_trial(public) + self.client.force_authenticate(user=self.users[role]) + url = f'/trials/{self.trial.pk}/rename/' + data = { "trialNewName": "trial_new_name" } + resp = self.client.post(url, data) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + self.assertEqual(resp.status_code, expected) + self.assertEqual(resp.data['data']['name'], "trial_new_name") + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_permanent_remove(self): + # Test POST /trials//permanent_remove/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_trial(public) + self.client.force_authenticate(user=self.users[role]) + url = f'/trials/{self.trial.pk}/permanent_remove/' + resp = self.client.post(url) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_trash(self): + # Test POST /trials//trash/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_trial(public) + self.client.force_authenticate(user=self.users[role]) + url = f'/trials/{self.trial.pk}/trash/' + resp = self.client.post(url) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_restore(self): + # Test POST /trials//restore/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_trial(public) + self.client.force_authenticate(user=self.users[role]) + url = f'/trials/{self.trial.pk}/restore/' + resp = self.client.post(url) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + def test_modifyTags(self): + # Test POST /trials//modifyTags/ + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + self._setup_trial(public) + self.client.force_authenticate(user=self.users[role]) + url = f'/trials/{self.trial.pk}/modifyTags/' + data = { "trialNewTags": ["tag1", "tag2"] } + resp = self.client.post(url, data) + + if role in ['owner', 'admin', 'backend']: + expected = 200 + else: + expected = 404 if role == 'other' else 403 + self.assertEqual(resp.status_code, expected) + + +class ResultsPermissionsTests(UserSetupMixin, APITestCase): + def setUp(self): + self.setUpUsers() + self.list_url = '/results/' + + def _setup_result(self, public): + self.session = Session.objects.create(user=self.owner, public=public) + self.trial = Trial.objects.create(session=self.session) + self.result = Result.objects.create(trial=self.trial, device_id='dev123', tag='tag1') + self.detail_url = f'/results/{self.result.pk}/' + + def test_get_list(self): + # Test GET /results/ (list) + for role in self.users: + with self.subTest(role=role): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(self.list_url) + if role in ['owner', 'admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 200) + else: + self.assertEqual(resp.status_code, 403) + + def test_get_detail(self): + # Test GET /results// (retrieve) + for public in [False, True]: + self._setup_result(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(self.detail_url) + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + else: + self.assertEqual(resp.status_code, 403) + + def test_post(self): + # Test POST /results/ + for public in [False, True]: + self._setup_result(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + data = { + "trial": self.trial.pk, + "tag": "new_tag", + "device_id": "dev999", + "media_url": "fakekey" + } + resp = self.client.post(self.list_url, data) + expected = 201 if role in ['owner', 'admin', 'backend'] else 403 + self.assertEqual(resp.status_code, expected) + + def test_put(self): + # Test PUT /results// (update) + for public in [False, True]: + self._setup_result(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + data = { + "trial": self.trial.pk, + "tag": "updated_tag", + "device_id": "dev123", + "media_url": "fakekey" + } + resp = self.client.put(self.detail_url, data) + expected = 200 if role in ['owner', 'admin', 'backend'] else 403 + self.assertEqual(resp.status_code, expected) + + def test_patch(self): + # Test PATCH /results// (partial_update) + for public in [False, True]: + self._setup_result(public) + for role in self.users: + with self.subTest(role=role, public=public): + self.client.force_authenticate(user=self.users[role]) + data = { "tag": "patched_tag" } + resp = self.client.patch(self.detail_url, data) + expected = 200 if role in ['owner', 'admin', 'backend'] else 403 + self.assertEqual(resp.status_code, expected) + + def test_delete(self): + # Test DELETE /results// (destroy) + for public in [False, True]: + for role in self.users: + with self.subTest(role=role, public=public): + # Re-create the result for delete, to make sure the object exists + self._setup_result(public=public) + self.client.force_authenticate(user=self.users[role]) + resp = self.client.delete(self.detail_url) + expected = 204 if role in ['owner', 'admin', 'backend'] else 403 + self.assertEqual(resp.status_code, expected) + + +class SubjectPermissionsTests(UserSetupMixin, APITestCase): + def setUp(self): + self.setUpUsers() + self.list_url = '/subjects/' + + def _setup_subject(self): + self.subject = Subject.objects.create(name='test_subject', user=self.owner) + self.detail_url = f'/subjects/{self.subject.pk}/' + + def test_get_list(self): + # Test GET /subjects/ (list) + self._setup_subject() + for role in self.users: + with self.subTest(role=role): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(self.list_url) + + if role in ['owner', 'admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 200) + + if role in ['owner', 'admin', 'backend']: + self.assertEqual(len(resp.data['subjects']), 1) + else: + self.assertEqual(len(resp.data['subjects']), 0) + else: + self.assertEqual(resp.status_code, 403) + + def test_get_detail(self): + # Test GET /subjects// (retrieve) + self._setup_subject() + for role in self.users: + with self.subTest(role=role): + self.client.force_authenticate(user=self.users[role]) + resp = self.client.get(self.detail_url) + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + elif role == 'other': + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403) + + def test_post(self): + for role in self.users: + with self.subTest(role=role): + self.client.force_authenticate(user=self.users[role]) + data = { "name": "new_subject" } + resp = self.client.post(self.list_url, data) + + if role in ['owner', 'admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 201) + else: + self.assertEqual(resp.status_code, 403) + + def test_put(self): + for role in self.users: + with self.subTest(role=role): + self._setup_subject() + self.client.force_authenticate(user=self.users[role]) + data = { "id": self.subject.pk, + "name": 'new_subject_name', + "subject_tags": ['one', 'two'] } + resp = self.client.put(self.detail_url, data) + + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + elif role == 'other': + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403) + + def test_patch(self): + for role in self.users: + with self.subTest(role=role): + self._setup_subject() + self.client.force_authenticate(user=self.users[role]) + data = { "id": self.subject.pk, + "name": 'new_subject_name', + "subject_tags": ['one', 'two'] } + resp = self.client.patch(self.detail_url, data) + + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 200) + elif role == 'other': + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403) + + def test_delete(self): + for role in self.users: + with self.subTest(role=role): + self._setup_subject() + self.client.force_authenticate(user=self.users[role]) + resp = self.client.delete(self.detail_url) + + if role in ['owner', 'admin', 'backend']: + self.assertEqual(resp.status_code, 204) + elif role == 'other': + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403) + + def test_trash(self): + for role in self.users: + with self.subTest(role=role): + self._setup_subject() + self.client.force_authenticate(user=self.users[role]) + url = f'/subjects/{self.subject.pk}/trash/' + resp = self.client.post(url) + + if role in ['owner']: + self.assertEqual(resp.status_code, 200) + elif role in ['admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403) + + def test_restore(self): + for role in self.users: + with self.subTest(role=role): + self._setup_subject() + self.client.force_authenticate(user=self.users[role]) + url = f'/subjects/{self.subject.pk}/restore/' + resp = self.client.post(url) + + if role in ['owner']: + self.assertEqual(resp.status_code, 200) + elif role in ['admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403) + + @mock.patch('mcserver.views.downloadAndZipSubject') + def test_download(self, mock_download_and_zip): + for role in self.users: + with self.subTest(role=role): + # Create a temporary zip file + with tempfile.NamedTemporaryFile(suffix='.zip') as tmp_zip: + with zipfile.ZipFile(tmp_zip.name, 'w') as zf: + zf.writestr('dummy.txt', 'dummy content') + tmp_zip.flush() + mock_download_and_zip.return_value = tmp_zip.name + + self._setup_subject() + self.client.force_authenticate(user=self.users[role]) + url = f'/subjects/{self.subject.pk}/download/' + resp = self.client.get(url) + + if role == 'owner': + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp['Content-Type'], 'application/zip') + elif role in ['admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403) + + @mock.patch('mcserver.tasks.download_subject_archive.delay') + def test_async_download(self, mock_download_task): + mock_task = mock.Mock() + mock_task.id = 'fake-download_subject_archive-id' + mock_download_task.return_value = mock_task + # Test GET /subject//async-download/ + self._setup_subject() + for role in self.users: + with self.subTest(role=role): + self.client.force_authenticate(user=self.users[role]) + url = f'/subjects/{self.subject.pk}/async-download/' + resp = self.client.get(url) + + if role == 'owner': + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data, {'task_id': 'fake-download_subject_archive-id'}) + elif role in ['admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403) + + def test_permanent_remove(self): + for role in self.users: + with self.subTest(role=role): + self._setup_subject() + self.client.force_authenticate(user=self.users[role]) + url = f'/subjects/{self.subject.pk}/permanent_remove/' + resp = self.client.post(url) + + if role == 'owner': + self.assertEqual(resp.status_code, 200) + elif role in ['admin', 'backend', 'other']: + self.assertEqual(resp.status_code, 404) + else: + self.assertEqual(resp.status_code, 403)