diff --git a/bin/ci-builder b/bin/ci-builder
index 2fc03a500d925..7cd8c2bc3d648 100755
--- a/bin/ci-builder
+++ b/bin/ci-builder
@@ -236,6 +236,8 @@ case "$cmd" in
         --env CANARY_LOADTEST_PASSWORD
         --env CLOUDTEST_CLUSTER_DEFINITION_FILE
         --env COMMON_ANCESTOR_OVERRIDE
+        --env CONFLUENT_CLOUD_QA_CANARY_CSR_PASSWORD
+        --env CONFLUENT_CLOUD_QA_CANARY_CSR_USERNAME
         --env CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD
         --env CONFLUENT_CLOUD_QA_CANARY_KAFKA_USERNAME
         --env AZURE_SERVICE_ACCOUNT_USERNAME
@@ -257,6 +259,10 @@ case "$cmd" in
         --env PRODUCTION_ANALYTICS_USERNAME
         --env PRODUCTION_ANALYTICS_APP_PASSWORD
         --env PYPI_TOKEN
+        --env QA_CLUSTER_SPEC_SHEET_POSTGRES_HOSTNAME
+        --env QA_CLUSTER_SPEC_SHEET_POSTGRES_PASSWORD
+        --env QA_CLUSTER_SPEC_SHEET_MYSQL_HOSTNAME
+        --env QA_CLUSTER_SPEC_SHEET_MYSQL_PASSWORD
         --env RUST_MIN_STACK
         --env MZ_DEV_BUILD_SHA
         --env MZ_GHCR
diff --git a/ci/release-qualification/pipeline.template.yml b/ci/release-qualification/pipeline.template.yml
index 1ce7f231d5d91..b95f5b7350f02 100644
--- a/ci/release-qualification/pipeline.template.yml
+++ b/ci/release-qualification/pipeline.template.yml
@@ -543,7 +543,7 @@ steps:
       - ./ci/plugins/mzcompose:
           composition: cluster-spec-sheet
           run: default
-          args: [--cleanup, --target=cloud-production, cluster]
+          args: [--cleanup, --target=cloud-production, source_ingestion_strong]
     agents:
       queue: linux-aarch64-small
diff --git a/test/cluster-spec-sheet/mzcompose.py b/test/cluster-spec-sheet/mzcompose.py
index 6eb4c5447ca24..5ad6d39491940 100644
--- a/test/cluster-spec-sheet/mzcompose.py
+++ b/test/cluster-spec-sheet/mzcompose.py
@@ -88,6 +88,7 @@
 SCENARIO_TPCH_QUERIES_STRONG = "tpch_queries_strong"
 SCENARIO_TPCH_QUERIES_WEAK = "tpch_queries_weak"
 SCENARIO_TPCH_STRONG = "tpch_strong"
+SCENARIO_SOURCE_INGESTION_STRONG = "source_ingestion_strong"
 SCENARIO_QPS_ENVD_STRONG_SCALING = "qps_envd_strong_scaling"
 
 SCENARIOS_CLUSTERD = [
@@ -97,6 +98,7 @@
     SCENARIO_TPCH_QUERIES_STRONG,
     SCENARIO_TPCH_QUERIES_WEAK,
     SCENARIO_TPCH_STRONG,
+    SCENARIO_SOURCE_INGESTION_STRONG,
 ]
 SCENARIOS_ENVIRONMENTD = [
     SCENARIO_QPS_ENVD_STRONG_SCALING,
@@ -1912,6 +1914,159 @@ def run(self, runner: ScenarioRunner) -> None:
         # We'll also want to measure latency, including tail latency.
 
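+# SourceIngestionScenario measures source hydration time: each external system
+# (Postgres on RDS, MySQL on RDS, Kafka on Confluent Cloud) has been pre-loaded
+# once, out of band, with ~50M rows (see the setup notes in setup() below), and
+# the scenario times how long an `ingest_cluster` replica of the configured size
+# takes to ingest a source from each of them.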
+class SourceIngestionScenario(Scenario):
+    def name(self) -> str:
+        return "source_ingestion"
+
+    def materialize_views(self) -> list[str]:
+        return []
+
+    def setup(self) -> list[str]:
+        # External setup was done once:
+
+        # Postgres (RDS)
+        #   create user materialize password '...';
+        #   create table tbl (customer_id int, region_id int, customer_name text, customer_email text, customer_phone text);
+        #   \set N 50000000
+        #   set synchronous_commit = off;
+        #   insert into tbl
+        #   select
+        #       gs::int as customer_id,
+        #       (1 + (random()*999)::int) as region_id,
+        #       'Customer ' || gs as customer_name,
+        #       'customer' || gs || '@example.com' as customer_email,
+        #       lpad((random()*10000000000)::bigint::text, 10, '0') as customer_phone
+        #   from generate_series(1, :N) as gs;
+        #   analyze tbl;
+
+        # MySQL (RDS)
+        #   CREATE USER 'materialize'@'%' IDENTIFIED BY '...';
+        #   create table tbl (customer_id int, region_id int, customer_name text, customer_email text, customer_phone text);
+        #   INSERT INTO tbl
+        #   SELECT
+        #       n AS customer_id,
+        #       1 + (RAND() * 999) AS region_id,
+        #       CONCAT('Customer ', n) AS customer_name,
+        #       CONCAT('customer', n, '@example.com') AS customer_email,
+        #       LPAD(FLOOR(RAND() * 10000000000), 10, '0') AS customer_phone
+        #   FROM (
+        #       SELECT (a.n + b.n*10 + c.n*100 + d.n*1000 + e.n*10000 + f.n*100000 + g.n*1000000 + h.n*10000000) AS n
+        #       FROM (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
+        #             UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) a
+        #       CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
+        #             UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) b
+        #       CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
+        #             UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) c
+        #       CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
+        #             UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) d
+        #       CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
+        #             UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) e
+        #       CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
+        #             UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) f
+        #       CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
+        #             UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) g
+        #       CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
+        #             UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) h
+        #   ) nums
+        #   WHERE n BETWEEN 1 AND 50000000;
+
+        # Kafka (Confluent)
+        #   node /usr/local/bin/datagen -f avro -n 50000000 -w 0 -p qa_cluster_spec_sheet -s table.json
+        #   cat table.json
+        #   [
+        #     {
+        #       "_meta": {
+        #         "topic": "table",
+        #         "key": "customer_id"
+        #       },
+        #       "customer_id": "iteration.index",
+        #       "region_id": "faker.number.int({ min: 1, max: 4 })",
+        #       "customer_name": "faker.person.fullName()",
+        #       "customer_email": "faker.internet.email()",
+        #       "customer_phone": "faker.phone.number()"
+        #     }
+        #   ]
+
+        postgres_hostname = os.environ[
"QA_CLUSTER_SPEC_SHEET_POSTGRES_HOSTNAME" + ].replace("%", "%%") + postgres_password = os.environ[ + "QA_CLUSTER_SPEC_SHEET_POSTGRES_PASSWORD" + ].replace("%", "%%") + mysql_hostname = os.environ["QA_CLUSTER_SPEC_SHEET_MYSQL_HOSTNAME"].replace( + "%", "%%" + ) + mysql_password = os.environ["QA_CLUSTER_SPEC_SHEET_MYSQL_PASSWORD"].replace( + "%", "%%" + ) + kafka_username = os.environ["CONFLUENT_CLOUD_QA_CANARY_KAFKA_USERNAME"].replace( + "%", "%%" + ) + kafka_password = os.environ["CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD"].replace( + "%", "%%" + ) + csr_username = os.environ["CONFLUENT_CLOUD_QA_CANARY_CSR_USERNAME"].replace( + "%", "%%" + ) + csr_password = os.environ["CONFLUENT_CLOUD_QA_CANARY_CSR_PASSWORD"].replace( + "%", "%%" + ) + return [ + "DROP CONNECTION IF EXISTS pg_conn CASCADE;", + "DROP CONNECTION IF EXISTS mysql_conn CASCADE;", + "DROP CONNECTION IF EXISTS kafka_conn CASCADE;", + "DROP CONNECTION IF EXISTS csr_conn CASCADE;", + "DROP CLUSTER IF EXISTS ingest_cluster CASCADE;", + f"CREATE CLUSTER ingest_cluster SIZE '{self.replica_size}';", + f"CREATE SECRET IF NOT EXISTS pgpass AS '{postgres_password}';", + f"CREATE CONNECTION pg_conn TO postgres (HOST '{postgres_hostname}', PORT 5432, USER materialize, PASSWORD SECRET pgpass, SSL MODE 'require', DATABASE 'postgres');", + f"CREATE SECRET IF NOT EXISTS mysqlpass AS '{mysql_password}';", + f"CREATE CONNECTION mysql_conn TO MYSQL (HOST '{mysql_hostname}', PORT 3306, USER 'materialize', PASSWORD SECRET mysqlpass, SSL MODE REQUIRED);", + f"CREATE SECRET IF NOT EXISTS kafka_username AS '{kafka_username}';", + f"CREATE SECRET IF NOT EXISTS kafka_password AS '{kafka_password}';", + "CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'pkc-oxqxx9.us-east-1.aws.confluent.cloud:9092', SASL MECHANISMS = 'PLAIN', SASL USERNAME = SECRET kafka_username, SASL PASSWORD = SECRET kafka_password);", + f"CREATE SECRET IF NOT EXISTS csr_username AS '{csr_username}';", + f"CREATE SECRET IF NOT EXISTS csr_password AS '{csr_password}';", + "CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (URL 'https://psrc-e0919.us-east-2.aws.confluent.cloud', USERNAME = SECRET csr_username, PASSWORD = SECRET csr_password);", + ] + + def drop(self) -> list[str]: + return ["DROP CLUSTER IF EXISTS ingest_cluster CASCADE;"] + + def run(self, runner: ScenarioRunner) -> None: + runner.measure( + "hydration", + "postgres", + setup=["DROP SOURCE IF EXISTS pg_source CASCADE;"], + query=[ + "CREATE SOURCE pg_source IN CLUSTER ingest_cluster FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source');", + "CREATE TABLE pg_table FROM SOURCE qa_cluster_spec_sheet_pg_source (REFERENCE tbl);", + "SELECT count(*) FROM pg_table;", + ], + ) + + runner.measure( + "hydration", + "mysql", + setup=["DROP SOURCE IF EXISTS mysql_source CASCADE;"], + query=[ + "CREATE SOURCE mysql_source IN CLUSTER ingest_cluster FROM MYSQL CONNECTION mysql_conn;", + "CREATE TABLE mysql_table FROM SOURCE mysql_source (REFERENCE admin.tbl);", + "SELECT count(*) FROM mysql_table;", + ], + ) + + runner.measure( + "hydration", + "kafka", + setup=["DROP SOURCE IF EXISTS kafka_source CASCADE;"], + query=[ + "CREATE SOURCE kafka_source IN CLUSTER ingest_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'qa_cluster_spec_sheet_table');", + "CREATE TABLE kafka_table FROM SOURCE kafka_source;", + "SELECT count(*) FROM kafka_table;", + ], + ) + + # TODO: We should factor out the below # `disable_region`, `cloud_disable_enable_and_wait`, `reconfigure_envd_cpus`, `wait_for_envd` # functions into a separate module. 
 # (Similar `disable_region` functions also occur in other tests.)
@@ -2286,6 +2441,17 @@ def process(scenario: str) -> None:
             target=target,
             max_scale=max_scale,
         )
+    if scenario == SCENARIO_SOURCE_INGESTION_STRONG:
+        print("--- SCENARIO: Running Source ingestion scaling")
+        run_scenario_strong(
+            scenario=SourceIngestionScenario(
+                args.scale_auction, target.replica_size_for_scale(1)
+            ),
+            results_writer=cluster_writer,
+            connection=conn,
+            target=target,
+            max_scale=max_scale,
+        )
     if scenario == SCENARIO_QPS_ENVD_STRONG_SCALING:
         print("--- SCENARIO: Running QPS envd strong scaling")
         run_scenario_envd_strong_scaling(