
chore(spark): migrate SDK to kubeflow_spark_api Pydantic models#295

Open
tariq-hasan wants to merge 5 commits into kubeflow:main from tariq-hasan:refactor/spark-pypi-models

Conversation

@tariq-hasan
Contributor

What this PR does / why we need it:

This PR migrates the Spark SDK from constructing CRDs using raw dictionaries to using the typed Pydantic models provided by kubeflow_spark_api. There are no user-facing API changes in this PR.

What changed:

  • Replace raw dict-based CRD construction with typed Pydantic models from kubeflow_spark_api
  • Convert to dict only at the Kubernetes API boundary
  • Parse API responses using .from_dict() instead of manual extraction
  • Keep user-facing dataclasses (Driver, Executor, SparkConnectInfo) unchanged
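The "typed models internally, dicts only at the API boundary" pattern described above can be sketched with stdlib dataclasses. This is a minimal illustration only: the real SDK uses Pydantic models generated by kubeflow_spark_api, and the ServerSpec/SparkConnectSpec names here are stand-ins.

```python
# Minimal sketch of "typed models internally, dicts only at the API boundary".
# ServerSpec/SparkConnectSpec are illustrative stand-ins, not the real
# kubeflow_spark_api models (which are Pydantic, not dataclasses).
from dataclasses import dataclass, asdict


@dataclass
class ServerSpec:
    cores: int
    memory: str


@dataclass
class SparkConnectSpec:
    spark_version: str
    image: str
    server: ServerSpec

    def to_dict(self) -> dict:
        # Serialize only when handing the CR off to the Kubernetes API.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "SparkConnectSpec":
        # Parse an API response back into the typed model.
        return cls(
            spark_version=data["spark_version"],
            image=data["image"],
            server=ServerSpec(**data["server"]),
        )


spec = SparkConnectSpec("4.0.0", "spark:4.0.0", ServerSpec(1, "512m"))
payload = spec.to_dict()  # plain dict at the API boundary
restored = SparkConnectSpec.from_dict(payload)
print(restored.server.memory)  # typed attribute access instead of dict lookups
```

The point of the pattern is that dict indexing (and the typos it invites) is confined to the two conversion helpers; everything else in the SDK gets attribute access and type checking.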

Why:

  • Improves type safety within the SDK
  • Aligns Spark with the established Trainer/Optimizer architecture

Testing:
Tested against kubeflow_spark_api==2.4.0rc0.

Which issue(s) this PR fixes (optional, in Fixes #<issue number>, #<issue number>, ... format, will close the issue(s) when PR gets merged):

Fixes #271

Checklist:

  • Docs included if any changes are user facing

Copilot AI review requested due to automatic review settings February 15, 2026 21:50
@github-actions
Contributor

🎉 Welcome to the Kubeflow SDK! 🎉

Thanks for opening your first PR! We're happy to have you as part of our community 🚀

Here's what happens next:

  • If you haven't already, please check out our Contributing Guide for repo-specific guidelines and the Kubeflow Contributor Guide for general community standards
  • Our team will review your PR soon! cc @kubeflow/kubeflow-sdk-team

Join the community:

Feel free to ask questions in the comments if you need any help or clarification!
Thanks again for contributing to Kubeflow! 🙏

@google-oss-prow
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign andreyvelich for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coveralls

coveralls commented Feb 15, 2026

Pull Request Test Coverage Report for Build 22043884336

Details

  • 215 of 218 (98.62%) changed or added relevant lines in 6 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.06%) to 72.887%

Changes missing coverage:
  • kubeflow/spark/backends/kubernetes/utils.py: 48 of 51 changed/added lines covered (94.12%)

Totals:
  • Change from base Build 21955571005: +0.06%
  • Covered Lines: 4105
  • Relevant Lines: 5632

💛 - Coveralls

Contributor

Copilot AI left a comment


Pull request overview

This PR refactors the Spark SDK to use typed Pydantic models from kubeflow_spark_api instead of raw dictionaries for CRD construction. This aligns the Spark SDK with the established architecture pattern used by the Trainer SDK.

Changes:

  • Added kubeflow-spark-api>=2.3.0 dependency and migrated from dict-based CRD construction to typed Pydantic models
  • Updated all option implementations to work with Pydantic models instead of dictionaries
  • Refactored backend methods to convert to/from Pydantic models at the Kubernetes API boundary

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 1 comment.

Summary per file:

  • pyproject.toml: Added kubeflow-spark-api>=2.3.0 dependency
  • uv.lock: Lock file updates for the new kubeflow-spark-api dependency
  • kubeflow/spark/backends/kubernetes/backend.py: Convert between dict and Pydantic models at the API boundary using .to_dict() and .from_dict()
  • kubeflow/spark/backends/kubernetes/utils.py: Refactored build_spark_connect_crd to return a Pydantic model; renamed parse_spark_connect_status to get_spark_connect_info_from_cr with Pydantic input
  • kubeflow/spark/types/options.py: Updated all option callables to accept the SparkConnect Pydantic model instead of a dict
  • kubeflow/spark/backends/kubernetes/backend_test.py: Enhanced mock responses to include all required fields for Pydantic model validation
  • kubeflow/spark/backends/kubernetes/utils_test.py: Updated tests to work with Pydantic models and added a validation test for an invalid CR
  • kubeflow/spark/types/options_test.py: Migrated tests to use the spark_connect_model fixture and verify Pydantic model attributes
  • hack/Dockerfile.spark-e2e-runner: Added --pre flag to allow installation of pre-release versions

role_spec.template = models.IoK8sApiCoreV1PodTemplateSpec()

# Convert existing template to dict, merge, and convert back
existing_dict = role_spec.template.to_dict() if role_spec.template else {}

Copilot AI Feb 15, 2026


Redundant None check in ternary expression. Since role_spec.template is guaranteed to be non-None after line 193, the ternary expression role_spec.template.to_dict() if role_spec.template else {} can be simplified to role_spec.template.to_dict().

Suggested change
existing_dict = role_spec.template.to_dict() if role_spec.template else {}
existing_dict = role_spec.template.to_dict()

@tariq-hasan tariq-hasan changed the title refactor(spark): migrate SDK to kubeflow_spark_api Pydantic models chore(spark): migrate SDK to kubeflow_spark_api Pydantic models Feb 15, 2026
@tariq-hasan tariq-hasan force-pushed the refactor/spark-pypi-models branch from 2df7db9 to 255b2ad Compare February 15, 2026 22:03
@andreyvelich
Member

/assign @Shekharrajak
Please can you help with the review?

Member

@andreyvelich andreyvelich left a comment


Thanks @tariq-hasan!
Overall, looks great.
cc @kubeflow/kubeflow-sdk-team @Fiona-Waters @abhijeet-dhumal @jaiakash

@@ -9,7 +9,7 @@ COPY pyproject.toml README.md LICENSE ./
COPY kubeflow/ kubeflow/
Member


Can we remove these filters and run Spark E2E tests for every PR?
https://github.com/tariq-hasan/sdk/blob/255b2ad2e3f953b3aa78deebd4b20a137eb0667c/.github/workflows/test-spark-examples.yaml#L4-L12

It should be fine to run them on every PR, like we do for other tests.
For example, we didn't trigger Spark tests when the PySpark dependency was updated: #300

Contributor Author


Sounds good. I have removed the paths.

Comment on lines 71 to 87
base_response = {
    "apiVersion": f"{constants.SPARK_CONNECT_GROUP}/{constants.SPARK_CONNECT_VERSION}",
    "kind": constants.SPARK_CONNECT_KIND,
    "spec": {
        "sparkVersion": constants.DEFAULT_SPARK_VERSION,
        "image": constants.DEFAULT_SPARK_IMAGE,
        "server": {
            "cores": constants.DEFAULT_DRIVER_CPU,
            "memory": constants.DEFAULT_DRIVER_MEMORY,
        },
        "executor": {
            "instances": 2,
            "cores": constants.DEFAULT_EXECUTOR_CPU,
            "memory": constants.DEFAULT_EXECUTOR_MEMORY,
        },
    },
}
Member


Contributor Author

@tariq-hasan tariq-hasan Feb 26, 2026


I have introduced the get_spark_connect function to return a typed model in place of the dict-based approach.

Comment on lines 116 to 128
"""
base_spec = {
"sparkVersion": constants.DEFAULT_SPARK_VERSION,
"image": constants.DEFAULT_SPARK_IMAGE,
"server": {
"cores": constants.DEFAULT_DRIVER_CPU,
"memory": constants.DEFAULT_DRIVER_MEMORY,
},
"executor": {
"instances": 2,
"cores": constants.DEFAULT_EXECUTOR_CPU,
"memory": constants.DEFAULT_EXECUTOR_MEMORY,
},
Member


Same suggestion.

Contributor Author


The get_spark_connect function is used here now as well.

API ExecutorSpec model.
"""
# Determine number of instances
if executor and executor.num_instances is not None:
Member


Suggested change
if executor and executor.num_instances is not None:
if executor and executor.num_instances:

Contributor Author


I have updated the code to remove is not None.

# Determine number of instances
if executor and executor.num_instances is not None:
instances = executor.num_instances
elif num_executors is not None:
Member


Suggested change
elif num_executors is not None:
elif num_executors:

Contributor Author


I have updated the code here as well to remove is not None.
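For context on the two suggestions above: the `is not None` form and the bare truthiness form behave identically for `None`, but diverge when a caller passes a falsy-but-set value such as `0`. A quick sketch (the helper names are hypothetical, not from the PR):

```python
# Hypothetical helpers showing where `is not None` and bare truthiness diverge:
# 0 is a set value under `is not None` but falls through under truthiness.
DEFAULT_INSTANCES = 2


def pick_explicit(num_executors):
    if num_executors is not None:  # 0 counts as an explicit choice
        return num_executors
    return DEFAULT_INSTANCES


def pick_truthy(num_executors):
    if num_executors:  # 0 is treated the same as "not set"
        return num_executors
    return DEFAULT_INSTANCES


print(pick_explicit(0), pick_truthy(0))        # 0 2
print(pick_explicit(None), pick_truthy(None))  # 2 2
```

The accepted change here adopts the truthiness form, which is fine as long as zero executors is never a meaningful request.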

Comment on lines 183 to 184
@@ -99,8 +184,8 @@ def build_spark_connect_crd(
executor: Optional[Executor] = None,
Member


I still think we should remove driver and executor spec from the first released version, and extend it later.
@Shekharrajak Do you have any particular use-case when users want to set it?

Suggested change
executor: Optional[Executor] = None,

name=metadata.get("name", ""),
namespace=metadata.get("namespace", ""),
name=spark_connect_cr.metadata.name,
namespace=spark_connect_cr.metadata.namespace or "",
Member


namespace cannot be None.

Suggested change
namespace=spark_connect_cr.metadata.namespace or "",
namespace=spark_connect_cr.metadata.namespace,

Contributor Author


I have made the change.

Comment on lines 150 to 152
crd = spark_connect.to_dict()
assert crd["spec"]["executor"]["cores"] == 2
assert crd["spec"]["executor"]["memory"] == "4g"
Member


Can you refactor these tests to just access the object fields, you don't need to run to_dict(), e.g.

assert crd.spec.executor.cores == 2

Contributor Author

@tariq-hasan tariq-hasan Feb 26, 2026


Sounds good. I have made the change to use the spark_connect typed model directly and removed the to_dict() conversion.
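The test refactor agreed on above can be sketched as follows; `ExecutorSpec` here is an illustrative dataclass stand-in for the generated API model, not the real kubeflow_spark_api class.

```python
# Sketch of the test refactor discussed above: assert on typed attributes
# directly instead of round-tripping through to_dict()/asdict().
from dataclasses import dataclass, asdict


@dataclass
class ExecutorSpec:
    cores: int
    memory: str


executor = ExecutorSpec(cores=2, memory="4g")

# Before: convert to a dict and index into it; key typos fail only at runtime.
crd = asdict(executor)
assert crd["cores"] == 2
assert crd["memory"] == "4g"

# After: direct attribute access; typos are caught by static type checkers.
assert executor.cores == 2
assert executor.memory == "4g"
```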

"pydantic>=2.10.0",
"kubeflow-trainer-api>=2.0.0",
"kubeflow-katib-api>=0.19.0",
"kubeflow-spark-api>=2.3.0",
Member


Can you add this to the spark extras, alongside pyspark.

Contributor Author


I have added it there as well.

Signed-off-by: tariq-hasan <mmtariquehsn@gmail.com>
@tariq-hasan tariq-hasan force-pushed the refactor/spark-pypi-models branch from 255b2ad to 740fbf4 Compare February 26, 2026 12:25
@tariq-hasan
Contributor Author

I have rebased the PR as well to account for the changes coming from #288.

)


def build_spark_connect_crd(
Contributor


Suggested change
def build_spark_connect_crd(
def build_spark_connect_cr(

) -> dict[str, Any]:
"""Build SparkConnect CRD manifest (KEP-107 compliant).
) -> models.SparkV1alpha1SparkConnect:
"""Build SparkConnect CRD using typed API models (KEP-107 compliant).
Contributor


Suggested change
"""Build SparkConnect CRD using typed API models (KEP-107 compliant).
"""Build SparkConnect CR using typed API models (KEP-107 compliant).


Returns:
SparkConnect CRD as dictionary.
SparkConnect CRD as typed Pydantic model.
Contributor


Suggested change
SparkConnect CRD as typed Pydantic model.
SparkConnect CR as typed Pydantic model.


Successfully merging this pull request may close these issues.

refactor(spark): Re-use Spark Python models from PyPI instead of custom dataclasses

6 participants