6 changes: 6 additions & 0 deletions bin/ci-builder
@@ -236,6 +236,8 @@ case "$cmd" in
--env CANARY_LOADTEST_PASSWORD
--env CLOUDTEST_CLUSTER_DEFINITION_FILE
--env COMMON_ANCESTOR_OVERRIDE
--env CONFLUENT_CLOUD_QA_CANARY_CSR_PASSWORD
--env CONFLUENT_CLOUD_QA_CANARY_CSR_USERNAME
--env CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD
--env CONFLUENT_CLOUD_QA_CANARY_KAFKA_USERNAME
--env AZURE_SERVICE_ACCOUNT_USERNAME
@@ -257,6 +259,10 @@ case "$cmd" in
--env PRODUCTION_ANALYTICS_USERNAME
--env PRODUCTION_ANALYTICS_APP_PASSWORD
--env PYPI_TOKEN
--env QA_CLUSTER_SPEC_SHEET_POSTGRES_HOSTNAME
--env QA_CLUSTER_SPEC_SHEET_POSTGRES_PASSWORD
--env QA_CLUSTER_SPEC_SHEET_MYSQL_HOSTNAME
--env QA_CLUSTER_SPEC_SHEET_MYSQL_PASSWORD
--env RUST_MIN_STACK
--env MZ_DEV_BUILD_SHA
--env MZ_GHCR
2 changes: 1 addition & 1 deletion ci/release-qualification/pipeline.template.yml
@@ -543,7 +543,7 @@ steps:
- ./ci/plugins/mzcompose:
composition: cluster-spec-sheet
run: default
-      args: [--cleanup, --target=cloud-production, cluster]
+      args: [--cleanup, --target=cloud-production, source_ingestion_strong]
agents:
queue: linux-aarch64-small
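      # For local runs, the same scenario can be invoked from the composition
      # directory (a sketch, assuming the standard mzcompose entrypoint and
      # mirroring the CI args above):
      #   cd test/cluster-spec-sheet
      #   ./mzcompose run default --cleanup --target=cloud-production source_ingestion_strong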

166 changes: 166 additions & 0 deletions test/cluster-spec-sheet/mzcompose.py
@@ -88,6 +88,7 @@
SCENARIO_TPCH_QUERIES_STRONG = "tpch_queries_strong"
SCENARIO_TPCH_QUERIES_WEAK = "tpch_queries_weak"
SCENARIO_TPCH_STRONG = "tpch_strong"
SCENARIO_SOURCE_INGESTION_STRONG = "source_ingestion_strong"
SCENARIO_QPS_ENVD_STRONG_SCALING = "qps_envd_strong_scaling"

SCENARIOS_CLUSTERD = [
@@ -97,6 +98,7 @@
SCENARIO_TPCH_QUERIES_STRONG,
SCENARIO_TPCH_QUERIES_WEAK,
SCENARIO_TPCH_STRONG,
SCENARIO_SOURCE_INGESTION_STRONG,
]
SCENARIOS_ENVIRONMENTD = [
SCENARIO_QPS_ENVD_STRONG_SCALING,
@@ -1912,6 +1914,159 @@ def run(self, runner: ScenarioRunner) -> None:
# We'll also want to measure latency, including tail latency.


class SourceIngestionScenario(Scenario):
def name(self) -> str:
return "source_ingestion"

def materialize_views(self) -> list[str]:
return []

def setup(self) -> list[str]:
# External setup was done once:
# Postgres (RDS)
# create user materialize password '...';
# create table tbl (customer_id int, region_id int, customer_name text, customer_email text, customer_phone text);
# \set N 50000000
# set synchronous_commit = off;
# insert into tbl
# select
# gs::int as customer_id,
# (1 + (random()*999)::int) as region_id,
# 'Customer ' || gs as customer_name,
# 'customer' || gs || '@example.com' as customer_email,
# lpad((random()*10000000000)::bigint::text, 10, '0') as customer_phone
# from generate_series(1, :N) as gs;
        # analyze tbl;
        # -- plus the publication and replica identity the source below requires:
        # alter table tbl replica identity full;
        # create publication mz_source for table tbl;

# MySQL (RDS)
# CREATE USER 'materialize'@'%' IDENTIFIED BY '...';
        # use admin;  -- the source below references admin.tbl
        # create table tbl (customer_id int, region_id int, customer_name text, customer_email text, customer_phone text);
# INSERT INTO tbl
# SELECT
# n AS customer_id,
# 1 + (RAND() * 999) AS region_id,
# CONCAT('Customer ', n) AS customer_name,
# CONCAT('customer', n, '@example.com') AS customer_email,
# LPAD(FLOOR(RAND() * 10000000000), 10, '0') AS customer_phone
# FROM (
# SELECT (a.n + b.n*10 + c.n*100 + d.n*1000 + e.n*10000 + f.n*100000 + g.n*1000000 + h.n*10000000) AS n
# FROM (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
# UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) a
# CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
# UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) b
# CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
# UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) c
# CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
# UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) d
# CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
# UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) e
# CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
# UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) f
# CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
# UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) g
# CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
# UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) h
# ) nums
# WHERE n BETWEEN 1 AND 50000000;

# Kafka (Confluent)
# node /usr/local/bin/datagen -f avro -n 50000000 -w 0 -p qa_cluster_spec_sheet -s table.json
# cat table.json
# [
# {
# "_meta": {
# "topic": "table",
# "key": "customer_id"
# },
# "customer_id": "iteration.index",
# "region_id": "faker.number.int({ min: 1, max: 4 })",
# "customer_name": "faker.person.fullName()",
# "customer_email": "faker.internet.email()",
# "customer_phone": "faker.phone.number()"
# }
# ]
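        # (datagen's -p prefix is prepended to the topic name, so the records
        # land in the qa_cluster_spec_sheet_table topic consumed below.)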

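        # The "%" -> "%%" escaping below protects these values when the SQL
        # strings built from them pass through printf-style formatting later
        # (an assumption about how the scenario runner handles these statements).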
postgres_hostname = os.environ[
"QA_CLUSTER_SPEC_SHEET_POSTGRES_HOSTNAME"
].replace("%", "%%")
postgres_password = os.environ[
"QA_CLUSTER_SPEC_SHEET_POSTGRES_PASSWORD"
].replace("%", "%%")
mysql_hostname = os.environ["QA_CLUSTER_SPEC_SHEET_MYSQL_HOSTNAME"].replace(
"%", "%%"
)
mysql_password = os.environ["QA_CLUSTER_SPEC_SHEET_MYSQL_PASSWORD"].replace(
"%", "%%"
)
kafka_username = os.environ["CONFLUENT_CLOUD_QA_CANARY_KAFKA_USERNAME"].replace(
"%", "%%"
)
kafka_password = os.environ["CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD"].replace(
"%", "%%"
)
csr_username = os.environ["CONFLUENT_CLOUD_QA_CANARY_CSR_USERNAME"].replace(
"%", "%%"
)
csr_password = os.environ["CONFLUENT_CLOUD_QA_CANARY_CSR_PASSWORD"].replace(
"%", "%%"
)
return [
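            # Tear down leftovers from a previous run before recreating the
            # cluster and connections; secrets are created once with
            # IF NOT EXISTS and reused across runs.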
"DROP CONNECTION IF EXISTS pg_conn CASCADE;",
"DROP CONNECTION IF EXISTS mysql_conn CASCADE;",
"DROP CONNECTION IF EXISTS kafka_conn CASCADE;",
"DROP CONNECTION IF EXISTS csr_conn CASCADE;",
"DROP CLUSTER IF EXISTS ingest_cluster CASCADE;",
f"CREATE CLUSTER ingest_cluster SIZE '{self.replica_size}';",
f"CREATE SECRET IF NOT EXISTS pgpass AS '{postgres_password}';",
f"CREATE CONNECTION pg_conn TO postgres (HOST '{postgres_hostname}', PORT 5432, USER materialize, PASSWORD SECRET pgpass, SSL MODE 'require', DATABASE 'postgres');",
f"CREATE SECRET IF NOT EXISTS mysqlpass AS '{mysql_password}';",
f"CREATE CONNECTION mysql_conn TO MYSQL (HOST '{mysql_hostname}', PORT 3306, USER 'materialize', PASSWORD SECRET mysqlpass, SSL MODE REQUIRED);",
f"CREATE SECRET IF NOT EXISTS kafka_username AS '{kafka_username}';",
f"CREATE SECRET IF NOT EXISTS kafka_password AS '{kafka_password}';",
"CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'pkc-oxqxx9.us-east-1.aws.confluent.cloud:9092', SASL MECHANISMS = 'PLAIN', SASL USERNAME = SECRET kafka_username, SASL PASSWORD = SECRET kafka_password);",
f"CREATE SECRET IF NOT EXISTS csr_username AS '{csr_username}';",
f"CREATE SECRET IF NOT EXISTS csr_password AS '{csr_password}';",
"CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (URL 'https://psrc-e0919.us-east-2.aws.confluent.cloud', USERNAME = SECRET csr_username, PASSWORD = SECRET csr_password);",
]

def drop(self) -> list[str]:
return ["DROP CLUSTER IF EXISTS ingest_cluster CASCADE;"]

def run(self, runner: ScenarioRunner) -> None:
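        # Each measure() times hydration end to end: the trailing
        # SELECT count(*) does not return until the initial snapshot of the
        # 50M-row table has been fully ingested.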
runner.measure(
"hydration",
"postgres",
setup=["DROP SOURCE IF EXISTS pg_source CASCADE;"],
query=[
"CREATE SOURCE pg_source IN CLUSTER ingest_cluster FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source');",
"CREATE TABLE pg_table FROM SOURCE qa_cluster_spec_sheet_pg_source (REFERENCE tbl);",
"SELECT count(*) FROM pg_table;",
],
)

runner.measure(
"hydration",
"mysql",
setup=["DROP SOURCE IF EXISTS mysql_source CASCADE;"],
query=[
"CREATE SOURCE mysql_source IN CLUSTER ingest_cluster FROM MYSQL CONNECTION mysql_conn;",
"CREATE TABLE mysql_table FROM SOURCE mysql_source (REFERENCE admin.tbl);",
"SELECT count(*) FROM mysql_table;",
],
)

runner.measure(
"hydration",
"kafka",
setup=["DROP SOURCE IF EXISTS kafka_source CASCADE;"],
query=[
"CREATE SOURCE kafka_source IN CLUSTER ingest_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'qa_cluster_spec_sheet_table');",
"CREATE TABLE kafka_table FROM SOURCE kafka_source;",
"SELECT count(*) FROM kafka_table;",
],
)


# TODO: We should factor out the below
# `disable_region`, `cloud_disable_enable_and_wait`, `reconfigure_envd_cpus`, `wait_for_envd`
# functions into a separate module. (Similar `disable_region` functions also occur in other tests.)
@@ -2286,6 +2441,17 @@ def process(scenario: str) -> None:
target=target,
max_scale=max_scale,
)
if scenario == SCENARIO_SOURCE_INGESTION_STRONG:
print("--- SCENARIO: Running Source ingestion scaling")
run_scenario_strong(
scenario=SourceIngestionScenario(
args.scale_auction, target.replica_size_for_scale(1)
),
results_writer=cluster_writer,
connection=conn,
target=target,
max_scale=max_scale,
)
if scenario == SCENARIO_QPS_ENVD_STRONG_SCALING:
print("--- SCENARIO: Running QPS envd strong scaling")
run_scenario_envd_strong_scaling(