From ffa86e9c42d4f9f605136f7493c9cb7fa24d8fbe Mon Sep 17 00:00:00 2001 From: Daniel Ayaz Date: Sat, 29 Mar 2025 00:30:42 -0500 Subject: [PATCH 1/4] Upgrade Suggestions added for upgrading from Basic to Standard kafka cluster and upgrading stream governance of an environment from essentials to advanced. --- internal/environment/command_create.go | 22 ++++++ internal/kafka/command_cluster_create.go | 33 ++++++++- internal/kafka/copy_manager.go | 92 ++++++++++++++++++++++++ internal/kafka/upgrade_suggestions.json | 22 ++++++ 4 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 internal/kafka/copy_manager.go create mode 100644 internal/kafka/upgrade_suggestions.json diff --git a/internal/environment/command_create.go b/internal/environment/command_create.go index df3ea245dd..11eba3d0d8 100644 --- a/internal/environment/command_create.go +++ b/internal/environment/command_create.go @@ -8,6 +8,7 @@ import ( orgv2 "github.com/confluentinc/ccloud-sdk-go-v2/org/v2" + "github.com/confluentinc/cli/v4/internal/kafka" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/output" "github.com/confluentinc/cli/v4/pkg/utils" @@ -54,6 +55,27 @@ func (c *command) create(cmd *cobra.Command, args []string) error { return err } + // Show upgrade suggestion for Essentials package + if strings.ToLower(governancePackage) == "essentials" { + orgId := c.Context.GetCurrentOrganization() + if orgId == "" { + return nil + } + + copyManager, err := kafka.NewCopyManager() + if err != nil { + return nil + } + + content, cta, err := copyManager.GetCopy("stream_governance_upgrade_essentials_to_advanced", orgId) + if err != nil { + return nil + } + + formattedCopy := copyManager.FormatCopy(content, cta, environment.GetId()) + output.Println(c.Config.EnableColor, "\n"+formattedCopy) + } + c.Context.AddEnvironment(environment.GetId()) _ = c.Config.Save() diff --git a/internal/kafka/command_cluster_create.go b/internal/kafka/command_cluster_create.go index 83d5bd812f..a74966e32e 100644 --- a/internal/kafka/command_cluster_create.go +++ b/internal/kafka/command_cluster_create.go @@ -91,6 +91,11 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error { return err } + // If no type specified, default to basic + if clusterType == "" { + clusterType = skuBasic + } + sku, err := stringToSku(clusterType) if err != nil { return err @@ -160,7 +165,7 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error { createCluster.Spec.Network = &cmkv2.EnvScopedObjectReference{Id: network} } - kafkaCluster, httpResp, err := c.V2Client.CreateKafkaCluster(createCluster) + cluster, httpResp, err := c.V2Client.CreateKafkaCluster(createCluster) if err != nil { return catchClusterConfigurationNotValidError(err, httpResp, cloud, region) } @@ -169,7 +174,31 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error { output.ErrPrintln(c.Config.EnableColor, getKafkaProvisionEstimate(sku)) } - return c.outputKafkaClusterDescription(cmd, &kafkaCluster, false) + if err := c.outputKafkaClusterDescription(cmd, &cluster, false); err != nil { + return err + } + + if strings.ToLower(clusterType) == strings.ToLower(skuBasic) { + orgId := c.Context.GetCurrentOrganization() + if orgId == "" { + return nil + } + + copyManager, err := NewCopyManager() + if err != nil { + return nil + } + + content, cta, err := copyManager.GetCopy("cluster_upgrade_basic_to_standard", orgId) + if err != nil { + return nil + } + + formattedCopy := copyManager.FormatCopy(content, cta, cluster.GetId()) + output.Println(c.Config.EnableColor, "\n"+formattedCopy) + } + + return nil } func stringToAvailability(s string, sku ccstructs.Sku) (string, error) { diff --git a/internal/kafka/copy_manager.go b/internal/kafka/copy_manager.go new file mode 100644 index 0000000000..65a1bf1a72 --- /dev/null +++ b/internal/kafka/copy_manager.go @@ -0,0 +1,92 @@ +package kafka + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "runtime" + "strings" +) + +type CopyVariation struct { + Weight int `json:"weight"` + Content string `json:"content"` + CTA string `json:"cta"` +} + +type CopyScenario struct { + Variations []CopyVariation `json:"variations"` +} + +type CopyData struct { + Scenarios map[string]CopyScenario `json:"scenarios"` +} + +type CopyManager struct { + data CopyData +} + +func NewCopyManager() (*CopyManager, error) { + // Get the directory of the current file + _, filename, _, _ := runtime.Caller(0) + dir := filepath.Dir(filename) + + // Read the JSON file + filePath := filepath.Join(dir, "upgrade_suggestions.json") + + file, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read copy data: %w", err) + } + + var data CopyData + if err := json.Unmarshal(file, &data); err != nil { + return nil, fmt.Errorf("failed to parse copy data: %w", err) + } + + return &CopyManager{data: data}, nil +} + +func (cm *CopyManager) GetCopy(scenario string, orgId string) (string, string, error) { + scenarioData, exists := cm.data.Scenarios[scenario] + if !exists { + return "", "", fmt.Errorf("unknown scenario: %s", scenario) + } + + variations := scenarioData.Variations + if len(variations) == 0 { + return "", "", fmt.Errorf("no variations found for scenario: %s", scenario) + } + + // Calculate total weight + totalWeight := 0 + for _, v := range variations { + totalWeight += v.Weight + } + + // Use orgId to select variation (simple modulo for now) + // This can be replaced with LaunchDarkly logic later + hash := 0 + for _, c := range orgId { + hash = (hash + int(c)) % totalWeight + } + + // Select variation based on hash + currentWeight := 0 + for _, v := range variations { + currentWeight += v.Weight + if hash < currentWeight { + return v.Content, v.CTA, nil + } + } + + // Fallback to first variation + return variations[0].Content, variations[0].CTA, nil +} + +func (cm *CopyManager) FormatCopy(content, cta, id string) string { + formattedContent := content + formattedCTA := strings.ReplaceAll(cta, "{{id}}", id) + return formattedContent + "\n\n" + formattedCTA +} diff --git a/internal/kafka/upgrade_suggestions.json b/internal/kafka/upgrade_suggestions.json new file mode 100644 index 0000000000..88bda11582 --- /dev/null +++ b/internal/kafka/upgrade_suggestions.json @@ -0,0 +1,22 @@ +{ + "scenarios": { + "cluster_upgrade_basic_to_standard": { + "variations": [ + { + "weight": 1, + "content": "NOTICE: Basic tier selected. Limitations include:\n- No high availability guarantees\n- Limited performance during peak loads\n- Basic support SLA (24hr response)\n\nFor production workloads, 'Standard' tier provides HA, performance scaling, and enterprise SLA.", + "cta": "To upgrade to the Standard Tier execute the following command:\nconfluent kafka cluster update {{id}} --type standard" + } + ] + }, + "stream_governance_upgrade_essentials_to_advanced": { + "variations": [ + { + "weight": 1, + "content": "NOTICE: Stream Governance Essentials selected. Limitations include:\n- Limited schema compatibility checks\n- Basic schema management\n- No schema validation\n\nFor production workloads, 'Advanced' tier provides full schema compatibility, advanced validation, and enterprise-grade governance.", + "cta": "To upgrade to Stream Governance Advanced execute the following command:\nconfluent environment update {{id}} --governance-package advanced" + } + ] + } + } +} \ No newline at end of file From 4103daeac0c93093793cac916b07073c27f2977d Mon Sep 17 00:00:00 2001 From: Daniel Ayaz Date: Tue, 8 Apr 2025 11:18:57 -0500 Subject: [PATCH 2/4] Upgrade Suggestions added for upgrading from Basic to Standard kafka cluster --- internal/environment/command_create.go | 22 ---------------------- internal/kafka/command_cluster_create.go | 13 +++++++++++++ 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/internal/environment/command_create.go b/internal/environment/command_create.go index 11eba3d0d8..df3ea245dd 100644 --- a/internal/environment/command_create.go +++ b/internal/environment/command_create.go @@ -8,7 +8,6 @@ import ( orgv2 "github.com/confluentinc/ccloud-sdk-go-v2/org/v2" - "github.com/confluentinc/cli/v4/internal/kafka" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/output" "github.com/confluentinc/cli/v4/pkg/utils" @@ -55,27 +54,6 @@ func (c *command) create(cmd *cobra.Command, args []string) error { return err } - // Show upgrade suggestion for Essentials package - if strings.ToLower(governancePackage) == "essentials" { - orgId := c.Context.GetCurrentOrganization() - if orgId == "" { - return nil - } - - copyManager, err := kafka.NewCopyManager() - if err != nil { - return nil - } - - content, cta, err := copyManager.GetCopy("stream_governance_upgrade_essentials_to_advanced", orgId) - if err != nil { - return nil - } - - formattedCopy := copyManager.FormatCopy(content, cta, environment.GetId()) - output.Println(c.Config.EnableColor, "\n"+formattedCopy) - } - c.Context.AddEnvironment(environment.GetId()) _ = c.Config.Save() diff --git a/internal/kafka/command_cluster_create.go b/internal/kafka/command_cluster_create.go index a74966e32e..b6246b2c3c 100644 --- a/internal/kafka/command_cluster_create.go +++ b/internal/kafka/command_cluster_create.go @@ -13,6 +13,7 @@ import ( pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/examples" + "github.com/confluentinc/cli/v4/pkg/featureflags" "github.com/confluentinc/cli/v4/pkg/kafka" "github.com/confluentinc/cli/v4/pkg/output" "github.com/confluentinc/cli/v4/pkg/utils" @@ -75,6 +76,14 @@ func (c *clusterCommand) newCreateCommand() *cobra.Command { return cmd } +func (c *clusterCommand) isBasicToStandardUpgradeSuggestionEnabled() bool { + if c.Config.IsTest { + return true + } + ldClient := featureflags.GetCcloudLaunchDarklyClient(c.Context.PlatformName) + return featureflags.Manager.BoolVariation("cli.basic-to-standard-cluster-upgrade-suggestion", c.Context, ldClient, true, false) +} + func (c *clusterCommand) create(cmd *cobra.Command, args []string) error { cloud, err := cmd.Flags().GetString("cloud") if err != nil { @@ -184,6 +193,10 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error { return nil } + if !c.isBasicToStandardUpgradeSuggestionEnabled() { + return nil + } + copyManager, err := NewCopyManager() if err != nil { return nil From cb6a9f6f0a1a96f57f2e71fbcf6e653956b2bc13 Mon Sep 17 00:00:00 2001 From: Daniel Ayaz Date: Tue, 8 Apr 2025 11:32:43 -0500 Subject: [PATCH 3/4] Edits to simplify the copy manager for initial experiment --- internal/kafka/command_cluster_create.go | 2 +- internal/kafka/copy_manager.go | 37 +++++------------------- internal/kafka/upgrade_suggestions.json | 2 -- 3 files changed, 9 insertions(+), 32 deletions(-) diff --git a/internal/kafka/command_cluster_create.go b/internal/kafka/command_cluster_create.go index b6246b2c3c..55f490d8f0 100644 --- a/internal/kafka/command_cluster_create.go +++ b/internal/kafka/command_cluster_create.go @@ -202,7 +202,7 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error { return nil } - content, cta, err := copyManager.GetCopy("cluster_upgrade_basic_to_standard", orgId) + content, cta, err := copyManager.GetCopy("cluster_upgrade_basic_to_standard") if err != nil { return nil } diff --git a/internal/kafka/copy_manager.go b/internal/kafka/copy_manager.go index 65a1bf1a72..70c4a6084d 100644 --- a/internal/kafka/copy_manager.go +++ b/internal/kafka/copy_manager.go @@ -10,7 +10,6 @@ import ( ) type CopyVariation struct { - Weight int `json:"weight"` Content string `json:"content"` CTA string `json:"cta"` } @@ -29,7 +28,10 @@ type CopyManager struct { func NewCopyManager() (*CopyManager, error) { // Get the directory of the current file - _, filename, _, _ := runtime.Caller(0) + _, filename, _, ok := runtime.Caller(0) + if !ok { + return nil, fmt.Errorf("failed to get current file path") + } dir := filepath.Dir(filename) // Read the JSON file @@ -48,7 +50,7 @@ func NewCopyManager() (*CopyManager, error) { return &CopyManager{data: data}, nil } -func (cm *CopyManager) GetCopy(scenario string, orgId string) (string, string, error) { +func (cm *CopyManager) GetCopy(scenario string) (string, string, error) { scenarioData, exists := cm.data.Scenarios[scenario] if !exists { return "", "", fmt.Errorf("unknown scenario: %s", scenario) @@ -59,34 +61,11 @@ func (cm *CopyManager) GetCopy(scenario string, orgId string) (string, string, e return "", "", fmt.Errorf("no variations found for scenario: %s", scenario) } - // Calculate total weight - totalWeight := 0 - for _, v := range variations { - totalWeight += v.Weight - } - - // Use orgId to select variation (simple modulo for now) - // This can be replaced with LaunchDarkly logic later - hash := 0 - for _, c := range orgId { - hash = (hash + int(c)) % totalWeight - } - - // Select variation based on hash - currentWeight := 0 - for _, v := range variations { - currentWeight += v.Weight - if hash < currentWeight { - return v.Content, v.CTA, nil - } - } - - // Fallback to first variation + // For now, simply use the first variation + // In the future, we could add more sophisticated selection logic if needed return variations[0].Content, variations[0].CTA, nil } func (cm *CopyManager) FormatCopy(content, cta, id string) string { - formattedContent := content - formattedCTA := strings.ReplaceAll(cta, "{{id}}", id) - return formattedContent + "\n\n" + formattedCTA + return fmt.Sprintf("%s\n\n%s", content, strings.ReplaceAll(cta, "{{id}}", id)) } diff --git a/internal/kafka/upgrade_suggestions.json b/internal/kafka/upgrade_suggestions.json index 88bda11582..c0de2c8992 100644 --- a/internal/kafka/upgrade_suggestions.json +++ b/internal/kafka/upgrade_suggestions.json @@ -3,7 +3,6 @@ "cluster_upgrade_basic_to_standard": { "variations": [ { - "weight": 1, "content": "NOTICE: Basic tier selected. Limitations include:\n- No high availability guarantees\n- Limited performance during peak loads\n- Basic support SLA (24hr response)\n\nFor production workloads, 'Standard' tier provides HA, performance scaling, and enterprise SLA.", "cta": "To upgrade to the Standard Tier execute the following command:\nconfluent kafka cluster update {{id}} --type standard" } @@ -12,7 +11,6 @@ "stream_governance_upgrade_essentials_to_advanced": { "variations": [ { - "weight": 1, "content": "NOTICE: Stream Governance Essentials selected. Limitations include:\n- Limited schema compatibility checks\n- Basic schema management\n- No schema validation\n\nFor production workloads, 'Advanced' tier provides full schema compatibility, advanced validation, and enterprise-grade governance.", "cta": "To upgrade to Stream Governance Advanced execute the following command:\nconfluent environment update {{id}} --governance-package advanced" } From 89a45b74cfcb0162aa6aaf3b82851a5522b181b8 Mon Sep 17 00:00:00 2001 From: Daniel Ayaz Date: Tue, 8 Apr 2025 14:42:05 -0500 Subject: [PATCH 4/4] Edited feature flag id --- internal/kafka/command_cluster_create.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/kafka/command_cluster_create.go b/internal/kafka/command_cluster_create.go index 55f490d8f0..3a600295fe 100644 --- a/internal/kafka/command_cluster_create.go +++ b/internal/kafka/command_cluster_create.go @@ -81,7 +81,7 @@ func (c *clusterCommand) isBasicToStandardUpgradeSuggestionEnabled() bool { return true } ldClient := featureflags.GetCcloudLaunchDarklyClient(c.Context.PlatformName) - return featureflags.Manager.BoolVariation("cli.basic-to-standard-cluster-upgrade-suggestion", c.Context, ldClient, true, false) + return featureflags.Manager.BoolVariation("cli.basic_to_standard_cluster_upgrade_suggestion.enable", c.Context, ldClient, true, false) } func (c *clusterCommand) create(cmd *cobra.Command, args []string) error {