diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 957be3b8..076950f2 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -3,7 +3,7 @@ # semantic-release is also run to create a new release (if # warranted by the new commits being built). -name: Build/Test +name: build on: push: @@ -16,7 +16,7 @@ on: jobs: detect-secrets: if: "!contains(github.event.head_commit.message, '[skip ci]')" - name: Detect-Secrets + name: detect-secrets runs-on: ubuntu-latest steps: @@ -38,8 +38,8 @@ jobs: detect-secrets -v audit --report --fail-on-unaudited --fail-on-live --fail-on-audited-real .secrets.baseline build: + name: build-test (python ${{ matrix.python-version }}) needs: detect-secrets - name: Build/Test (Python ${{ matrix.python-version }}) runs-on: ubuntu-latest strategy: @@ -59,8 +59,8 @@ jobs: run: make ci create-release: + name: semantic-release needs: build - name: Semantic-Release if: "github.ref_name == 'main' && github.event_name != 'pull_request'" runs-on: ubuntu-latest diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 5d55a5fc..2b6c652e 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -3,7 +3,7 @@ # - building and publishing javadocs to the git repository. # It is triggered when a new release is created. -name: Publish +name: publish on: release: types: [created] @@ -12,7 +12,7 @@ on: jobs: publish: - name: Publish Release + name: publish-pypi runs-on: ubuntu-latest steps: diff --git a/.pylintrc b/.pylintrc index eb581058..eecb6e4f 100644 --- a/.pylintrc +++ b/.pylintrc @@ -8,7 +8,8 @@ disable= too-many-arguments, unnecessary-pass, no-member, - consider-using-f-string + consider-using-f-string, + too-many-instance-attributes [TYPECHECK] ignored-classes= responses diff --git a/.secrets.baseline b/.secrets.baseline index 262b3dfc..31065ddc 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -3,7 +3,7 @@ "files": "package-lock.json|^.secrets.baseline$", "lines": null }, - "generated_at": "2025-01-09T21:56:01Z", + "generated_at": "2025-05-27T14:04:58Z", "plugins_used": [ { "name": "AWSKeyDetector" @@ -70,7 +70,7 @@ "hashed_secret": "91dfd9ddb4198affc5c194cd8ce6d338fde470e2", "is_secret": false, "is_verified": false, - "line_number": 66, + "line_number": 67, "type": "Secret Keyword", "verified_result": null }, @@ -78,7 +78,7 @@ "hashed_secret": "4f51cde3ac0a5504afa4bc06859b098366592c19", "is_secret": false, "is_verified": false, - "line_number": 207, + "line_number": 208, "type": "Secret Keyword", "verified_result": null }, @@ -86,7 +86,7 @@ "hashed_secret": "e87559ed7decb62d0733ae251ae58d42a55291d8", "is_secret": false, "is_verified": false, - "line_number": 209, + "line_number": 210, "type": "Secret Keyword", "verified_result": null }, @@ -94,7 +94,7 @@ "hashed_secret": "12f4a68ed3d0863e56497c9cdb1e2e4e91d5cb68", "is_secret": false, "is_verified": false, - "line_number": 273, + "line_number": 274, "type": "Secret Keyword", "verified_result": null }, @@ -102,7 +102,7 @@ "hashed_secret": "c837b75d7cd93ef9c2243ca28d6e5156259fd253", "is_secret": false, "is_verified": false, - "line_number": 277, + "line_number": 278, "type": "Secret Keyword", "verified_result": null }, @@ -110,7 +110,7 @@ "hashed_secret": "98635b2eaa2379f28cd6d72a38299f286b81b459", "is_secret": false, "is_verified": false, - "line_number": 502, + "line_number": 505, "type": "Secret Keyword", "verified_result": null }, @@ -118,7 +118,7 @@ "hashed_secret": "47fcf185ee7e15fe05cae31fbe9e4ebe4a06a40d", "is_secret": false, "is_verified": false, - "line_number": 597, + "line_number": 694, "type": "Secret Keyword", "verified_result": null } @@ -213,6 +213,16 @@ "verified_result": null } ], + "resources/ibm-credentials-mcspv2.env": [ + { + "hashed_secret": "f2e7745f43b0ef0e2c2faf61d6c6a28be2965750", + "is_secret": false, + "is_verified": false, + "line_number": 23, + "type": "Secret Keyword", + "verified_result": null + } + ], "resources/ibm-credentials-retry.env": [ { "hashed_secret": "ce49dd46e23153d6593eccd68534b9f1465d5bbd", @@ -341,6 +351,40 @@ "verified_result": null } ], + "test/test_get_authenticator.py": [ + { + "hashed_secret": "34a0a47a51d5bf739df0214450385e29ee7e9847", + "is_secret": false, + "is_verified": false, + "line_number": 256, + "type": "Secret Keyword", + "verified_result": null + }, + { + "hashed_secret": "f2e7745f43b0ef0e2c2faf61d6c6a28be2965750", + "is_secret": false, + "is_verified": false, + "line_number": 267, + "type": "Secret Keyword", + "verified_result": null + }, + { + "hashed_secret": "2863fa4b5510c46afc2bd2998dfbc0cf3d6df032", + "is_secret": false, + "is_verified": false, + "line_number": 348, + "type": "Secret Keyword", + "verified_result": null + }, + { + "hashed_secret": "b9cad336062c0dc3bb30145b1a6697fccfe755a6", + "is_secret": false, + "is_verified": false, + "line_number": 409, + "type": "Secret Keyword", + "verified_result": null + } + ], "test/test_iam_assume_authenticator.py": [ { "hashed_secret": "4080eeeaf54faf879b9e8d99c49a8503f7e855bb", @@ -507,36 +551,28 @@ "verified_result": null } ], - "test/test_utils.py": [ - { - "hashed_secret": "34a0a47a51d5bf739df0214450385e29ee7e9847", - "is_secret": false, - "is_verified": false, - "line_number": 453, - "type": "Secret Keyword", - "verified_result": null - }, + "test/test_mcspv2_authenticator.py": [ { "hashed_secret": "f2e7745f43b0ef0e2c2faf61d6c6a28be2965750", "is_secret": false, "is_verified": false, - "line_number": 464, + "line_number": 15, "type": "Secret Keyword", "verified_result": null }, { - "hashed_secret": "2863fa4b5510c46afc2bd2998dfbc0cf3d6df032", + "hashed_secret": "da2f27d2c57a0e1ed2dc3a34b4ef02faf2f7a4c2", "is_secret": false, "is_verified": false, - "line_number": 545, - "type": "Secret Keyword", + "line_number": 246, + "type": "Hex High Entropy String", "verified_result": null }, { - "hashed_secret": "b9cad336062c0dc3bb30145b1a6697fccfe755a6", + "hashed_secret": "ace1a5bf229c3af3f699369c6f2fa1a628692ab8", "is_secret": false, "is_verified": false, - "line_number": 606, + "line_number": 392, "type": "Secret Keyword", "verified_result": null } diff --git a/Authentication.md b/Authentication.md index 5c5ee586..8ad6dd20 100644 --- a/Authentication.md +++ b/Authentication.md @@ -7,7 +7,8 @@ The python-sdk-core project supports the following types of authentication: - Container Authentication - VPC Instance Authentication - Cloud Pak for Data Authentication -- Multi-Cloud Saas Platform (MCSP) Authentication +- Multi-Cloud Saas Platform (MCSP) V1 Authentication +- Multi-Cloud Saas Platform (MCSP) V2 Authentication - No Authentication (for testing) The SDK user configures the appropriate type of authentication for use with service instances. @@ -546,11 +547,11 @@ service = ExampleServiceV1.new_instance(service_name='example_service') ``` -## Multi-Cloud Saas Platform (MCSP) Authentication +## Multi-Cloud Saas Platform (MCSP) V1 Authentication The `MCSPAuthenticator` can be used in scenarios where an application needs to interact with an IBM Cloud service that has been deployed to a non-IBM Cloud environment (e.g. AWS). -It accepts a user-supplied apikey and performs the necessary interactions with the -Multi-Cloud Saas Platform token service to obtain a suitable MCSP access token (a bearer token) +It accepts a user-supplied apikey and invokes the Multi-Cloud Saas Platform token service's +`POST /siusermgr/api/1.0/apikeys/token` operation to obtain a suitable MCSP access token (a bearer token) for the specified apikey. The authenticator will also obtain a new bearer token when the current token expires. The bearer token is then added to each outbound request in the `Authorization` header in the @@ -610,6 +611,104 @@ service = ExampleServiceV1.new_instance(service_name='example_service') ``` +## Multi-Cloud Saas Platform (MCSP) V2 Authentication +The `MCSPV2Authenticator` can be used in scenarios where an application needs to +interact with an IBM Cloud service that has been deployed to a non-IBM Cloud environment (e.g. AWS). +It accepts a user-supplied apikey and invokes the Multi-Cloud Saas Platform token service's +`POST /api/2.0/{scopeCollectionType}/{scopeId}/apikeys/token` operation to obtain a suitable MCSP access token (a bearer token) +for the specified apikey. +The authenticator will also obtain a new bearer token when the current token expires. +The bearer token is then added to each outbound request in the `Authorization` header in the +form: +``` + Authorization: Bearer +``` + +### Properties + +- apikey: (required) The apikey to be used to obtain an MCSP access token. + +- url: (required) The URL representing the MCSP token service endpoint's base URL string. Do not include the +operation path (e.g. `/api/2.0/{scopeCollectionType}/{scopeId}/apikeys/token`) as part of this property's value. + +- scope_collection_type: (required) The scope collection type of item(s). +The valid values are: `accounts`, `subscriptions`, `services`. + +- scope_id: (required) The scope identifier of item(s). + +- include_builtin_actions: (optional) A flag to include builtin actions in the `actions` claim in the MCSP token (default: false). + +- include_custom_actions: (optional) A flag to include custom actions in the `actions` claim in the MCSP token (default: false). + +- include_roles: (optional) A flag to include the `roles` claim in the MCSP token (default: true). + +- prefix_roles: (optional) A flag to add a prefix with the scope level where +the role is defined in the `roles` claim (default: false). + +- caller_ext_claim: (optional) A map containing keys and values to be injected into the returned access token +as the `callerExt` claim. The keys used in this map must be enabled in the apikey by setting the +`callerExtClaimNames` property when the apikey is created. +This property is typically only used in scenarios involving an apikey with identityType `SERVICEID`. + +- disable_ssl_verification: (optional) A flag that indicates whether verification of the server's SSL +certificate should be disabled or not. The default value is `false`. + +- headers: (optional) A set of key/value pairs that will be sent as HTTP headers in requests +made to the MCSP token service. + +### Usage Notes +- When constructing an MCSPV2Authenticator instance, the apikey, url, scope_collection_type, and scope_id properties are required. + +- If you specify the caller_ext_claim map, the keys used in the map must have been previously enabled in the apikey +by setting the `callerExtClaimNames` property when you created the apikey. +The entries contained in this map will appear in the `callerExt` field (claim) of the returned access token. + +- The authenticator will invoke the token server's `POST /api/2.0/{scopeCollectionType}/{scopeId}/apikeys/token` operation to +exchange the apikey for an MCSP access token (the bearer token). + +### Programming example +```python +from ibm_cloud_sdk_core.authenticators import MCSPV2Authenticator +from .example_service_v1 import * + +# Create the authenticator. +authenticator = MCSPV2Authenticator( + apikey='myapikey', + url='https://example.mcspv2.token-exchange.com', + scope_collection_type='accounts', + scope_id='20250519-2128-3755-60b3-103e01c509e8', + include_builtin_actions=True, + caller_ext_claim={'productID': 'prod-123'}, + ) + +# Construct the service instance. +service = ExampleServiceV1(authenticator=authenticator) + +# 'service' can now be used to invoke operations. +``` + +### Configuration example +External configuration: +``` +export EXAMPLE_SERVICE_AUTH_TYPE=mcspv2 +export EXAMPLE_SERVICE_APIKEY=myapikey +export EXAMPLE_SERVICE_AUTH_URL=https://example.mcspv2.token-exchange.com +export EXAMPLE_SERVICE_SCOPE_COLLECTION_TYPE=accounts +export EXAMPLE_SERVICE_SCOPE_ID=20250519-2128-3755-60b3-103e01c509e8 +export EXAMPLE_SERVICE_INCLUDE_BUILTIN_ACTIONS=true +export EXAMPLE_SERVICE_CALLER_EXT_CLAIM={"productID":"prod-123"} +``` +Application code: +```python +from .example_service_v1 import * + +# Construct the service instance. +service = ExampleServiceV1.new_instance(service_name='example_service') + +# 'service' can now be used to invoke operations. +``` + + ## No Auth Authentication The `NoAuthAuthenticator` is a placeholder authenticator which performs no actual authentication function. It can be used in situations where authentication needs to be bypassed, perhaps while developing diff --git a/ibm_cloud_sdk_core/__init__.py b/ibm_cloud_sdk_core/__init__.py index b6d36ec7..17fc072e 100644 --- a/ibm_cloud_sdk_core/__init__.py +++ b/ibm_cloud_sdk_core/__init__.py @@ -46,6 +46,7 @@ from .token_managers.container_token_manager import ContainerTokenManager from .token_managers.vpc_instance_token_manager import VPCInstanceTokenManager from .token_managers.mcsp_token_manager import MCSPTokenManager +from .token_managers.mcspv2_token_manager import MCSPV2TokenManager from .api_exception import ApiException from .utils import datetime_to_string, string_to_datetime, read_external_sources from .utils import datetime_to_string_list, string_to_datetime_list diff --git a/ibm_cloud_sdk_core/authenticators/__init__.py b/ibm_cloud_sdk_core/authenticators/__init__.py index 9ced229e..5b265595 100644 --- a/ibm_cloud_sdk_core/authenticators/__init__.py +++ b/ibm_cloud_sdk_core/authenticators/__init__.py @@ -28,9 +28,14 @@ Authenticator: Abstract Base Class. Implement this interface to provide custom authentication schemes to services. BasicAuthenticator: Authenticator for passing supplied basic authentication information to service endpoint. BearerTokenAuthenticator: Authenticator for passing supplied bearer token to service endpoint. + ContainerAuthenticator: Authenticator for use in a container environment. CloudPakForDataAuthenticator: Authenticator for passing CP4D authentication information to service endpoint. IAMAuthenticator: Authenticator for passing IAM authentication information to service endpoint. + IAMAssumeAuthenticator: Authenticator for the "assume" grant type. + VPCInstanceAuthenticator: Authenticator for use within a VPC instance. NoAuthAuthenticator: Performs no authentication. Useful for testing purposes. + MCSPAuthenticator: Authenticator that supports the MCSP v1 token exchange. + MCSPV2Authenticator: Authenticator that supports the MCSP v2 token exchange. """ from .authenticator import Authenticator @@ -43,3 +48,4 @@ from .vpc_instance_authenticator import VPCInstanceAuthenticator from .no_auth_authenticator import NoAuthAuthenticator from .mcsp_authenticator import MCSPAuthenticator +from .mcspv2_authenticator import MCSPV2Authenticator diff --git a/ibm_cloud_sdk_core/authenticators/authenticator.py b/ibm_cloud_sdk_core/authenticators/authenticator.py index bacdad3d..e5ee6782 100644 --- a/ibm_cloud_sdk_core/authenticators/authenticator.py +++ b/ibm_cloud_sdk_core/authenticators/authenticator.py @@ -1,6 +1,6 @@ # coding: utf-8 -# Copyright 2019, 2023 IBM All Rights Reserved. +# Copyright 2019, 2025 IBM All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ class Authenticator(ABC): AUTHTYPE_VPC = 'vpc' AUTHTYPE_NOAUTH = 'noAuth' AUTHTYPE_MCSP = 'mcsp' + AUTHTYPE_MCSPV2 = 'mcspv2' AUTHTYPE_UNKNOWN = 'unknown' @abstractmethod diff --git a/ibm_cloud_sdk_core/authenticators/mcspv2_authenticator.py b/ibm_cloud_sdk_core/authenticators/mcspv2_authenticator.py new file mode 100644 index 00000000..d8d9bdd9 --- /dev/null +++ b/ibm_cloud_sdk_core/authenticators/mcspv2_authenticator.py @@ -0,0 +1,270 @@ +# coding: utf-8 + +# Copyright 2025. IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, Optional + +from requests import Request + +from ibm_cloud_sdk_core.logger import get_logger +from .authenticator import Authenticator +from ..token_managers.mcspv2_token_manager import MCSPV2TokenManager + +logger = get_logger() + + +class MCSPV2Authenticator(Authenticator): + """The MCSPV2Authenticator invokes the MCSP v2 token-exchange operation + (POST /api/2.0/{scopeCollectionType}/{scopeId}/apikeys/token) to obtain an access token + for an apikey, and adds the access token to requests via an Authorization header + of the form: "Authorization: Bearer ". + + Keyword Args: + apikey: The apikey used to obtain an access token [required]. + url: The base endpoint URL for the MCSP token service [required]. + scope_collection_type: The scope collection type of item(s) [required]. + Valid values are: "accounts", "subscriptions", "services". + scope_id: The scope identifier of item(s) [required]. + include_builtin_actions: A flag to include builtin actions in the "actions" claim in the + MCSP access token (default: False). + include_custom_actions: A flag to include custom actions in the "actions" claim in the + MCSP access token (default: False). + include_roles: A flag to include the "roles" claim in the MCSP access token (default: True). + prefix_roles: A flag to add a prefix with the scope level where the role is defined + in the "roles" claim (default: False). + caller_ext_claim: A map (dictionary) containing keys and values to be injected into + the access token as the "callerExt" claim (default: None). + The keys used in this map must be enabled in the apikey by setting the + "callerExtClaimNames" property when the apikey is created. + This property is typically only used in scenarios involving an apikey with identityType `SERVICEID`. + disable_ssl_verification: A flag that indicates whether verification of the server's SSL + certificate should be disabled or not (default: False). + headers: Default headers to be sent with every MCSP token request (default: None). + proxies: Dictionary for mapping request protocol to proxy URL (default: None). + proxies.http (optional): The proxy endpoint to use for HTTP requests. + proxies.https (optional): The proxy endpoint to use for HTTPS requests. + + Attributes: + token_manager (MCSPTokenManager): Retrieves and manages MCSP tokens from the endpoint specified by the url. + + Raises: + TypeError: The `disable_ssl_verification` is not a bool. + ValueError: An error occurred while validating the configuration. + """ + + def __init__( + self, + *, + apikey: str, + url: str, + scope_collection_type: str, + scope_id: str, + include_builtin_actions: bool = False, + include_custom_actions: bool = False, + include_roles: bool = True, + prefix_roles: bool = False, + caller_ext_claim: Optional[Dict[str, str]] = None, + disable_ssl_verification: bool = False, + headers: Optional[Dict[str, str]] = None, + proxies: Optional[Dict[str, str]] = None, + ) -> None: + + self.token_manager = MCSPV2TokenManager( + apikey=apikey, + url=url, + scope_collection_type=scope_collection_type, + scope_id=scope_id, + include_builtin_actions=include_builtin_actions, + include_custom_actions=include_custom_actions, + include_roles=include_roles, + prefix_roles=prefix_roles, + caller_ext_claim=caller_ext_claim, + disable_ssl_verification=disable_ssl_verification, + headers=headers, + proxies=proxies, + ) + self.validate() + + def authentication_type(self) -> str: + """Returns this authenticator's type ('mcsp').""" + return Authenticator.AUTHTYPE_MCSPV2 + + def validate(self) -> None: + """Validate the configuration. + + Raises: + ValueError: The <...> property shouldn't be None. + """ + if not isinstance(self.token_manager.apikey, str): + raise TypeError('"apikey" must be a string') + if not isinstance(self.token_manager.url, str): + raise TypeError('"url" must be a string') + if not isinstance(self.token_manager.scope_collection_type, str): + raise TypeError('"scope_collection_type" must be a string') + if not isinstance(self.token_manager.scope_id, str): + raise TypeError('"scope_id" must be a string') + if not isinstance(self.token_manager.include_builtin_actions, bool): + raise TypeError('"include_builtin_actions" must be a bool') + if not isinstance(self.token_manager.include_custom_actions, bool): + raise TypeError('"include_custom_actions" must be a bool') + if not isinstance(self.token_manager.include_roles, bool): + raise TypeError('"include_roles" must be a bool') + if not isinstance(self.token_manager.prefix_roles, bool): + raise TypeError('"prefix_roles" must be a bool') + if not isinstance(self.token_manager.caller_ext_claim, (dict, type(None))): + raise TypeError('"caller_ext_claim" must be a dictionary or None') + if not isinstance(self.token_manager.disable_ssl_verification, bool): + raise TypeError('"disable_ssl_verification" must be a bool') + if not isinstance(self.token_manager.headers, (dict, type(None))): + raise TypeError('"headers" must be a dictionary or None') + if not isinstance(self.token_manager.proxies, (dict, type(None))): + raise TypeError('"proxies" must be a dictionary or None') + + def authenticate(self, req: Request) -> None: + """Adds MCSP authentication information to the request. + + The MCSP bearer token will be added to the request's headers in the form: + Authorization: Bearer + + Args: + req: The request to add MCSP authentication information to. Must contain a key to a dictionary + called headers. + """ + headers = req.get('headers') + bearer_token = self.token_manager.get_token() + headers['Authorization'] = 'Bearer {0}'.format(bearer_token) + logger.debug('Authenticated outbound request (type=%s)', self.authentication_type()) + + def set_scope_collection_type(self, scope_collection_type: str) -> None: + """Set the scope_collection_type value. + + Args: + scope_collection_type: the value to set. + + Raises: + TypeError: "scope_collection_type" must be a string. + """ + if not isinstance(scope_collection_type, str): + raise TypeError('"scope_collection_type" must be a string') + self.token_manager.scope_collection_type = scope_collection_type + + def set_scope_id(self, scope_id: str) -> None: + """Set the scope_id value. + + Args: + scope_id: the value to set. + + Raises: + TypeError: "scope_id" must be a string. + """ + if not isinstance(scope_id, str): + raise TypeError('"scope_id" must be a string') + self.token_manager.scope_id = scope_id + + def set_include_builtin_actions(self, include_builtin_actions: bool = False) -> None: + """Set the include_builtin_actions flag. + + Args: + include_builtin_actions: The value to set (default: False). + + Raises: + TypeError: "include_builtin_actions" must be a bool. + """ + if not isinstance(include_builtin_actions, bool): + raise TypeError('"include_builtin_actions" must be a bool') + self.token_manager.include_builtin_actions = include_builtin_actions + + def set_include_custom_actions(self, include_custom_actions: bool = False) -> None: + """Set the include_custom_actions flag. + + Args: + include_custom_actions: The value to set (default: False). + + Raises: + TypeError: "include_custom_actions" must be a bool. + """ + if not isinstance(include_custom_actions, bool): + raise TypeError('"include_custom_actions" must be a bool') + self.token_manager.include_custom_actions = include_custom_actions + + def set_include_roles(self, include_roles: bool = True) -> None: + """Set the include_roles flag. + + Args: + include_roles: The value to set (default: True). + + Raises: + TypeError: "include_roles" must be a bool. + """ + if not isinstance(include_roles, bool): + raise TypeError('"include_roles" must be a bool') + self.token_manager.include_roles = include_roles + + def set_prefix_roles(self, prefix_roles: bool = False) -> None: + """Set the prefix_roles flag. + + Args: + prefix_roles: The value to set (default: False). + + Raises: + TypeError: "prefix_roles" must be a bool. + """ + if not isinstance(prefix_roles, bool): + raise TypeError('"prefix_roles" must be a bool') + self.token_manager.prefix_roles = prefix_roles + + def set_caller_ext_claim(self, caller_ext_claim: Optional[Dict[str, str]] = None) -> None: + """Set the caller_ext_claim value. + + Args: + caller_ext_claim: The value to set (default: False). + + Raises: + TypeError: "caller_ext_claim" must be a dictionary or None. + """ + if not isinstance(caller_ext_claim, (dict, type(None))): + raise TypeError('"caller_ext_claim" must be a dictionary or None') + self.token_manager.caller_ext_claim = caller_ext_claim + + def set_disable_ssl_verification(self, disable_ssl_verification: bool = False) -> None: + """Set the disable_ssl_verification flag. + + Args: + disable_ssl_verification: The value to set (default: False). + + Raises: + TypeError: "disable_ssl_verification" must be a bool. + """ + if not isinstance(disable_ssl_verification, bool): + raise TypeError('"disable_ssl_verification" must be a bool') + self.token_manager.disable_ssl_verification = disable_ssl_verification + + def set_headers(self, headers: Optional[Dict[str, str]] = None) -> None: + """Set the headers to be sent with each MCSP token-exchange request. + + Args: + headers: The headers to be sent with each MCSP token request (default: None). + """ + self.token_manager.set_headers(headers) + + def set_proxies(self, proxies: Optional[Dict[str, str]] = None) -> None: + """Sets the proxies the token manager will use to communicate with MCSP on behalf of the host. + + Args: + proxies: Dictionary for mapping request protocol to proxy URL (default: None). + proxies.http (optional): The proxy endpoint to use for HTTP requests. + proxies.https (optional): The proxy endpoint to use for HTTPS requests. + """ + self.token_manager.set_proxies(proxies) diff --git a/ibm_cloud_sdk_core/get_authenticator.py b/ibm_cloud_sdk_core/get_authenticator.py index abc5f5ed..3fa03c77 100644 --- a/ibm_cloud_sdk_core/get_authenticator.py +++ b/ibm_cloud_sdk_core/get_authenticator.py @@ -1,6 +1,6 @@ # coding: utf-8 -# Copyright 2019, 2024 IBM All Rights Reserved. +# Copyright 2019, 2025 IBM All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json from .authenticators import ( Authenticator, BasicAuthenticator, @@ -25,8 +26,9 @@ NoAuthAuthenticator, VPCInstanceAuthenticator, MCSPAuthenticator, + MCSPV2Authenticator, ) -from .utils import read_external_sources +from .utils import read_external_sources, string_to_bool from .logger import get_logger logger = get_logger() @@ -57,6 +59,7 @@ def get_authenticator_from_environment(service_name: str) -> Authenticator: # pylint: disable=too-many-branches +# pylint: disable=too-many-statements def __construct_authenticator(config: dict) -> Authenticator: # Determine the authentication type if not specified explicitly. if config.get('AUTH_TYPE'): @@ -86,7 +89,7 @@ def __construct_authenticator(config: dict) -> Authenticator: url=config.get('AUTH_URL'), client_id=config.get('CLIENT_ID'), client_secret=config.get('CLIENT_SECRET'), - disable_ssl_verification=config.get('AUTH_DISABLE_SSL', 'false').lower() == 'true', + disable_ssl_verification=string_to_bool(config.get('AUTH_DISABLE_SSL', 'false')), scope=config.get('SCOPE'), ) elif auth_type == Authenticator.AUTHTYPE_CP4D.lower(): @@ -95,7 +98,7 @@ def __construct_authenticator(config: dict) -> Authenticator: password=config.get('PASSWORD'), url=config.get('AUTH_URL'), apikey=config.get('APIKEY'), - disable_ssl_verification=config.get('AUTH_DISABLE_SSL', 'false').lower() == 'true', + disable_ssl_verification=string_to_bool(config.get('AUTH_DISABLE_SSL', 'false')), ) elif auth_type == Authenticator.AUTHTYPE_IAM.lower() and config.get('APIKEY'): authenticator = IAMAuthenticator( @@ -103,7 +106,7 @@ def __construct_authenticator(config: dict) -> Authenticator: url=config.get('AUTH_URL'), client_id=config.get('CLIENT_ID'), client_secret=config.get('CLIENT_SECRET'), - disable_ssl_verification=config.get('AUTH_DISABLE_SSL', 'false').lower() == 'true', + disable_ssl_verification=string_to_bool(config.get('AUTH_DISABLE_SSL', 'false')), scope=config.get('SCOPE'), ) elif auth_type == Authenticator.AUTHTYPE_IAM_ASSUME.lower(): @@ -116,7 +119,7 @@ def __construct_authenticator(config: dict) -> Authenticator: url=config.get('AUTH_URL'), client_id=config.get('CLIENT_ID'), client_secret=config.get('CLIENT_SECRET'), - disable_ssl_verification=config.get('AUTH_DISABLE_SSL', 'false').lower() == 'true', + disable_ssl_verification=string_to_bool(config.get('AUTH_DISABLE_SSL', 'false')), scope=config.get('SCOPE'), ) elif auth_type == Authenticator.AUTHTYPE_VPC.lower(): @@ -130,6 +133,48 @@ def __construct_authenticator(config: dict) -> Authenticator: apikey=config.get('APIKEY'), url=config.get('AUTH_URL'), ) + elif auth_type == Authenticator.AUTHTYPE_MCSPV2.lower(): + # Required arguments. + apikey = config.get('APIKEY') + url = config.get('AUTH_URL') + scope_collection_type = config.get('SCOPE_COLLECTION_TYPE') + scope_id = config.get('SCOPE_ID') + + # Optional arguments. + optional_args = {} + str_value = config.get("INCLUDE_BUILTIN_ACTIONS") + if str_value is not None: + optional_args['include_builtin_actions'] = string_to_bool(str_value) + + str_value = config.get("INCLUDE_CUSTOM_ACTIONS") + if str_value is not None: + optional_args['include_custom_actions'] = string_to_bool(str_value) + + str_value = config.get("INCLUDE_ROLES") + if str_value is not None: + optional_args['include_roles'] = string_to_bool(str_value) + + str_value = config.get("PREFIX_ROLES") + if str_value is not None: + optional_args['prefix_roles'] = string_to_bool(str_value) + + str_value = config.get("CALLER_EXT_CLAIM") + if str_value is not None: + try: + optional_args['caller_ext_claim'] = json.loads(str_value) + except Exception as caused_by: + msg = 'An error occurred while unmarshalling the CALLER_EXT_CLAIM configuration property: {0}'.format( + str_value + ) + raise ValueError(msg) from caused_by + + str_value = config.get("AUTH_DISABLE_SSL") + if str_value is not None: + optional_args['disable_ssl_verification'] = string_to_bool(str_value) + + authenticator = MCSPV2Authenticator( + apikey=apikey, url=url, scope_collection_type=scope_collection_type, scope_id=scope_id, **optional_args + ) elif auth_type == Authenticator.AUTHTYPE_NOAUTH.lower(): authenticator = NoAuthAuthenticator() diff --git a/ibm_cloud_sdk_core/token_managers/mcspv2_token_manager.py b/ibm_cloud_sdk_core/token_managers/mcspv2_token_manager.py new file mode 100644 index 00000000..2904cbfb --- /dev/null +++ b/ibm_cloud_sdk_core/token_managers/mcspv2_token_manager.py @@ -0,0 +1,185 @@ +# coding: utf-8 + +# Copyright 2025. IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from typing import Dict, List, Optional +import requests + +from ibm_cloud_sdk_core.logger import get_logger +from ..private_helpers import _build_user_agent +from .jwt_token_manager import JWTTokenManager + +logger = get_logger() + + +class MCSPV2TokenManager(JWTTokenManager): + """The MCSPV2TokenManager invokes the MCSP v2 token-exchange operation + (POST /api/2.0/{scopeCollectionType}/{scopeId}/apikeys/token) to obtain an access token + for an apikey. When the access token expires, a new access token is obtained from the token server. + + Keyword Arguments: + apikey: The apikey for authentication [required]. + url: The endpoint for JWT token requests [required]. + scope_collection_type: The scope collection type of item(s) [required]. + Valid values are: "accounts", "subscriptions", "services". + scope_id: The scope identifier of item(s) [required]. + include_builtin_actions: A flag to include builtin actions in the "actions" claim in the + MCSP access token (default: False). + include_custom_actions: A flag to include custom actions in the "actions" claim in the + MCSP access token (default: False). + include_roles: A flag to include the "roles" claim in the MCSP access token (default: True). + prefix_roles: A flag to add a prefix with the scope level where the role is defined + in the "roles" claim (default: False). + caller_ext_claim: A dictionary (map) containing keys and values to be injected into + the access token as the "callerExt" claim (default: None). + The keys used in this map must be enabled in the apikey by setting the + "callerExtClaimNames" property when the apikey is created. + disable_ssl_verification: Disable ssl verification. Defaults to False. + headers: Headers to be sent with every service token request. Defaults to None. + proxies: Proxies to use for making request. Defaults to None. + proxies.http (optional): The proxy endpoint to use for HTTP requests. + proxies.https (optional): The proxy endpoint to use for HTTPS requests. + """ + + # pylint: disable=too-many-instance-attributes + + # The name of the response body property that contains the access token. + TOKEN_NAME = 'token' + + # The path associated with the token-exchange operation to be invoked. + OPERATION_PATH = '/api/2.0/{scopeCollectionType}/{scopeId}/apikeys/token' + + # These path parameter names must be kept in sync with the operation path above. + _path_param_names = ['scopeCollectionType', 'scopeId'] + + def __init__( + self, + *, + apikey: str, + url: str, + scope_collection_type: str, + scope_id: str, + include_builtin_actions: bool = False, + include_custom_actions: bool = False, + include_roles: bool = True, + prefix_roles: bool = False, + caller_ext_claim: Optional[Dict[str, str]] = None, + disable_ssl_verification: bool = False, + headers: Optional[Dict[str, str]] = None, + proxies: Optional[Dict[str, str]] = None, + ) -> None: + self.apikey = apikey + self.scope_collection_type = scope_collection_type + self.scope_id = scope_id + self.include_builtin_actions = include_builtin_actions + self.include_custom_actions = include_custom_actions + self.include_roles = include_roles + self.prefix_roles = prefix_roles + self.caller_ext_claim = caller_ext_claim + + self.set_headers(headers) + self.set_proxies(proxies) + + super().__init__(url, disable_ssl_verification=disable_ssl_verification, token_name=self.TOKEN_NAME) + self._set_user_agent(_build_user_agent('mcspv2-authenticator')) + + def set_headers(self, headers: Optional[Dict[str, str]] = None) -> None: + """Headers to be sent with every MCSP token request. + + Args: + headers: The headers to be sent with every MCSP token request (default: None). + """ + if isinstance(headers, (dict, type(None))): + self.headers = headers + else: + raise TypeError('"headers" must be a dictionary or None') + + def set_proxies(self, proxies: Optional[Dict[str, str]] = None) -> None: + """Sets the proxies the token manager will use to communicate with MCSP on behalf of the host. + + Args: + proxies: Proxies to use for making request (default: None). + proxies.http (optional): The proxy endpoint to use for HTTP requests. + proxies.https (optional): The proxy endpoint to use for HTTPS requests. + """ + if isinstance(proxies, (dict, type(None))): + self.proxies = proxies + else: + raise TypeError('"proxies" must be a dictionary or None') + + def request_token(self) -> dict: + """Invokes the "POST /api/2.0/{scopeCollectionType}/{scopeId}/apikeys/token" operation + to obtain an access token.""" + + # These headers take priority over user-supplied headers. + required_headers = { + 'User-Agent': self.user_agent, + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + + # Set up the request headers. + request_headers = {} + if self.headers is not None and isinstance(self.headers, dict): + request_headers.update(self.headers) + request_headers.update(required_headers) + + # Compute the request URL. + path_param_values = self._encode_path_vars(self.scope_collection_type, self.scope_id) + path_params = dict(zip(self._path_param_names, path_param_values)) + request_url = self.url + self.OPERATION_PATH.format(**path_params) + + # Create the request body (apikey, callerExtClaim properties). + request_body = {} + request_body['apikey'] = self.apikey + if self.caller_ext_claim is not None and isinstance(self.caller_ext_claim, dict): + request_body['callerExtClaim'] = self.caller_ext_claim + + # Set up the query params. + query_params = { + 'includeBuiltinActions': self.bool_to_string(self.include_builtin_actions), + 'includeCustomActions': self.bool_to_string(self.include_custom_actions), + 'includeRoles': self.bool_to_string(self.include_roles), + 'prefixRolesWithDefinitionScope': self.bool_to_string(self.prefix_roles), + } + + logger.debug('Invoking MCSP v2 token service operation: %s', request_url) + + response = self._request( + method='POST', + headers=request_headers, + url=request_url, + params=query_params, + data=json.dumps(request_body), + proxies=self.proxies, + ) + logger.debug('Returned from MCSP v2 token service operation') + return response + + def _encode_path_vars(self, *args: str) -> List[str]: + """Encode path variables to be substituted into a URL path. + + Arguments: + args: A list of strings to be URL path encoded + + Returns: + A list of encoded strings that are safe to substitute into a URL path. + """ + return (requests.utils.quote(x, safe='') for x in args) + + def bool_to_string(self, value: bool) -> str: + """Convert a boolean value to string.""" + return 'true' if value else 'false' diff --git a/ibm_cloud_sdk_core/utils.py b/ibm_cloud_sdk_core/utils.py index 6a896534..427de6ec 100644 --- a/ibm_cloud_sdk_core/utils.py +++ b/ibm_cloud_sdk_core/utils.py @@ -1,6 +1,6 @@ # coding: utf-8 -# Copyright 2019, 2024 IBM All Rights Reserved. +# Copyright 2019, 2025 IBM All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -273,6 +273,18 @@ def string_to_date(string: str) -> datetime.date: return date_parser.parse(string).date() +def string_to_bool(string: str) -> bool: + """Converts 'string' to a bool value. + Args: + string: the value to convert. This should be some form of "true" or "false" + (e.g. "True", "TrUE", "False", "FaLsE", etc.) + + Returns: + the boolean value + """ + return string.strip().lower() == 'true' + + def get_query_param(url_str: str, param: str) -> str: """Return a query parameter value from url_str diff --git a/resources/ibm-credentials-mcspv2.env b/resources/ibm-credentials-mcspv2.env new file mode 100644 index 00000000..d8b350ce --- /dev/null +++ b/resources/ibm-credentials-mcspv2.env @@ -0,0 +1,27 @@ +# MCSP v2 with only required properties +SERVICE1_AUTH_TYPE=mcspv2 +SERVICE1_APIKEY=my-api-key +SERVICE1_AUTH_URL=https://mcspv2.ibm.com +SERVICE1_SCOPE_COLLECTION_TYPE=accounts +SERVICE1_SCOPE_ID=global_account + +# MCSP v2 with all properties +SERVICE2_AUTH_TYPE=mcspv2 +SERVICE2_APIKEY=my-api-key +SERVICE2_AUTH_URL=https://mcspv2.ibm.com +SERVICE2_SCOPE_COLLECTION_TYPE=accounts +SERVICE2_SCOPE_ID=global_account +SERVICE2_INCLUDE_BUILTIN_ACTIONS=true +SERVICE2_INCLUDE_CUSTOM_ACTIONS=true +SERVICE2_INCLUDE_ROLES=false +SERVICE2_PREFIX_ROLES=true +SERVICE2_CALLER_EXT_CLAIM={"productID": "prod-123"} +SERVICE2_AUTH_DISABLE_SSL=true + +# MCSP v with error config +ERROR1_AUTH_TYPE=mcspv2 +ERROR1_APIKEY=my-api-key +ERROR1_AUTH_URL=https://mcspv2.ibm.com +ERROR1_SCOPE_COLLECTION_TYPE=accounts +ERROR1_SCOPE_ID=global_account +ERROR1_CALLER_EXT_CLAIM={not json} diff --git a/test/test_get_authenticator.py b/test/test_get_authenticator.py new file mode 100644 index 00000000..e15f5100 --- /dev/null +++ b/test/test_get_authenticator.py @@ -0,0 +1,414 @@ +# pylint: disable=missing-docstring +# coding: utf-8 + +# Copyright 2019, 2024 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import pytest + +from ibm_cloud_sdk_core import get_authenticator_from_environment +from ibm_cloud_sdk_core.authenticators import Authenticator, BasicAuthenticator, IAMAuthenticator +from .utils.logger_utils import setup_test_logger + +setup_test_logger(logging.ERROR) + + +# pylint: disable=too-many-statements +def test_get_authenticator_from_credential_file(): + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('ibm watson') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM + assert authenticator.token_manager.apikey == '5678efgh' + assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com' + assert authenticator.token_manager.client_id is None + assert authenticator.token_manager.client_secret is None + assert authenticator.token_manager.disable_ssl_verification is False + assert authenticator.token_manager.scope is None + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam-assume.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service 1') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM_ASSUME + assert authenticator.token_manager.iam_delegate.apikey == 'my-api-key' + assert authenticator.token_manager.iam_profile_id == 'iam-profile-1' + assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com' + assert authenticator.token_manager.client_id is None + assert authenticator.token_manager.client_secret is None + assert authenticator.token_manager.disable_ssl_verification is False + assert authenticator.token_manager.scope is None + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-basic.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('watson') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_BASIC + assert authenticator.username == 'my_username' + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-container.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service 1') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CONTAINER + assert authenticator.token_manager.cr_token_filename == 'crtoken.txt' + assert authenticator.token_manager.iam_profile_name == 'iam-user-123' + assert authenticator.token_manager.iam_profile_id == 'iam-id-123' + assert authenticator.token_manager.url == 'https://iamhost/iam/api' + assert authenticator.token_manager.scope == 'scope1' + assert authenticator.token_manager.client_id == 'iam-client-123' + assert authenticator.token_manager.client_secret == 'iam-secret-123' + assert authenticator.token_manager.disable_ssl_verification is True + + authenticator = get_authenticator_from_environment('service 2') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CONTAINER + assert authenticator.token_manager.cr_token_filename is None + assert authenticator.token_manager.iam_profile_name == 'iam-user-123' + assert authenticator.token_manager.iam_profile_id is None + assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com' + assert authenticator.token_manager.scope is None + assert authenticator.token_manager.client_id is None + assert authenticator.token_manager.client_secret is None + assert authenticator.token_manager.disable_ssl_verification is False + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-cp4d.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('watson') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CP4D + assert authenticator.token_manager.username == 'my_username' + assert authenticator.token_manager.password == 'my_password' + assert authenticator.token_manager.url == 'https://my_url/v1/authorize' + assert authenticator.token_manager.apikey is None + assert authenticator.token_manager.disable_ssl_verification is False + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-no-auth.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('watson') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_NOAUTH + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-bearer.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('watson') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_BEARERTOKEN + assert authenticator.bearer_token is not None + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service_1') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM + assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' + assert authenticator.token_manager.client_id == 'somefake========id' + assert authenticator.token_manager.client_secret == '==my-client-secret==' + assert authenticator.token_manager.url == 'https://iamhost/iam/api=' + assert authenticator.token_manager.scope is None + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service1') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_VPC + assert authenticator.token_manager.iam_profile_crn is None + assert authenticator.token_manager.iam_profile_id is None + assert authenticator.token_manager.url == 'http://169.254.169.254' + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service2') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_VPC + assert authenticator.token_manager.iam_profile_crn == 'crn:iam-profile1' + assert authenticator.token_manager.iam_profile_id is None + assert authenticator.token_manager.url == 'http://vpc.imds.com/api' + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service3') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_VPC + assert authenticator.token_manager.iam_profile_crn is None + assert authenticator.token_manager.iam_profile_id == 'iam-profile1-id' + assert authenticator.token_manager.url == 'http://169.254.169.254' + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-mcsp.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service1') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_MCSP + assert authenticator.token_manager.url == 'https://mcsp.ibm.com' + assert authenticator.token_manager.apikey == 'my-api-key' + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-mcspv2.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service1') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_MCSPV2 + assert authenticator.token_manager.url == 'https://mcspv2.ibm.com' + assert authenticator.token_manager.apikey == 'my-api-key' + assert authenticator.token_manager.scope_collection_type == 'accounts' + assert authenticator.token_manager.scope_id == 'global_account' + assert authenticator.token_manager.include_builtin_actions is False + assert authenticator.token_manager.include_custom_actions is False + assert authenticator.token_manager.include_roles is True + assert authenticator.token_manager.prefix_roles is False + assert authenticator.token_manager.caller_ext_claim is None + assert authenticator.token_manager.disable_ssl_verification is False + assert authenticator.token_manager.headers is None + assert authenticator.token_manager.proxies is None + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-mcspv2.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service2') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_MCSPV2 + assert authenticator.token_manager.url == 'https://mcspv2.ibm.com' + assert authenticator.token_manager.apikey == 'my-api-key' + assert authenticator.token_manager.scope_collection_type == 'accounts' + assert authenticator.token_manager.scope_id == 'global_account' + assert authenticator.token_manager.include_builtin_actions is True + assert authenticator.token_manager.include_custom_actions is True + assert authenticator.token_manager.include_roles is False + assert authenticator.token_manager.prefix_roles is True + assert authenticator.token_manager.caller_ext_claim == {"productID": "prod-123"} + assert authenticator.token_manager.disable_ssl_verification is True + assert authenticator.token_manager.headers is None + assert authenticator.token_manager.proxies is None + del os.environ['IBM_CREDENTIALS_FILE'] + + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-mcspv2.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + with pytest.raises(Exception) as err: + authenticator = get_authenticator_from_environment('error1') + assert ( + str(err.value) + == 'An error occurred while unmarshalling the CALLER_EXT_CLAIM configuration property: {not json}' + ) + del os.environ['IBM_CREDENTIALS_FILE'] + + +def test_get_authenticator_from_credential_file_scope(): + file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') + os.environ['IBM_CREDENTIALS_FILE'] = file_path + authenticator = get_authenticator_from_environment('service_2') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM + assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' + assert authenticator.token_manager.client_id == 'somefake========id' + assert authenticator.token_manager.client_secret == '==my-client-secret==' + assert authenticator.token_manager.url == 'https://iamhost/iam/api=' + assert authenticator.token_manager.scope == 'A B C D' + del os.environ['IBM_CREDENTIALS_FILE'] + + +def test_get_authenticator_from_env_variables(): + os.environ['TEST_APIKEY'] = '5678efgh' + authenticator = get_authenticator_from_environment('test') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM + assert authenticator.token_manager.apikey == '5678efgh' + del os.environ['TEST_APIKEY'] + + os.environ['TEST_IAM_PROFILE_ID'] = 'iam-profile-id1' + authenticator = get_authenticator_from_environment('test') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CONTAINER + assert authenticator.token_manager.iam_profile_id == 'iam-profile-id1' + del os.environ['TEST_IAM_PROFILE_ID'] + + os.environ['SERVICE_1_APIKEY'] = 'V4HXmoUtMjohnsnow=KotN' + authenticator = get_authenticator_from_environment('service_1') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM + assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' + del os.environ['SERVICE_1_APIKEY'] + + os.environ['SERVICE_2_APIKEY'] = 'johnsnow' + os.environ['SERVICE_2_SCOPE'] = 'A B C D' + authenticator = get_authenticator_from_environment('service_2') + assert authenticator is not None + assert authenticator.token_manager.apikey == 'johnsnow' + assert authenticator.token_manager.scope == 'A B C D' + del os.environ['SERVICE_2_APIKEY'] + del os.environ['SERVICE_2_SCOPE'] + + os.environ['SERVICE3_AUTH_TYPE'] = 'mCsP' + os.environ['SERVICE3_AUTH_URL'] = 'https://mcsp.ibm.com' + os.environ['SERVICE3_APIKEY'] = 'my-api-key' + authenticator = get_authenticator_from_environment('service3') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_MCSP + assert authenticator.token_manager.apikey == 'my-api-key' + assert authenticator.token_manager.url == 'https://mcsp.ibm.com' + del os.environ['SERVICE3_APIKEY'] + del os.environ['SERVICE3_AUTH_TYPE'] + del os.environ['SERVICE3_AUTH_URL'] + + +def test_vcap_credentials(): + vcap_services = '{"test":[{"credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "username":"bogus username", \ + "password":"bogus password"}}]}' + + os.environ['VCAP_SERVICES'] = vcap_services + authenticator = get_authenticator_from_environment('test') + assert isinstance(authenticator, BasicAuthenticator) + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_BASIC + assert authenticator.username == 'bogus username' + assert authenticator.password == 'bogus password' + del os.environ['VCAP_SERVICES'] + + vcap_services = '{"test":[{"credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "apikey":"bogus apikey"}}]}' + + os.environ['VCAP_SERVICES'] = vcap_services + authenticator = get_authenticator_from_environment('test') + assert isinstance(authenticator, IAMAuthenticator) + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM + assert authenticator.token_manager.apikey == 'bogus apikey' + del os.environ['VCAP_SERVICES'] + + vcap_services = '{"test":[{"credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "iam_apikey":"bogus apikey"}}]}' + + os.environ['VCAP_SERVICES'] = vcap_services + authenticator = get_authenticator_from_environment('test') + assert isinstance(authenticator, IAMAuthenticator) + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM + assert authenticator.token_manager.apikey == 'bogus apikey' + del os.environ['VCAP_SERVICES'] + + vcap_services = '{"test":[{"name": "testname",\ + "credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "username":"bogus username", \ + "password":"bogus password"}}]}' + + os.environ['VCAP_SERVICES'] = vcap_services + authenticator = get_authenticator_from_environment('testname') + assert isinstance(authenticator, BasicAuthenticator) + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_BASIC + assert authenticator.username == 'bogus username' + assert authenticator.password == 'bogus password' + del os.environ['VCAP_SERVICES'] + + +def test_vcap_credentials_2(): + vcap_services = '{\ + "test":[{"name": "testname",\ + "credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "username":"bogus username2", \ + "password":"bogus password2"}},\ + {"name": "othertestname",\ + "credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "username":"bogus username3", \ + "password":"bogus password3"}}],\ + "testname":[{"name": "nottestname",\ + "credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "username":"bogus username", \ + "password":"bogus password"}}],\ + "equals_sign_test":[{"name": "equals_sign_test",\ + "credentials":{ \ + "iam_apikey": "V4HXmoUtMjohnsnow=KotN",\ + "iam_apikey_description": "Auto generated apikey...",\ + "iam_apikey_name": "auto-generated-apikey-111-222-333",\ + "iam_role_crn": "crn:v1:bluemix:public:iam::::serviceRole:Manager",\ + "iam_serviceid_crn": "crn:v1:staging:public:iam-identity::a/::serviceid:ServiceID-1234",\ + "url": "https://gateway.watsonplatform.net/testService",\ + "auth_url": "https://iamhost/iam/api="}}]}' + + os.environ['VCAP_SERVICES'] = vcap_services + authenticator = get_authenticator_from_environment('testname') + assert isinstance(authenticator, BasicAuthenticator) + assert authenticator.username == 'bogus username2' + assert authenticator.password == 'bogus password2' + + authenticator = get_authenticator_from_environment('equals_sign_test') + assert isinstance(authenticator, IAMAuthenticator) + assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' + del os.environ['VCAP_SERVICES'] + + vcap_services = '{"test":[{\ + "credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "username":"bogus username", \ + "password":"bogus password"}},\ + {"credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "username":"bogus username2", \ + "password":"bogus password2"}}\ + ]}' + + os.environ['VCAP_SERVICES'] = vcap_services + authenticator = get_authenticator_from_environment('test') + assert isinstance(authenticator, BasicAuthenticator) + assert authenticator.username == 'bogus username' + assert authenticator.password == 'bogus password' + del os.environ['VCAP_SERVICES'] + + vcap_services = '{"first":[],\ + "test":[{"credentials":{ \ + "url":"https://gateway.watsonplatform.net/compare-comply/api",\ + "username":"bogus username", \ + "password":"bogus password"}}],\ + "last":[]}' + + os.environ['VCAP_SERVICES'] = vcap_services + authenticator = get_authenticator_from_environment('test') + assert isinstance(authenticator, BasicAuthenticator) + assert authenticator.username == 'bogus username' + assert authenticator.password == 'bogus password' + del os.environ['VCAP_SERVICES'] + + vcap_services = '{"test":[],\ + "last":[]}' + + os.environ['VCAP_SERVICES'] = vcap_services + authenticator = get_authenticator_from_environment('test') + assert authenticator is None + del os.environ['VCAP_SERVICES'] + + +def test_multi_word_service_name(): + os.environ['PERSONALITY_INSIGHTS_APIKEY'] = '5678efgh' + authenticator = get_authenticator_from_environment('personality-insights') + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM + assert authenticator.token_manager.apikey == '5678efgh' + del os.environ['PERSONALITY_INSIGHTS_APIKEY'] diff --git a/test/test_mcspv2_authenticator.py b/test/test_mcspv2_authenticator.py new file mode 100644 index 00000000..be42f4f5 --- /dev/null +++ b/test/test_mcspv2_authenticator.py @@ -0,0 +1,409 @@ +# pylint: disable=missing-docstring +import logging +import json +import time +import jwt +import pytest +import responses + +from ibm_cloud_sdk_core.authenticators import MCSPV2Authenticator, Authenticator +from ibm_cloud_sdk_core import MCSPV2TokenManager +from .utils.logger_utils import setup_test_logger + +setup_test_logger(logging.ERROR) + +MOCK_APIKEY = 'my-api-key' +MOCK_URL = 'https://mcspv2.ibm.com' +MOCK_SCOPE_COLLECTION_TYPE = 'accounts' +MOCK_SCOPE_ID = 'global_account' +MOCK_CALLER_EXT_CLAIM = {"productID": "prod-123"} +MOCK_HEADERS = {"header1": "value1", "header2": "value2"} +MOCK_PROXIES = {"https": "proxy1", "http": "proxy2"} +MOCK_PATH = '/api/2.0/{0}/{1}/apikeys/token'.format(MOCK_SCOPE_COLLECTION_TYPE, MOCK_SCOPE_ID) + + +# pylint: disable=too-many-statements +def test_mcspv2_authenticator1(): + # Use only required properties. + authenticator = MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + ) + assert authenticator is not None + assert authenticator.authentication_type() == Authenticator.AUTHTYPE_MCSPV2 + assert authenticator.token_manager.apikey == MOCK_APIKEY + assert authenticator.token_manager.url == MOCK_URL + assert authenticator.token_manager.scope_collection_type == MOCK_SCOPE_COLLECTION_TYPE + assert authenticator.token_manager.scope_id == MOCK_SCOPE_ID + assert authenticator.token_manager.include_builtin_actions is False + assert authenticator.token_manager.include_custom_actions is False + assert authenticator.token_manager.include_roles is True + assert authenticator.token_manager.prefix_roles is False + assert authenticator.token_manager.caller_ext_claim is None + assert authenticator.token_manager.disable_ssl_verification is False + assert authenticator.token_manager.headers is None + assert authenticator.token_manager.proxies is None + + # Test setter functions. + authenticator.set_scope_collection_type("subscriptions") + assert authenticator.token_manager.scope_collection_type == "subscriptions" + + with pytest.raises(TypeError) as err: + authenticator.set_scope_collection_type(None) + assert str(err.value) == '"scope_collection_type" must be a string' + + authenticator.set_scope_id("new_id") + assert authenticator.token_manager.scope_id == "new_id" + + with pytest.raises(TypeError) as err: + authenticator.set_scope_id(None) + assert str(err.value) == '"scope_id" must be a string' + + authenticator.set_include_builtin_actions(True) + assert authenticator.token_manager.include_builtin_actions is True + + with pytest.raises(TypeError) as err: + authenticator.set_include_builtin_actions('True') + assert str(err.value) == '"include_builtin_actions" must be a bool' + + authenticator.set_include_custom_actions(True) + assert authenticator.token_manager.include_custom_actions is True + + with pytest.raises(TypeError) as err: + authenticator.set_include_custom_actions('not a bool') + assert str(err.value) == '"include_custom_actions" must be a bool' + + authenticator.set_include_roles(True) + assert authenticator.token_manager.include_roles is True + + with pytest.raises(TypeError) as err: + authenticator.set_include_roles('nope') + assert str(err.value) == '"include_roles" must be a bool' + + authenticator.set_prefix_roles(True) + assert authenticator.token_manager.prefix_roles is True + + with pytest.raises(TypeError) as err: + authenticator.set_prefix_roles('maybe') + assert str(err.value) == '"prefix_roles" must be a bool' + + authenticator.set_caller_ext_claim(MOCK_CALLER_EXT_CLAIM) + assert authenticator.token_manager.caller_ext_claim == MOCK_CALLER_EXT_CLAIM + + with pytest.raises(TypeError) as err: + authenticator.set_caller_ext_claim('not a dictionary') + assert str(err.value) == '"caller_ext_claim" must be a dictionary or None' + + authenticator.set_disable_ssl_verification(True) + assert authenticator.token_manager.disable_ssl_verification is True + + with pytest.raises(TypeError) as err: + authenticator.set_disable_ssl_verification('not a bool') + assert str(err.value) == '"disable_ssl_verification" must be a bool' + + authenticator.set_headers(MOCK_HEADERS) + assert authenticator.token_manager.headers == MOCK_HEADERS + + with pytest.raises(TypeError) as err: + authenticator.set_headers('not a dictionary') + assert str(err.value) == '"headers" must be a dictionary or None' + + authenticator.set_proxies(MOCK_PROXIES) + assert authenticator.token_manager.proxies == MOCK_PROXIES + + with pytest.raises(TypeError) as err: + authenticator.set_proxies('not a dictionary') + assert str(err.value) == '"proxies" must be a dictionary or None' + + +def test_mcspv2_authenticator2(): + # Test with all properties. + authenticator = MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + include_builtin_actions=True, + include_custom_actions=True, + include_roles=False, + prefix_roles=True, + caller_ext_claim=MOCK_CALLER_EXT_CLAIM, + disable_ssl_verification=True, + headers=MOCK_HEADERS, + proxies=MOCK_PROXIES, + ) + assert authenticator.token_manager.apikey == MOCK_APIKEY + assert authenticator.token_manager.url == MOCK_URL + assert authenticator.token_manager.scope_collection_type == MOCK_SCOPE_COLLECTION_TYPE + assert authenticator.token_manager.scope_id == MOCK_SCOPE_ID + assert authenticator.token_manager.include_builtin_actions is True + assert authenticator.token_manager.include_custom_actions is True + assert authenticator.token_manager.include_roles is False + assert authenticator.token_manager.prefix_roles is True + assert authenticator.token_manager.caller_ext_claim == MOCK_CALLER_EXT_CLAIM + assert authenticator.token_manager.disable_ssl_verification is True + assert authenticator.token_manager.headers == MOCK_HEADERS + assert authenticator.token_manager.proxies == MOCK_PROXIES + + +def test_mcsp_authenticator_validate_failed(): + + # Check each property individually. + with pytest.raises(TypeError) as err: + MCSPV2Authenticator( + apikey=None, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + ) + assert str(err.value) == '"apikey" must be a string' + + with pytest.raises(TypeError) as err: + MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=None, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + ) + assert str(err.value) == '"url" must be a string' + + with pytest.raises(TypeError) as err: + MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=None, + scope_id=MOCK_SCOPE_ID, + ) + assert str(err.value) == '"scope_collection_type" must be a string' + + with pytest.raises(TypeError) as err: + MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=None, + ) + assert str(err.value) == '"scope_id" must be a string' + + with pytest.raises(TypeError) as err: + MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + include_builtin_actions='not a bool', + ) + assert str(err.value) == '"include_builtin_actions" must be a bool' + + with pytest.raises(TypeError) as err: + MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + include_custom_actions=None, + ) + assert str(err.value) == '"include_custom_actions" must be a bool' + + with pytest.raises(TypeError) as err: + MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + include_roles=382636, + ) + assert str(err.value) == '"include_roles" must be a bool' + + with pytest.raises(TypeError) as err: + MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + prefix_roles=None, + ) + assert str(err.value) == '"prefix_roles" must be a bool' + + +# utility function to construct a mock token server response containing an access token. +def get_mock_token_response(issued_at: int, time_to_live: int) -> str: + access_token_layout = { + "username": "dummy", + "role": "Admin", + "permissions": ["administrator", "manage_catalog"], + "sub": "admin", + "iss": "sss", + "aud": "sss", + "uid": "sss", + "iat": issued_at, + "exp": issued_at + time_to_live, + } + + access_token = jwt.encode( + access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'} + ) + + token_server_response = { + "token": access_token, + "token_type": "Bearer", + "expires_in": time_to_live, + "expiration": issued_at + time_to_live, + } + + # For convenience, return both the server response and the access_token. + return (json.dumps(token_server_response), access_token) + + +@responses.activate +def test_get_token(): + (response, access_token) = get_mock_token_response(int(time.time()), 7200) + responses.add(responses.POST, MOCK_URL + MOCK_PATH, body=response, status=200) + + auth_headers = {'Host': 'mcsp.cloud.ibm.com:443'} + authenticator = MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + headers=auth_headers, + ) + + # Authenticate the request and verify the Authorization header. + request = {'headers': {}} + authenticator.authenticate(request) + assert request['headers']['Authorization'] == 'Bearer ' + access_token + + # Verify that the "get token" request contained the Host header. + assert responses.calls[0].request.headers.get('Host') == 'mcsp.cloud.ibm.com:443' + + +@responses.activate +def test_get_token_cached(): + (response, access_token) = get_mock_token_response(int(time.time()), 7200) + responses.add(responses.POST, MOCK_URL + MOCK_PATH, body=response, status=200) + + authenticator = MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + ) + + # Authenticate the request and verify the Authorization header. + request = {'headers': {}} + authenticator.authenticate(request) + assert request['headers']['Authorization'] == 'Bearer ' + access_token + + # Authenticate a second request and verify that we used the same access token. + request = {'headers': {}} + authenticator.authenticate(request) + assert request['headers']['Authorization'] == 'Bearer ' + access_token + + +@responses.activate +def test_get_token_background_refresh(): + t1 = time.time() + t2 = t1 + 7200 + + # Setup the first token response. + (response1, access_token1) = get_mock_token_response(int(t1), 7200) + responses.add(responses.POST, MOCK_URL + MOCK_PATH, body=response1, status=200) + + # Setup the second token response. + (response2, access_token2) = get_mock_token_response(int(t2), 7200) + responses.add(responses.POST, MOCK_URL + MOCK_PATH, body=response2, status=200) + + authenticator = MCSPV2Authenticator( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + ) + + # Authenticate the request and verify that the first access_token is used. + request = {'headers': {}} + authenticator.authenticate(request) + assert request['headers']['Authorization'] == 'Bearer ' + access_token1 + + # Now put the token manager in the refresh window to trigger a background refresh scenario. + authenticator.token_manager.refresh_time = t1 - 1 + + # Authenticate a second request and verify that the correct access token is used. + # Note: Ideally, the token manager would trigger the refresh in a separate thread + # and it "should" return the first access token for this second authentication request + # while the token manager is obtaining a new access token. + # Unfortunately, the TokenManager class method does the refresh request synchronously, + # so we get back the second access token here instead. + # If we "fix" the TokenManager class to refresh asynchronously, we'll need to + # change this test case to expect the first access token here. + request = {'headers': {}} + authenticator.authenticate(request) + assert request['headers']['Authorization'] == 'Bearer ' + access_token2 + + # Wait for the background refresh to finish. + # No need to wait due to the synchronous logic in the TokenManager class mentioned above. + # time.sleep(2) + + # Authenticate another request and verify that the second access token is used again. + request = {'headers': {}} + authenticator.authenticate(request) + assert request['headers']['Authorization'] == 'Bearer ' + access_token2 + + +@responses.activate +def test_request_token(): + (response, access_token) = get_mock_token_response(time.time(), 30) + responses.add(responses.POST, MOCK_URL + MOCK_PATH, body=response, status=200) + + token_manager = MCSPV2TokenManager( + apikey=MOCK_APIKEY, + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + disable_ssl_verification=True, + ) + token = token_manager.get_token() + + assert len(responses.calls) == 1 + assert ( + responses.calls[0].request.url + == MOCK_URL + + MOCK_PATH + + '?includeBuiltinActions=false&includeCustomActions=false&' + + 'includeRoles=true&prefixRolesWithDefinitionScope=false' + ) + assert responses.calls[0].request.headers.get('User-Agent').startswith('ibm-python-sdk-core/mcspv2-authenticator') + assert token == access_token + + +@responses.activate +def test_request_token_unsuccessful(): + response = """{ + "errorCode": "BXNIM0415E", + "errorMessage": "Provided API key could not be found" + } + """ + responses.add(responses.POST, url=MOCK_URL + MOCK_PATH, body=response, status=400) + + token_manager = MCSPV2TokenManager( + apikey="bad-api-key", + url=MOCK_URL, + scope_collection_type=MOCK_SCOPE_COLLECTION_TYPE, + scope_id=MOCK_SCOPE_ID, + disable_ssl_verification=True, + ) + with pytest.raises(Exception): + token_manager.request_token() + + assert len(responses.calls) == 1 + assert ( + responses.calls[0].request.url + == MOCK_URL + + MOCK_PATH + + '?includeBuiltinActions=false&includeCustomActions=false&' + + 'includeRoles=true&prefixRolesWithDefinitionScope=false' + ) + assert responses.calls[0].response.text == response diff --git a/test/test_utils.py b/test/test_utils.py index fed34ff1..4eeef51b 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -22,13 +22,12 @@ import pytest -from ibm_cloud_sdk_core import string_to_datetime, datetime_to_string, get_authenticator_from_environment +from ibm_cloud_sdk_core import string_to_datetime, datetime_to_string from ibm_cloud_sdk_core import string_to_datetime_list, datetime_to_string_list from ibm_cloud_sdk_core import string_to_date, date_to_string from ibm_cloud_sdk_core import convert_model, convert_list from ibm_cloud_sdk_core import get_query_param from ibm_cloud_sdk_core import read_external_sources -from ibm_cloud_sdk_core.authenticators import Authenticator, BasicAuthenticator, IAMAuthenticator from ibm_cloud_sdk_core.utils import GzipStream, strip_extra_slashes, is_json_mimetype from .utils.logger_utils import setup_test_logger @@ -271,346 +270,6 @@ def test_convert_list(): assert mock4_str == mock4 -# pylint: disable=too-many-statements -def test_get_authenticator_from_credential_file(): - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('ibm watson') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM - assert authenticator.token_manager.apikey == '5678efgh' - assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com' - assert authenticator.token_manager.client_id is None - assert authenticator.token_manager.client_secret is None - assert authenticator.token_manager.disable_ssl_verification is False - assert authenticator.token_manager.scope is None - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-iam-assume.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('service 1') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM_ASSUME - assert authenticator.token_manager.iam_delegate.apikey == 'my-api-key' - assert authenticator.token_manager.iam_profile_id == 'iam-profile-1' - assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com' - assert authenticator.token_manager.client_id is None - assert authenticator.token_manager.client_secret is None - assert authenticator.token_manager.disable_ssl_verification is False - assert authenticator.token_manager.scope is None - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-basic.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('watson') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_BASIC - assert authenticator.username == 'my_username' - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-container.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('service 1') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CONTAINER - assert authenticator.token_manager.cr_token_filename == 'crtoken.txt' - assert authenticator.token_manager.iam_profile_name == 'iam-user-123' - assert authenticator.token_manager.iam_profile_id == 'iam-id-123' - assert authenticator.token_manager.url == 'https://iamhost/iam/api' - assert authenticator.token_manager.scope == 'scope1' - assert authenticator.token_manager.client_id == 'iam-client-123' - assert authenticator.token_manager.client_secret == 'iam-secret-123' - assert authenticator.token_manager.disable_ssl_verification is True - - authenticator = get_authenticator_from_environment('service 2') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CONTAINER - assert authenticator.token_manager.cr_token_filename is None - assert authenticator.token_manager.iam_profile_name == 'iam-user-123' - assert authenticator.token_manager.iam_profile_id is None - assert authenticator.token_manager.url == 'https://iam.cloud.ibm.com' - assert authenticator.token_manager.scope is None - assert authenticator.token_manager.client_id is None - assert authenticator.token_manager.client_secret is None - assert authenticator.token_manager.disable_ssl_verification is False - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-cp4d.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('watson') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CP4D - assert authenticator.token_manager.username == 'my_username' - assert authenticator.token_manager.password == 'my_password' - assert authenticator.token_manager.url == 'https://my_url/v1/authorize' - assert authenticator.token_manager.apikey is None - assert authenticator.token_manager.disable_ssl_verification is False - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-no-auth.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('watson') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_NOAUTH - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-bearer.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('watson') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_BEARERTOKEN - assert authenticator.bearer_token is not None - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('service_1') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM - assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' - assert authenticator.token_manager.client_id == 'somefake========id' - assert authenticator.token_manager.client_secret == '==my-client-secret==' - assert authenticator.token_manager.url == 'https://iamhost/iam/api=' - assert authenticator.token_manager.scope is None - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('service1') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_VPC - assert authenticator.token_manager.iam_profile_crn is None - assert authenticator.token_manager.iam_profile_id is None - assert authenticator.token_manager.url == 'http://169.254.169.254' - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('service2') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_VPC - assert authenticator.token_manager.iam_profile_crn == 'crn:iam-profile1' - assert authenticator.token_manager.iam_profile_id is None - assert authenticator.token_manager.url == 'http://vpc.imds.com/api' - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-vpc.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('service3') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_VPC - assert authenticator.token_manager.iam_profile_crn is None - assert authenticator.token_manager.iam_profile_id == 'iam-profile1-id' - assert authenticator.token_manager.url == 'http://169.254.169.254' - del os.environ['IBM_CREDENTIALS_FILE'] - - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-mcsp.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('service1') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_MCSP - assert authenticator.token_manager.url == 'https://mcsp.ibm.com' - assert authenticator.token_manager.apikey == 'my-api-key' - del os.environ['IBM_CREDENTIALS_FILE'] - - -def test_get_authenticator_from_credential_file_scope(): - file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials.env') - os.environ['IBM_CREDENTIALS_FILE'] = file_path - authenticator = get_authenticator_from_environment('service_2') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM - assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' - assert authenticator.token_manager.client_id == 'somefake========id' - assert authenticator.token_manager.client_secret == '==my-client-secret==' - assert authenticator.token_manager.url == 'https://iamhost/iam/api=' - assert authenticator.token_manager.scope == 'A B C D' - del os.environ['IBM_CREDENTIALS_FILE'] - - -def test_get_authenticator_from_env_variables(): - os.environ['TEST_APIKEY'] = '5678efgh' - authenticator = get_authenticator_from_environment('test') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM - assert authenticator.token_manager.apikey == '5678efgh' - del os.environ['TEST_APIKEY'] - - os.environ['TEST_IAM_PROFILE_ID'] = 'iam-profile-id1' - authenticator = get_authenticator_from_environment('test') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_CONTAINER - assert authenticator.token_manager.iam_profile_id == 'iam-profile-id1' - del os.environ['TEST_IAM_PROFILE_ID'] - - os.environ['SERVICE_1_APIKEY'] = 'V4HXmoUtMjohnsnow=KotN' - authenticator = get_authenticator_from_environment('service_1') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM - assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' - del os.environ['SERVICE_1_APIKEY'] - - os.environ['SERVICE_2_APIKEY'] = 'johnsnow' - os.environ['SERVICE_2_SCOPE'] = 'A B C D' - authenticator = get_authenticator_from_environment('service_2') - assert authenticator is not None - assert authenticator.token_manager.apikey == 'johnsnow' - assert authenticator.token_manager.scope == 'A B C D' - del os.environ['SERVICE_2_APIKEY'] - del os.environ['SERVICE_2_SCOPE'] - - os.environ['SERVICE3_AUTH_TYPE'] = 'mCsP' - os.environ['SERVICE3_AUTH_URL'] = 'https://mcsp.ibm.com' - os.environ['SERVICE3_APIKEY'] = 'my-api-key' - authenticator = get_authenticator_from_environment('service3') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_MCSP - assert authenticator.token_manager.apikey == 'my-api-key' - assert authenticator.token_manager.url == 'https://mcsp.ibm.com' - del os.environ['SERVICE3_APIKEY'] - del os.environ['SERVICE3_AUTH_TYPE'] - del os.environ['SERVICE3_AUTH_URL'] - - -def test_vcap_credentials(): - vcap_services = '{"test":[{"credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "username":"bogus username", \ - "password":"bogus password"}}]}' - - os.environ['VCAP_SERVICES'] = vcap_services - authenticator = get_authenticator_from_environment('test') - assert isinstance(authenticator, BasicAuthenticator) - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_BASIC - assert authenticator.username == 'bogus username' - assert authenticator.password == 'bogus password' - del os.environ['VCAP_SERVICES'] - - vcap_services = '{"test":[{"credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "apikey":"bogus apikey"}}]}' - - os.environ['VCAP_SERVICES'] = vcap_services - authenticator = get_authenticator_from_environment('test') - assert isinstance(authenticator, IAMAuthenticator) - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM - assert authenticator.token_manager.apikey == 'bogus apikey' - del os.environ['VCAP_SERVICES'] - - vcap_services = '{"test":[{"credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "iam_apikey":"bogus apikey"}}]}' - - os.environ['VCAP_SERVICES'] = vcap_services - authenticator = get_authenticator_from_environment('test') - assert isinstance(authenticator, IAMAuthenticator) - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM - assert authenticator.token_manager.apikey == 'bogus apikey' - del os.environ['VCAP_SERVICES'] - - vcap_services = '{"test":[{"name": "testname",\ - "credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "username":"bogus username", \ - "password":"bogus password"}}]}' - - os.environ['VCAP_SERVICES'] = vcap_services - authenticator = get_authenticator_from_environment('testname') - assert isinstance(authenticator, BasicAuthenticator) - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_BASIC - assert authenticator.username == 'bogus username' - assert authenticator.password == 'bogus password' - del os.environ['VCAP_SERVICES'] - - -def test_vcap_credentials_2(): - vcap_services = '{\ - "test":[{"name": "testname",\ - "credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "username":"bogus username2", \ - "password":"bogus password2"}},\ - {"name": "othertestname",\ - "credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "username":"bogus username3", \ - "password":"bogus password3"}}],\ - "testname":[{"name": "nottestname",\ - "credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "username":"bogus username", \ - "password":"bogus password"}}],\ - "equals_sign_test":[{"name": "equals_sign_test",\ - "credentials":{ \ - "iam_apikey": "V4HXmoUtMjohnsnow=KotN",\ - "iam_apikey_description": "Auto generated apikey...",\ - "iam_apikey_name": "auto-generated-apikey-111-222-333",\ - "iam_role_crn": "crn:v1:bluemix:public:iam::::serviceRole:Manager",\ - "iam_serviceid_crn": "crn:v1:staging:public:iam-identity::a/::serviceid:ServiceID-1234",\ - "url": "https://gateway.watsonplatform.net/testService",\ - "auth_url": "https://iamhost/iam/api="}}]}' - - os.environ['VCAP_SERVICES'] = vcap_services - authenticator = get_authenticator_from_environment('testname') - assert isinstance(authenticator, BasicAuthenticator) - assert authenticator.username == 'bogus username2' - assert authenticator.password == 'bogus password2' - - authenticator = get_authenticator_from_environment('equals_sign_test') - assert isinstance(authenticator, IAMAuthenticator) - assert authenticator.token_manager.apikey == 'V4HXmoUtMjohnsnow=KotN' - del os.environ['VCAP_SERVICES'] - - vcap_services = '{"test":[{\ - "credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "username":"bogus username", \ - "password":"bogus password"}},\ - {"credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "username":"bogus username2", \ - "password":"bogus password2"}}\ - ]}' - - os.environ['VCAP_SERVICES'] = vcap_services - authenticator = get_authenticator_from_environment('test') - assert isinstance(authenticator, BasicAuthenticator) - assert authenticator.username == 'bogus username' - assert authenticator.password == 'bogus password' - del os.environ['VCAP_SERVICES'] - - vcap_services = '{"first":[],\ - "test":[{"credentials":{ \ - "url":"https://gateway.watsonplatform.net/compare-comply/api",\ - "username":"bogus username", \ - "password":"bogus password"}}],\ - "last":[]}' - - os.environ['VCAP_SERVICES'] = vcap_services - authenticator = get_authenticator_from_environment('test') - assert isinstance(authenticator, BasicAuthenticator) - assert authenticator.username == 'bogus username' - assert authenticator.password == 'bogus password' - del os.environ['VCAP_SERVICES'] - - vcap_services = '{"test":[],\ - "last":[]}' - - os.environ['VCAP_SERVICES'] = vcap_services - authenticator = get_authenticator_from_environment('test') - assert authenticator is None - del os.environ['VCAP_SERVICES'] - - -def test_multi_word_service_name(): - os.environ['PERSONALITY_INSIGHTS_APIKEY'] = '5678efgh' - authenticator = get_authenticator_from_environment('personality-insights') - assert authenticator is not None - assert authenticator.authentication_type() == Authenticator.AUTHTYPE_IAM - assert authenticator.token_manager.apikey == '5678efgh' - del os.environ['PERSONALITY_INSIGHTS_APIKEY'] - - def test_read_external_sources_1(): # Set IBM_CREDENTIALS_FILE to a non-existent file (should be silently ignored). bad_file_path = os.path.join(os.path.dirname(__file__), 'NOT_A_FILE') diff --git a/test_integration/test_mcspv2_authenticator_integration.py b/test_integration/test_mcspv2_authenticator_integration.py new file mode 100644 index 00000000..9739be35 --- /dev/null +++ b/test_integration/test_mcspv2_authenticator_integration.py @@ -0,0 +1,46 @@ +# pylint: disable=missing-docstring +import logging +import os + +from test.utils.logger_utils import setup_test_logger +from ibm_cloud_sdk_core import get_authenticator_from_environment + +setup_test_logger(logging.WARNING) + + +# Note: Only the unit tests are run by default. +# +# In order to test with a live MCSP token server, create file "mcspv2test.env" in the project root. +# It should look like this: +# +# required properties: +# +# MCSPV2TEST1_AUTH_URL= e.g. https://account-iam.platform.dev.saas.ibm.com +# MCSPV2TEST1_AUTH_TYPE=mcspv2 +# MCSPV2TEST1_APIKEY= +# MCSPV2TEST1_SCOPE_COLLECTION_TYPE=accounts (use any valid collection type value) +# MCSPV2TEST1_SCOPE_ID=global_account (use any valid scope id) +# +# optional properties: +# +# MCSPV2TEST1_INCLUDE_BUILTIN_ACTIONS=true|false +# MCSPV2TEST1_INCLUDE_CUSTOM_ACTIONS=true|false +# MCSPV2TEST1_INCLUDE_ROLES=true|false +# MCSPV2TEST1_PREFIX_ROLES=true|false +# MCSPV2TEST1_CALLER_EXT_CLAIM={"productID":"prod123"} +# +# Then run this command: +# pytest test_integration/test_mcspv2_authenticator_integration.py +def test_mcspv2_authenticator(): + os.environ['IBM_CREDENTIALS_FILE'] = 'mcspv2test.env' + + authenticator = get_authenticator_from_environment('mcspv2test1') + assert authenticator is not None + + request = {'headers': {}} + authenticator.authenticate(request) + assert request['headers']['Authorization'] is not None + + auth_header = request['headers']['Authorization'] + assert 'Bearer' in auth_header + print("Authorization: ", auth_header)