From c9616d7a39d68d193796444649518f51b41dffe6 Mon Sep 17 00:00:00 2001 From: Colin Moynes <77994705+colinmoynes@users.noreply.github.com> Date: Mon, 27 Oct 2025 21:17:02 +0530 Subject: [PATCH 1/2] Added token creation capabilities (#223) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Auth now creates a token if none exists * Bump version: 1.8.6 → 1.8.7 * Update CHANGELOG.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * updated pylintrc to 13 max pos args * updated pylint max-args to 13 * Removed max-positional-args setting in pylintrc * Set default force value in create_config_files() * Update cloudsmith_cli/core/config.py wrapping create=y within else Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .bumpversion.cfg | 2 +- .pylintrc | 6 +- CHANGELOG.md | 8 ++ cloudsmith_cli/cli/commands/auth.py | 129 +++++++------------ cloudsmith_cli/cli/commands/tokens.py | 178 ++++++++++++++++++++++---- cloudsmith_cli/cli/webserver.py | 47 ++++++- cloudsmith_cli/core/config.py | 11 +- cloudsmith_cli/data/VERSION | 2 +- 8 files changed, 263 insertions(+), 120 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 49b8c4e2..895662a3 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.8.6 +current_version = 1.8.7 commit = True tag = True parse = (?P\d+)\.(?P\d+)\.(?P\d+) diff --git a/.pylintrc b/.pylintrc index 6a7cd8b1..0165d7f3 100644 --- a/.pylintrc +++ b/.pylintrc @@ -99,10 +99,6 @@ recursive=no # source root. source-roots= -# When enabled, pylint would attempt to guess common misconfiguration and emit -# user-friendly hints instead of false-positive error messages. -suggestion-mode=yes - # Allow loading of arbitrary C extensions. Extensions are imported into the # active Python interpreter and may run arbitrary code. unsafe-load-any-extension=no @@ -285,7 +281,7 @@ exclude-too-few-public-methods= ignored-parents= # Maximum number of arguments for function / method. -max-args=5 +max-args=13 # Maximum number of attributes for a class (see R0902). max-attributes=7 diff --git a/CHANGELOG.md b/CHANGELOG.md index c244e006..57af2f89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [1.8.7] - 2025-10-27 + +### Added + +- `Cloudsmith auth -o --token` now creates a new token if none previously existed. +- Added support for json output for auth via `--json` param. +- Added new `create` command for tokens. If authenticated and no previous token exists, this allows for new token creation. + ## [1.8.6] - 2025-10-16 ### Added diff --git a/cloudsmith_cli/cli/commands/auth.py b/cloudsmith_cli/cli/commands/auth.py index 5dc6ad1d..b2500959 100644 --- a/cloudsmith_cli/cli/commands/auth.py +++ b/cloudsmith_cli/cli/commands/auth.py @@ -4,14 +4,42 @@ import click -from ...core.api import exceptions, user -from ...core.config import create_config_files, new_config_messaging from .. import decorators, validators from ..exceptions import handle_api_exceptions from ..saml import create_configured_session, get_idp_url -from ..utils import maybe_spinner from ..webserver import AuthenticationWebRequestHandler, AuthenticationWebServer from .main import main +from .tokens import create + +# Authentication server configuration +AUTH_SERVER_HOST = "127.0.0.1" +AUTH_SERVER_PORT = 12400 + + +def _perform_saml_authentication(opts, owner, enable_token_creation=False): + """Perform SAML authentication via web browser and local web server.""" + session = create_configured_session(opts) + api_host = opts.api_config.host + + idp_url = get_idp_url(api_host, owner, session=session) + click.echo( + f"Opening your organization's SAML IDP URL in your browser: {click.style(idp_url, bold=True)}" + ) + click.echo() + webbrowser.open(idp_url) + click.echo("Starting webserver to begin authentication ... ") + + auth_server = AuthenticationWebServer( + (AUTH_SERVER_HOST, AUTH_SERVER_PORT), + AuthenticationWebRequestHandler, + owner=owner, + session=session, + debug=opts.debug, + refresh_api_on_success=enable_token_creation, + api_opts=opts.api_config, + ) + + auth_server.handle_request() @main.command(aliases=["auth"]) @@ -38,94 +66,33 @@ is_flag=True, help="Force refresh of user API token without prompts.", ) +@click.option( + "--save-config", + default=False, + is_flag=True, + help="Save the new API key to your configuration files.", +) +@click.option( + "--json", + default=False, + is_flag=True, + help="Output token details in json format.", +) @decorators.common_cli_config_options @decorators.common_cli_output_options @decorators.initialise_api @click.pass_context -def authenticate(ctx, opts, owner, token, force): +def authenticate(ctx, opts, owner, token, force, save_config, json): """Authenticate to Cloudsmith using the org's SAML setup.""" owner = owner[0].strip("'[]'") - api_host = opts.api_config.host click.echo( - "Beginning authentication for the {owner} org ... ".format( - owner=click.style(owner, bold=True) - ) + f"Beginning authentication for the {click.style(owner, bold=True)} org ... " ) - session = create_configured_session(opts) - context_message = "Failed to authenticate via SSO!" with handle_api_exceptions(ctx, opts=opts, context_msg=context_message): - idp_url = get_idp_url(api_host, owner, session=session) - click.echo( - "Opening your organization's SAML IDP URL in your browser: %(idp_url)s" - % {"idp_url": click.style(idp_url, bold=True)} - ) - click.echo() - webbrowser.open(idp_url) - click.echo("Starting webserver to begin authentication ... ") - - auth_server = AuthenticationWebServer( - ("127.0.0.1", 12400), - AuthenticationWebRequestHandler, - api_host=api_host, - owner=owner, - session=session, - debug=opts.debug, - ) - auth_server.handle_request() - - if not token: - return - - try: - api_token = user.create_user_token_saml() - click.echo(f"New token value: {click.style(api_token.key, fg='magenta')}") - - if not token: - create, has_errors = create_config_files( - ctx, opts, api_key=api_token.key - ) - new_config_messaging(has_errors, opts, create, api_key=api_token.key) - except exceptions.ApiException as exc: - if exc.status == 400: - if not force: - if "User has already created an API key" in exc.detail: - click.confirm( - "User already has a token. Would you like to recreate it?", - abort=True, - ) - else: - raise - - context_msg = "Failed to refresh the token!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - api_tokens = user.list_user_tokens() - for t in api_tokens: - click.echo("Current tokens:") - click.echo( - f"Token: {click.style(t.key, fg='magenta')}, " - f"Created: {click.style(t.created, fg='green')}, " - f"slug_perm: {click.style(t.slug_perm, fg='cyan')}" - ) - - if not force: - token_slug = click.prompt( - "Please enter the slug_perm of the token you would like to refresh" - ) - click.echo(f"Refreshing token {token_slug}... ", nl=False) - else: - # Use the first available slug_perm for simplicity - token_slug = api_tokens[0].slug_perm - click.echo(f"Refreshing token {token_slug}... ", nl=False) - - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - with maybe_spinner(opts): - new_token = user.refresh_user_token(token_slug) - click.secho("OK", fg="green") - click.echo(f"New token value: {click.style(new_token.key, fg='magenta')}") + _perform_saml_authentication(opts, owner, enable_token_creation=token) - if not force: - create, has_errors = create_config_files(ctx, opts, api_key=new_token.key) - new_config_messaging(has_errors, opts, create, api_key=new_token.key) + if token: + ctx.invoke(create, opts=opts, save_config=save_config, force=force, json=json) diff --git a/cloudsmith_cli/cli/commands/tokens.py b/cloudsmith_cli/cli/commands/tokens.py index a4b96ab5..b4977e68 100644 --- a/cloudsmith_cli/cli/commands/tokens.py +++ b/cloudsmith_cli/cli/commands/tokens.py @@ -1,12 +1,36 @@ import click -from ...core.api import user as api +from ...core.api import exceptions, user as api +from ...core.config import create_config_files, new_config_messaging from .. import command, decorators from ..exceptions import handle_api_exceptions from ..utils import maybe_print_as_json, maybe_spinner from .main import main +def handle_duplicate_token_error(exc, ctx, opts, save_config, force): + """ + Handle the case where user already has a token. + + Returns the token from refresh if user confirms, otherwise exits. + """ + if ( + exc.status == 400 + and exc.detail + and "User has already created an API key" in exc.detail + ): + if not force: + if not click.confirm( + "User already has a token. Would you like to recreate it?", + abort=False, + ): + return None + return refresh_existing_token_interactive( + ctx, opts, save_config=save_config, force=force + ) + raise exc + + @main.group(cls=command.AliasGroup, name="tokens") @decorators.common_cli_config_options @decorators.common_cli_output_options @@ -39,11 +63,47 @@ def list_tokens(ctx, opts): print_tokens(tokens) +@tokens.command() +@click.option( + "--save-config", + default=False, + is_flag=True, + help="Save the new API key to your configuration files.", +) +@click.option( + "-f", + "--force", + default=False, + is_flag=True, + help="Force creation of user API token without prompts.", +) +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.pass_context +def create(ctx, opts, save_config, force=False, json=False): + """Create a new API token.""" + new_token = _create(ctx, opts, save_config, force, json) + + if save_config: + create, has_errors = create_config_files( + ctx, opts, api_key=new_token.key, force=force + ) + new_config_messaging(has_errors, opts, create, api_key=new_token.key) + + @tokens.command() @click.argument( "token_slug", required=False, ) +@click.option( + "--save-config", + default=False, + is_flag=True, + help="Save the new API key to your configuration files.", +) @click.option( "-f", "--force", @@ -56,41 +116,113 @@ def list_tokens(ctx, opts): @decorators.common_api_auth_options @decorators.initialise_api @click.pass_context -def refresh(ctx, opts, token_slug, force): +def refresh(ctx, opts, token_slug, force, save_config): """Refresh a specific API token by its slug.""" + new_token = refresh_existing_token_interactive( + ctx, opts, token_slug, save_config, force + ) + + if new_token: + if maybe_print_as_json(opts, new_token): + return new_token + + +def print_tokens(tokens): + for token in tokens: + click.echo( + f"Token: {click.style(token.key, fg='magenta')}, " + f"Created: {click.style(token.created, fg='green')}, " + f"slug_perm: {click.style(token.slug_perm, fg='cyan')}" + ) + + +def refresh_existing_token_interactive( + ctx, opts, token_slug=None, save_config=False, force=False +): + """Refresh an existing API token with interactive token selection, or create new if none exist.""" context_msg = "Failed to refresh the token!" if not token_slug: - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - with maybe_spinner(opts): + try: + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): api_tokens = api.list_user_tokens() + except exceptions.ApiException as exc: + # If we can't list tokens due to API error, fall back to creating a new one + if opts.debug: + click.echo(f"Debug: Failed to list tokens with error: {exc}", err=True) + click.echo( + "Unable to list existing tokens. Creating a new token instead..." + ) + return _create(ctx, opts, save_config=save_config, force=force) + + if not api_tokens: + click.echo("No existing tokens found. Creating a new token instead...") + return _create(ctx, opts, save_config=save_config, force=force) + click.echo("Current tokens:") print_tokens(api_tokens) + if not force: token_slug = click.prompt( "Please enter the slug_perm of the token you would like to refresh" ) else: - # Use the first available slug_perm for simplicity + # Use the first available slug_perm when force is enabled token_slug = api_tokens[0].slug_perm - click.echo(f"Refreshing token {token_slug}... ", nl=False) + click.echo(f"Using token {token_slug} (first available)") click.echo(f"Refreshing token {token_slug}... ", nl=False) - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - with maybe_spinner(opts): - new_token = api.refresh_user_token(token_slug) - click.secho("OK", fg="green") - - if maybe_print_as_json(opts, new_token): - return - - click.echo(f"New token value: {click.style(new_token.key, fg='magenta')}") - + try: + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + new_token = api.refresh_user_token(token_slug) -def print_tokens(tokens): - for token in tokens: - click.echo( - f"Token: {click.style(token.key, fg='magenta')}, " - f"Created: {click.style(token.created, fg='green')}, " - f"slug_perm: {click.style(token.slug_perm, fg='cyan')}" - ) + if save_config: + create, has_errors = create_config_files( + ctx, opts, api_key=new_token.key, force=force + ) + new_config_messaging(has_errors, opts, create, api_key=new_token.key) + + if maybe_print_as_json(opts, new_token): + return + + click.secho("OK", fg="green") + click.echo(f"New token value: {click.style(new_token.key, fg='magenta')}") + + return new_token + except exceptions.ApiException as exc: + # If refresh fails due to API error, offer to create a new token instead + if opts.debug: + click.echo(f"\nDebug: Refresh failed with error: {exc}", err=True) + click.echo("\nRefresh failed. Creating a new token instead...") + return _create(ctx, opts, save_config=save_config, force=force) + + +def _create(ctx, opts, save_config=False, force=False, json=False): + """Create a new API token.""" + try: + new_token = api.create_user_token_saml() + + if new_token: + if json: + opts.output = "json" + if maybe_print_as_json(opts, new_token): + return + if maybe_print_as_json(opts, new_token): + return + click.echo(f"New token value: {click.style(new_token.key, fg='magenta')}") + return new_token + + return new_token + + except exceptions.ApiException as exc: + if exc.status == 401: + click.echo(f"{exc.detail}") + return + if json: + opts.output = "json" + if exc.status == 400: + new_token = handle_duplicate_token_error( + exc, ctx, opts, save_config=save_config, force=force + ) + return new_token diff --git a/cloudsmith_cli/cli/webserver.py b/cloudsmith_cli/cli/webserver.py index d5ae351b..381b3433 100644 --- a/cloudsmith_cli/cli/webserver.py +++ b/cloudsmith_cli/cli/webserver.py @@ -8,6 +8,7 @@ import click from ..core.api.exceptions import ApiException +from ..core.api.init import initialise_api from ..core.keyring import store_sso_tokens from .saml import exchange_2fa_token @@ -46,10 +47,11 @@ class AuthenticationWebServer(HTTPServer): def __init__( self, server_address, RequestHandlerClass, bind_and_activate=True, **kwargs ): - self.api_host = kwargs.get("api_host") self.owner = kwargs.get("owner") self.session = kwargs.get("session") self.debug = kwargs.get("debug", False) + self.refresh_api_on_success = kwargs.get("refresh_api_on_success", False) + self.api_opts = kwargs.get("api_opts") self.exception = None super().__init__( @@ -58,15 +60,36 @@ def __init__( self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + @property + def api_host(self): + """Get the API host from api_opts.""" + return self.api_opts.host if self.api_opts else None + + def refresh_api_config_after_auth(self): + """Refresh the API configuration to pick up newly stored SSO tokens.""" + if not self.api_opts: + return + + initialise_api( + debug=self.api_opts.debug, + host=self.api_opts.host, + proxy=getattr(self.api_opts, "proxy", None), + ssl_verify=getattr(self.api_opts, "ssl_verify", True), + user_agent=getattr(self.api_opts, "user_agent", None), + headers=getattr(self.api_opts, "headers", None), + rate_limit=getattr(self.api_opts, "rate_limit", True), + ) + def finish_request(self, request, client_address): self.RequestHandlerClass( request, client_address, self, - api_host=self.api_host, owner=self.owner, debug=self.debug, session=self.session, + refresh_api_on_success=self.refresh_api_on_success, + server_instance=self, ) def _handle_request_noblock(self): @@ -85,7 +108,7 @@ def _handle_request_noblock(self): self.handle_error(request, client_address) self.exception = exc self.shutdown_request(request) - except: # noqa: E722 + except BaseException: self.shutdown_request(request) raise else: @@ -98,18 +121,26 @@ def handle_error(self, request, client_address): def shutdown_request(self, request): super().shutdown_request(request) if self.exception: - raise self.exception + exc = self.exception + self.exception = None # Clear to prevent re-raising + raise exc class AuthenticationWebRequestHandler(BaseHTTPRequestHandler): def __init__(self, request, client_address, server, **kwargs): - self.api_host = kwargs.get("api_host") self.owner = kwargs.get("owner") self.debug = kwargs.get("debug", False) self.session = kwargs.get("session") + self.refresh_api_on_success = kwargs.get("refresh_api_on_success", False) + self.server_instance = kwargs.get("server_instance") super().__init__(request, client_address, server) + @property + def api_host(self): + """Get the API host from the server instance.""" + return self.server_instance.api_host if self.server_instance else None + def _return_response(self, status=200, message=None): self.send_response(status) self.send_header("Content-Type", "text/html; charset=utf-8") @@ -175,6 +206,9 @@ def do_GET(self): refresh_token, ) + if self.refresh_api_on_success and self.server_instance: + self.server_instance.refresh_api_config_after_auth() + self._return_success_response() return @@ -192,6 +226,9 @@ def do_GET(self): refresh_token, ) + if self.refresh_api_on_success and self.server_instance: + self.server_instance.refresh_api_config_after_auth() + click.secho("\nAuthentication complete", fg="green", err=True) self._return_success_response() return diff --git a/cloudsmith_cli/core/config.py b/cloudsmith_cli/core/config.py index 16044fb6..1ec8ef8b 100644 --- a/cloudsmith_cli/core/config.py +++ b/cloudsmith_cli/core/config.py @@ -10,7 +10,7 @@ ) -def create_config_files(ctx, opts, api_key): +def create_config_files(ctx, opts, api_key, force=False): """Create default config files.""" # pylint: disable=unused-argument config_reader = opts.get_config_reader() @@ -22,9 +22,12 @@ def create_config_files(ctx, opts, api_key): create = False else: click.echo() - create = click.confirm( - "No default config file(s) found, do you want to create them?" - ) + if not force: + create = click.confirm( + "No default config file(s) found, do you want to create them?" + ) + else: + create = "y" click.echo() if not create: diff --git a/cloudsmith_cli/data/VERSION b/cloudsmith_cli/data/VERSION index f263cd11..88d3ee79 100644 --- a/cloudsmith_cli/data/VERSION +++ b/cloudsmith_cli/data/VERSION @@ -1 +1 @@ -1.8.6 +1.8.7 From b145d35f154c233970f199ba8497304920f0282b Mon Sep 17 00:00:00 2001 From: Samuel Tam Date: Tue, 28 Oct 2025 17:16:46 +0800 Subject: [PATCH 2/2] Drop mix_stderr=False from CliRunner() (#224) Upstream deprecated the option to mangle original stdout and stderr, and instead provides result.output that copies the original streams into a new mixed stream, in proper terminal order. Passing mix_stderr is invalid since version 8.2. > return click.testing.CliRunner(mix_stderr=False) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ E TypeError: CliRunner.__init__() got an unexpected keyword argument 'mix_stderr' See: https://github.com/pallets/click/pull/2523 --- cloudsmith_cli/cli/tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudsmith_cli/cli/tests/conftest.py b/cloudsmith_cli/cli/tests/conftest.py index 5e4936a7..c42f23f4 100644 --- a/cloudsmith_cli/cli/tests/conftest.py +++ b/cloudsmith_cli/cli/tests/conftest.py @@ -19,7 +19,7 @@ def _get_env_var_or_skip(key): @pytest.fixture() def runner(): """Return a CliRunner with which to run Commands.""" - return click.testing.CliRunner(mix_stderr=False) + return click.testing.CliRunner() @pytest.fixture()