From f6da6e7cc693ca47413bf8db21e9b684ea9a108a Mon Sep 17 00:00:00 2001 From: Darren Bolduc Date: Tue, 30 Dec 2025 11:26:50 -0500 Subject: [PATCH 1/3] impl(pubsub): subscriber client (without rpcs) --- src/pubsub/Cargo.toml | 2 +- src/pubsub/src/subscriber.rs | 2 + src/pubsub/src/subscriber/client.rs | 90 +++++++++++++++ src/pubsub/src/subscriber/client_builder.rs | 122 ++++++++++++++++++++ src/pubsub/src/subscriber/transport.rs | 2 +- 5 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 src/pubsub/src/subscriber/client.rs create mode 100644 src/pubsub/src/subscriber/client_builder.rs diff --git a/src/pubsub/Cargo.toml b/src/pubsub/Cargo.toml index 1723311a21..37b5b2a015 100644 --- a/src/pubsub/Cargo.toml +++ b/src/pubsub/Cargo.toml @@ -27,6 +27,7 @@ rust-version.workspace = true [dependencies] async-trait.workspace = true +auth.workspace = true bytes.workspace = true futures.workspace = true gax.workspace = true @@ -48,7 +49,6 @@ tracing.workspace = true wkt.workspace = true [dev-dependencies] -auth.workspace = true anyhow.workspace = true mockall.workspace = true tokio-test.workspace = true diff --git a/src/pubsub/src/subscriber.rs b/src/pubsub/src/subscriber.rs index 7a3f405b7e..8e63316cdf 100644 --- a/src/pubsub/src/subscriber.rs +++ b/src/pubsub/src/subscriber.rs @@ -13,6 +13,8 @@ // limitations under the License. pub(crate) mod builder; +pub(crate) mod client; +pub(crate) mod client_builder; pub(crate) mod handler; pub(crate) mod keepalive; pub(crate) mod lease_loop; diff --git a/src/pubsub/src/subscriber/client.rs b/src/pubsub/src/subscriber/client.rs new file mode 100644 index 0000000000..0ee2b2f6ff --- /dev/null +++ b/src/pubsub/src/subscriber/client.rs @@ -0,0 +1,90 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::client_builder::ClientBuilder; +use super::transport::Transport; +use gax::client_builder::Result as BuilderResult; +use std::sync::Arc; + +/// A Subscriber client for the [Cloud Pub/Sub] API. +/// +/// Use this client to receive messages from a [pull subscription] on a topic. +/// +// TODO(#3941) - add basic subscribe example +// +/// # Configuration +/// +/// To configure a `Subscriber` use the `with_*` methods in the type returned by +/// [builder()][Storage::builder]. The default configuration should work for +/// most applications. Common configuration changes include: +/// +/// * [with_endpoint()]: by default this client uses the global default endpoint +/// (`https://pubsub.googleapis.com`). Applications using regional endpoints +/// or running in restricted networks (e.g. a network configured with +/// [Private Google Access with VPC Service Controls]) may want to override +/// this default. +/// * [with_credentials()]: by default this client uses +/// [Application Default Credentials]. Applications using custom +/// authentication may need to override this default. +/// +/// # Pooling and Cloning +/// +/// `Subscriber` holds a connection pool internally, it is advised to +/// create one and then reuse it. You do not need to wrap `Subscriber` in +/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc` +/// internally. +/// +/// [application default credentials]: https://cloud.google.com/docs/authentication#adc +/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview +/// [private google access with vpc service controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity +/// [pull subscription]: https://docs.cloud.google.com/pubsub/docs/pull +/// [with_endpoint()]: ClientBuilder::with_endpoint +/// [with_credentials()]: ClientBuilder::with_credentials +#[derive(Clone, Debug)] +pub struct Subscriber { + inner: Arc, +} + +impl Subscriber { + /// Returns a builder for [Subscriber]. + /// + /// # Example + /// ```no_rust + /// # use google_cloud_pubsub::client::Subscriber; + /// # async fn sample() -> anyhow::Result<()> { + /// let client = Subscriber::builder().build().await?; + /// # Ok(()) } + /// ``` + pub fn builder() -> ClientBuilder { + ClientBuilder::new() + } + + pub(crate) async fn new(builder: ClientBuilder) -> BuilderResult { + let transport = Transport::new(builder.config).await?; + Ok(Self { + inner: Arc::new(transport), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn basic() -> anyhow::Result<()> { + let _ = Subscriber::builder().build().await?; + Ok(()) + } +} diff --git a/src/pubsub/src/subscriber/client_builder.rs b/src/pubsub/src/subscriber/client_builder.rs new file mode 100644 index 0000000000..e32ff65ba6 --- /dev/null +++ b/src/pubsub/src/subscriber/client_builder.rs @@ -0,0 +1,122 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::client::Subscriber; +use gax::client_builder::Result as BuilderResult; +use gaxi::options::ClientConfig; + +/// A builder for [Subscriber]. +/// +/// ```no_rust +/// # use google_cloud_pubsub::client::Subscriber; +/// # async fn sample() -> anyhow::Result<()> { +/// let builder = Subscriber::builder(); +/// let client = builder +/// .with_endpoint("https://pubsub.googleapis.com") +/// .build() +/// .await?; +/// # Ok(()) } +/// ``` +pub struct ClientBuilder { + pub(crate) config: ClientConfig, +} + +impl ClientBuilder { + pub(crate) fn new() -> Self { + Self { + config: ClientConfig::default(), + } + } + + /// Creates a new client. + /// + /// # Example + /// ```no_rust + /// # use google_cloud_pubsub::client::Subscriber; + /// # async fn sample() -> anyhow::Result<()> { + /// let client = Subscriber::builder().build().await?; + /// # Ok(()) } + /// ``` + pub async fn build(self) -> BuilderResult { + Subscriber::new(self).await + } + + /// Sets the endpoint. + /// + /// # Example + /// ```no_rust + /// # use google_cloud_pubsub::client::Subscriber; + /// # async fn sample() -> anyhow::Result<()> { + /// let client = Subscriber::builder() + /// .with_endpoint("https://private.googleapis.com") + /// .build() + /// .await?; + /// # Ok(()) } + /// ``` + pub fn with_endpoint>(mut self, v: V) -> Self { + self.config.endpoint = Some(v.into()); + self + } + + /// Configures the authentication credentials. + /// + /// More information about valid credentials types can be found in the + /// [google-cloud-auth] crate documentation. + /// + /// # Example + /// ```no_rust + /// # use google_cloud_pubsub::client::Subscriber; + /// # async fn sample() -> anyhow::Result<()> { + /// use auth::credentials::mds; + /// let client = Subscriber::builder() + /// .with_credentials( + /// mds::Builder::default() + /// .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"]) + /// .build()?) + /// .build() + /// .await?; + /// # Ok(()) } + /// ``` + /// + /// [google-cloud-auth]: https://docs.rs/google-cloud-auth + pub fn with_credentials>(mut self, v: V) -> Self { + self.config.cred = Some(v.into()); + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + use auth::credentials::anonymous::Builder as Anonymous; + + #[test] + fn defaults() { + let builder = ClientBuilder::new(); + assert!(!builder.config.endpoint.is_some()); + assert!(!builder.config.cred.is_some()); + } + + #[test] + fn setters() { + let builder = ClientBuilder::new() + .with_endpoint("test-endpoint.com") + .with_credentials(Anonymous::new().build()); + assert_eq!( + builder.config.endpoint, + Some("test-endpoint.com".to_string()) + ); + assert!(builder.config.cred.is_some()); + } +} diff --git a/src/pubsub/src/subscriber/transport.rs b/src/pubsub/src/subscriber/transport.rs index d957d2db1e..d1c192c323 100644 --- a/src/pubsub/src/subscriber/transport.rs +++ b/src/pubsub/src/subscriber/transport.rs @@ -15,7 +15,7 @@ use super::stub::Stub; use crate::Result; use crate::generated::gapic_dataplane::stub::dynamic::Subscriber as GapicStub; -use crate::generated::gapic_dataplane::transport::Subscriber as Transport; +pub(crate) use crate::generated::gapic_dataplane::transport::Subscriber as Transport; use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse}; use tokio::sync::mpsc::Receiver; use tokio_stream::wrappers::ReceiverStream; From 88118aa5cda0d4d44e3ab5248d6c402b8b57f18c Mon Sep 17 00:00:00 2001 From: Darren Bolduc Date: Tue, 30 Dec 2025 11:46:08 -0500 Subject: [PATCH 2/3] clippy knows --- src/pubsub/src/subscriber/client_builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pubsub/src/subscriber/client_builder.rs b/src/pubsub/src/subscriber/client_builder.rs index e32ff65ba6..de9015466d 100644 --- a/src/pubsub/src/subscriber/client_builder.rs +++ b/src/pubsub/src/subscriber/client_builder.rs @@ -104,8 +104,8 @@ mod tests { #[test] fn defaults() { let builder = ClientBuilder::new(); - assert!(!builder.config.endpoint.is_some()); - assert!(!builder.config.cred.is_some()); + assert!(builder.config.endpoint.is_none()); + assert!(builder.config.cred.is_none()); } #[test] From 1794b09f61151a10a23fbd8e82333ff3ab3c3d0c Mon Sep 17 00:00:00 2001 From: Darren Bolduc Date: Fri, 2 Jan 2026 12:30:27 -0500 Subject: [PATCH 3/3] the times, they are a-changin --- src/pubsub/src/subscriber/client.rs | 2 +- src/pubsub/src/subscriber/client_builder.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pubsub/src/subscriber/client.rs b/src/pubsub/src/subscriber/client.rs index 0ee2b2f6ff..b4f5f9b152 100644 --- a/src/pubsub/src/subscriber/client.rs +++ b/src/pubsub/src/subscriber/client.rs @@ -1,4 +1,4 @@ -// Copyright 2025 Google LLC +// Copyright 2026 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/pubsub/src/subscriber/client_builder.rs b/src/pubsub/src/subscriber/client_builder.rs index de9015466d..61e4ea4606 100644 --- a/src/pubsub/src/subscriber/client_builder.rs +++ b/src/pubsub/src/subscriber/client_builder.rs @@ -1,4 +1,4 @@ -// Copyright 2025 Google LLC +// Copyright 2026 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.