diff --git a/tests/test_mig_shared_vgrid.py b/tests/test_mig_shared_vgrid.py index d6aa79d88..55a8f3be2 100644 --- a/tests/test_mig_shared_vgrid.py +++ b/tests/test_mig_shared_vgrid.py @@ -3,7 +3,7 @@ # --- BEGIN_HEADER --- # # test_mig_shared_vgrid - unit tests for vgrid helper functions -# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# Copyright (C) 2003-2026 The MiG Project by the Science HPC Center at UCPH # # This file is part of MiG. # @@ -34,21 +34,26 @@ from mig.shared.base import client_id_dir from mig.shared.serial import dump -from mig.shared.vgrid import get_vgrid_workflow_jobs, legacy_main, \ - vgrid_add_entities, vgrid_add_members, vgrid_add_owners, \ +from mig.shared.vgrid import get_vgrid_workflow_jobs, init_vgrid_script_add_rem, \ + legacy_main, vgrid_add_entities, vgrid_add_members, vgrid_add_owners, \ vgrid_add_resources, vgrid_add_workflow_jobs, vgrid_allow_restrict_write, \ vgrid_exists, vgrid_flat_name, vgrid_is_default, vgrid_is_member, \ vgrid_is_owner, vgrid_is_owner_or_member, vgrid_is_trigger, vgrid_list, \ vgrid_list_parents, vgrid_list_subvgrids, vgrid_list_vgrids, \ vgrid_match_resources, vgrid_nest_sep, vgrid_remove_entities, \ vgrid_restrict_write, vgrid_set_entities, vgrid_set_owners, \ - vgrid_settings + vgrid_settings, is_user from tests.support import MigTestCase, ensure_dirs_exist, testmain class TestMigSharedVgrid(MigTestCase): """Unit tests for vgrid helpers""" # Standard user IDs following X.500 DN format + TEST_USER_DN = '/C=DK/ST=NA/L=NA/O=Test Org/OU=NA/CN=Test User/'\ + 'emailAddress=test@example.com' + TEST_UNKNOWN_DN = \ + '/C=XX/ST=NA/L=NA/O=Test Org/OU=NA/CN=Unknown User/'\ + 'emailAddress=unknown@example.com' TEST_OWNER_DN = \ '/C=DK/ST=NA/L=NA/O=Test Org/OU=NA/CN=Test Owner/'\ 'emailAddress=owner@example.com' @@ -58,7 +63,8 @@ class TestMigSharedVgrid(MigTestCase): TEST_OUTSIDER_DN = \ '/C=DK/ST=NA/L=NA/O=Test Org/OU=NA/CN=Test Outsider/'\ 'emailAddress=outsider@example.com' - TEST_RESOURCE_DN = 'test.example.org' + TEST_RESOURCE_ID = 'test.example.org' + TEST_UNKNOWN_ID = 'no-such-resource.example.org' TEST_OWNER_DIR = \ '+C=DK+ST=NA+L=NA+O=Test_Org+OU=NA+CN=Test_Owner+'\ 'emailAddress=owner@example.com' @@ -72,6 +78,10 @@ def before_each(self): """Create test environment for vgrid tests""" # Setup configuration + ensure_dirs_exist(self.configuration.user_db_home) + ensure_dirs_exist(self.configuration.user_home) + ensure_dirs_exist(self.configuration.user_settings) + ensure_dirs_exist(self.configuration.resource_home) ensure_dirs_exist(self.configuration.vgrid_home) ensure_dirs_exist(self.configuration.vgrid_files_home) ensure_dirs_exist(self.configuration.vgrid_files_writable) @@ -303,14 +313,14 @@ def test_vgrid_list_parents(self): def test_vgrid_match_resources(self): """Test resource filtering for vgrid""" - test_resources = ['res1', 'res2', 'invalid_res', self.TEST_RESOURCE_DN] + test_resources = ['res1', 'res2', 'invalid_res', self.TEST_RESOURCE_ID] added, msg = vgrid_add_entities(self.configuration, self.test_vgrid, - 'resources', [self.TEST_RESOURCE_DN]) + 'resources', [self.TEST_RESOURCE_ID]) self.assertTrue(added, msg) matched = vgrid_match_resources(self.test_vgrid, test_resources, self.configuration) - self.assertEqual(matched, [self.TEST_RESOURCE_DN]) + self.assertEqual(matched, [self.TEST_RESOURCE_ID]) # TODO: adjust API to allow enabling the next test @unittest.skipIf(True, "requires read-only mount") @@ -668,6 +678,287 @@ def test_vgrid_entity_listing(self): self.assertTrue(status) self.assertEqual(owners, [self.TEST_OWNER_DN]) + def test_init_vgrid_script_add_rem_valid_params(self): + """Test valid parameters for init_vgrid_script_add_rem""" + # Ensure test users exist before testing + self._provision_test_user(self, self.TEST_OWNER_DN) + # TODO: this fails as _provision_test_user hard-codes TEST_USER_DN! + # self.assertTrue(is_user(self.TEST_OWNER_DN, self.configuration)) + self._provision_test_user(self, self.TEST_USER_DN) + self.assertTrue(is_user(self.TEST_USER_DN, self.configuration)) + self._provision_test_user(self, self.TEST_MEMBER_DN) + # TODO: this fails as _provision_test_user hard-codes TEST_USER_DN! + # self.assertTrue(is_user(self.TEST_MEMBER_DN, self.configuration)) + + vgrid_name = self.test_vgrid + # TODO: fix _provision_test_user and use TEST_MEMBER_DN instead here + # subject = self.TEST_MEMBER_DN + subject = self.TEST_USER_DN + subject_type = 'member' + + status, msg, _ = init_vgrid_script_add_rem( + vgrid_name, self.TEST_OWNER_DN, subject, subject_type, self.configuration + ) + self.assertTrue(status, msg) + + def test_init_vgrid_script_add_rem_non_owner(self): + """Test non-owner attempt with init_vgrid_script_add_rem""" + # Ensure test users exist before testing + self._provision_test_user(self, self.TEST_OUTSIDER_DN) + # TODO: this fails as _provision_test_user hard-codes TEST_USER_DN! + # self.assertTrue(is_user(self.TEST_OUTSIDER_DN, self.configuration)) + self._provision_test_user(self, self.TEST_USER_DN) + self.assertTrue(is_user(self.TEST_USER_DN, self.configuration)) + self._provision_test_user(self, self.TEST_MEMBER_DN) + # TODO: this fails as _provision_test_user hard-codes TEST_USER_DN! + # self.assertTrue(is_user(self.TEST_MEMBER_DN, self.configuration)) + + vgrid_name = self.test_vgrid + # TODO: fix _provision_test_user and use TEST_MEMBER_DN instead here + # subject = self.TEST_MEMBER_DN + subject = self.TEST_USER_DN + subject_type = 'member' + + status, msg, _ = init_vgrid_script_add_rem( + vgrid_name, self.TEST_OUTSIDER_DN, subject, subject_type, self.configuration + ) + self.assertFalse(status) + self.assertIn('must be an owner', msg) + + def test_init_vgrid_script_add_rem_invalid_vgrid(self): + """Test invalid vgrid name validation""" + vgrid_name = '/../invalid!vgrid' + subject = self.TEST_MEMBER_DN + subject_type = 'member' + + status, msg, _ = init_vgrid_script_add_rem( + vgrid_name, self.TEST_OWNER_DN, subject, subject_type, self.configuration + ) + self.assertFalse(status) + self.assertIn('Illegal vgrid_name', msg) + + def test_init_vgrid_script_add_rem_missing_subject_type(self): + """Test invalid subject_type parameter""" + vgrid_name = self.test_vgrid + subject = self.TEST_MEMBER_DN + subject_type = None + + status, msg, _ = init_vgrid_script_add_rem( + vgrid_name, self.TEST_OWNER_DN, subject, subject_type, self.configuration + ) + self.assertFalse(status) + self.assertIn('unknown subject type', msg) + + def test_init_vgrid_script_add_rem_invalid_subject_type(self): + """Test invalid subject_type parameter""" + vgrid_name = self.test_vgrid + subject = self.TEST_MEMBER_DN + subject_type = 'invalid_type' + + status, msg, _ = init_vgrid_script_add_rem( + vgrid_name, self.TEST_OWNER_DN, subject, subject_type, self.configuration + ) + self.assertFalse(status) + self.assertIn('unknown subject type', msg) + + def test_init_vgrid_script_add_rem_member_self_removal(self): + """Test member self-removal exception""" + # Add member first + vgrid_add_members(self.configuration, self.test_vgrid, + [self.TEST_MEMBER_DN]) + + vgrid_name = self.test_vgrid + subject = self.TEST_MEMBER_DN + subject_type = 'member' + + status, msg, _ = init_vgrid_script_add_rem( + vgrid_name, self.TEST_MEMBER_DN, subject, subject_type, + self.configuration, from_remove=True + ) + self.assertTrue(status, msg) + + def test_init_vgrid_script_add_rem_add_owner_as_non_owner(self): + """Test attempt to add owner without privileges""" + # Ensure test users exist + self._provision_test_user(self, self.TEST_OWNER_DN) + self._provision_test_user(self, self.TEST_USER_DN) + self._provision_test_user(self, self.TEST_OUTSIDER_DN) + + subject = self.TEST_USER_DN + subject_type = 'owner' + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OUTSIDER_DN, subject, subject_type, + self.configuration, from_remove=False + ) + self.assertFalse(status) + self.assertIn('must be an owner', msg) + + def test_init_vgrid_script_add_rem_remove_owner_as_non_owner(self): + """Test attempt to remove owner without privileges""" + # Ensure test users exist + self._provision_test_user(self, self.TEST_OWNER_DN) + self._provision_test_user(self, self.TEST_USER_DN) + self._provision_test_user(self, self.TEST_OUTSIDER_DN) + + subject = self.TEST_USER_DN + subject_type = 'owner' + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OUTSIDER_DN, subject, subject_type, + self.configuration, from_remove=True + ) + self.assertFalse(status) + self.assertIn('must be an owner', msg) + + def test_init_vgrid_script_add_rem_add_unknown_resource(self): + """Test addition of (e.g. pending) unknown resource always allowed""" + subject = self.TEST_UNKNOWN_ID + subject_type = 'resource' + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OWNER_DN, subject, subject_type, + self.configuration, from_remove=False + ) + self.assertTrue(status, msg) + + def test_init_vgrid_script_add_rem_remove_unknown_resource(self): + """Test removal of unknown resource always allowed""" + subject = self.TEST_UNKNOWN_ID + subject_type = 'resource' + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OWNER_DN, subject, subject_type, + self.configuration, from_remove=True + ) + self.assertTrue(status, msg) + + def test_init_vgrid_script_add_rem_add_unknown_user(self): + """Test addition of unknown user refused""" + unknown_user = self.TEST_UNKNOWN_DN + subject_type = 'member' + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OWNER_DN, unknown_user, subject_type, + self.configuration, from_remove=False + ) + self.assertFalse(status, msg) + + def test_init_vgrid_script_add_rem_remove_unknown_user(self): + """Test removal of unknown user always allowed""" + unknown_user = self.TEST_UNKNOWN_DN + subject_type = 'member' + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OWNER_DN, unknown_user, subject_type, + self.configuration, from_remove=True + ) + self.assertTrue(status, msg) + + def test_init_vgrid_script_add_rem_modify_trigger_as_non_owner(self): + """Test modification attempt of trigger by non-owner""" + # Ensure test users exist + self._provision_test_user(self, self.TEST_OWNER_DN) + self._provision_test_user(self, self.TEST_OUTSIDER_DN) + + # Add test trigger + test_trigger = { + 'rule_id': 'test_rule', + 'vgrid_name': self.test_vgrid, + 'path': '*.txt', + 'changes': ['modified'], + 'run_as': self.TEST_OWNER_DN, + 'action': 'copy', + 'arguments': ['source', 'dest'], + 'match_files': True + } + vgrid_add_entities(self.configuration, self.test_vgrid, + 'triggers', [test_trigger]) + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OUTSIDER_DN, 'test_rule', 'trigger', + self.configuration, from_remove=False + ) + self.assertFalse(status) + self.assertIn('You must be the owner', msg) + + def test_init_vgrid_script_add_rem_remove_trigger_as_non_owner(self): + """Test removal attempt of trigger by non-owner""" + # Ensure test users exist + self._provision_test_user(self, self.TEST_OWNER_DN) + self._provision_test_user(self, self.TEST_OUTSIDER_DN) + + # Add test trigger + test_trigger = { + 'rule_id': 'test_rule', + 'vgrid_name': self.test_vgrid, + 'path': '*.txt', + 'changes': ['modified'], + 'run_as': self.TEST_OWNER_DN, + 'action': 'copy', + 'arguments': ['source', 'dest'], + 'match_files': True + } + vgrid_add_entities(self.configuration, self.test_vgrid, + 'triggers', [test_trigger]) + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OUTSIDER_DN, 'test_rule', 'trigger', + self.configuration, from_remove=True + ) + self.assertFalse(status) + self.assertIn('You must be an owner', msg) + + def test_init_vgrid_script_add_rem_modify_trigger_as_owner(self): + """Test modification attempt of trigger by trigger owner""" + # Ensure test user exists + self._provision_test_user(self, self.TEST_OWNER_DN) + + # Add test trigger + test_trigger = { + 'rule_id': 'test_rule', + 'vgrid_name': self.test_vgrid, + 'path': '*.txt', + 'changes': ['modified'], + 'run_as': self.TEST_OWNER_DN, + 'action': 'copy', + 'arguments': ['source', 'dest'], + 'match_files': True + } + vgrid_add_entities(self.configuration, self.test_vgrid, + 'triggers', [test_trigger]) + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OWNER_DN, 'test_rule', 'trigger', + self.configuration, from_remove=False + ) + self.assertTrue(status, msg) + + def test_init_vgrid_script_add_rem_remove_trigger_as_owner(self): + """Test removal of trigger by trigger owner""" + # Ensure test user exists + self._provision_test_user(self, self.TEST_OWNER_DN) + + # Add test trigger + test_trigger = { + 'rule_id': 'test_rule', + 'vgrid_name': self.test_vgrid, + 'path': '*.txt', + 'changes': ['modified'], + 'run_as': self.TEST_OWNER_DN, + 'action': 'copy', + 'arguments': ['source', 'dest'], + 'match_files': True + } + vgrid_add_entities(self.configuration, self.test_vgrid, + 'triggers', [test_trigger]) + + status, msg, _ = init_vgrid_script_add_rem( + self.test_vgrid, self.TEST_OWNER_DN, 'test_rule', 'trigger', + self.configuration, from_remove=True + ) + self.assertTrue(status, msg) + def test_vgrid_add_members_single(self): """Test vgrid_add_owners for initial member""" # Clear existing members to start fresh @@ -855,13 +1146,13 @@ def test_resource_signup_workflow(self): """Test full resource signup workflow""" # Sign up resource added, msg = vgrid_add_resources(self.configuration, self.test_vgrid, - [self.TEST_RESOURCE_DN]) + [self.TEST_RESOURCE_ID]) self.assertTrue(added, msg) # Verify visibility - matched = vgrid_match_resources(self.test_vgrid, [self.TEST_RESOURCE_DN], + matched = vgrid_match_resources(self.test_vgrid, [self.TEST_RESOURCE_ID], self.configuration) - self.assertEqual(matched, [self.TEST_RESOURCE_DN]) + self.assertEqual(matched, [self.TEST_RESOURCE_ID]) def test_multi_level_inheritance(self): """Test settings propagation through multiple vgrid levels"""