Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/pubsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ rust-version.workspace = true

[dependencies]
async-trait.workspace = true
auth.workspace = true
bytes.workspace = true
futures.workspace = true
gax.workspace = true
Expand All @@ -48,7 +49,6 @@ tracing.workspace = true
wkt.workspace = true

[dev-dependencies]
auth.workspace = true
anyhow.workspace = true
mockall.workspace = true
tokio-test.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions src/pubsub/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

pub(crate) mod builder;
pub(crate) mod client;
pub(crate) mod client_builder;
pub(crate) mod handler;
pub(crate) mod keepalive;
pub(crate) mod lease_loop;
Expand Down
90 changes: 90 additions & 0 deletions src/pubsub/src/subscriber/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::client_builder::ClientBuilder;
use super::transport::Transport;
use gax::client_builder::Result as BuilderResult;
use std::sync::Arc;

/// A Subscriber client for the [Cloud Pub/Sub] API.
///
/// Use this client to receive messages from a [pull subscription] on a topic.
///
// TODO(#3941) - add basic subscribe example
//
/// # Configuration
///
/// To configure a `Subscriber` use the `with_*` methods in the type returned by
/// [builder()][Storage::builder]. The default configuration should work for
/// most applications. Common configuration changes include:
///
/// * [with_endpoint()]: by default this client uses the global default endpoint
/// (`https://pubsub.googleapis.com`). Applications using regional endpoints
/// or running in restricted networks (e.g. a network configured with
/// [Private Google Access with VPC Service Controls]) may want to override
/// this default.
/// * [with_credentials()]: by default this client uses
/// [Application Default Credentials]. Applications using custom
/// authentication may need to override this default.
///
/// # Pooling and Cloning
///
/// `Subscriber` holds a connection pool internally, it is advised to
/// create one and then reuse it. You do not need to wrap `Subscriber` in
/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
/// internally.
///
/// [application default credentials]: https://cloud.google.com/docs/authentication#adc
/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
/// [private google access with vpc service controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
/// [pull subscription]: https://docs.cloud.google.com/pubsub/docs/pull
/// [with_endpoint()]: ClientBuilder::with_endpoint
/// [with_credentials()]: ClientBuilder::with_credentials
#[derive(Clone, Debug)]
pub struct Subscriber {
inner: Arc<Transport>,
}

impl Subscriber {
/// Returns a builder for [Subscriber].
///
/// # Example
/// ```no_rust
/// # use google_cloud_pubsub::client::Subscriber;
/// # async fn sample() -> anyhow::Result<()> {
/// let client = Subscriber::builder().build().await?;
/// # Ok(()) }
/// ```
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}

pub(crate) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
let transport = Transport::new(builder.config).await?;
Ok(Self {
inner: Arc::new(transport),
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn basic() -> anyhow::Result<()> {
let _ = Subscriber::builder().build().await?;
Ok(())
}
}
122 changes: 122 additions & 0 deletions src/pubsub/src/subscriber/client_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::client::Subscriber;
use gax::client_builder::Result as BuilderResult;
use gaxi::options::ClientConfig;

/// A builder for [Subscriber].
///
/// ```no_rust
/// # use google_cloud_pubsub::client::Subscriber;
/// # async fn sample() -> anyhow::Result<()> {
/// let builder = Subscriber::builder();
/// let client = builder
/// .with_endpoint("https://pubsub.googleapis.com")
/// .build()
/// .await?;
/// # Ok(()) }
/// ```
pub struct ClientBuilder {
pub(crate) config: ClientConfig,
}

impl ClientBuilder {
pub(crate) fn new() -> Self {
Self {
config: ClientConfig::default(),
}
}

/// Creates a new client.
///
/// # Example
/// ```no_rust
/// # use google_cloud_pubsub::client::Subscriber;
/// # async fn sample() -> anyhow::Result<()> {
/// let client = Subscriber::builder().build().await?;
/// # Ok(()) }
/// ```
pub async fn build(self) -> BuilderResult<Subscriber> {
Subscriber::new(self).await
}

/// Sets the endpoint.
///
/// # Example
/// ```no_rust
/// # use google_cloud_pubsub::client::Subscriber;
/// # async fn sample() -> anyhow::Result<()> {
/// let client = Subscriber::builder()
/// .with_endpoint("https://private.googleapis.com")
/// .build()
/// .await?;
/// # Ok(()) }
/// ```
pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
self.config.endpoint = Some(v.into());
self
}

/// Configures the authentication credentials.
///
/// More information about valid credentials types can be found in the
/// [google-cloud-auth] crate documentation.
///
/// # Example
/// ```no_rust
/// # use google_cloud_pubsub::client::Subscriber;
/// # async fn sample() -> anyhow::Result<()> {
/// use auth::credentials::mds;
/// let client = Subscriber::builder()
/// .with_credentials(
/// mds::Builder::default()
/// .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
/// .build()?)
/// .build()
/// .await?;
/// # Ok(()) }
/// ```
///
/// [google-cloud-auth]: https://docs.rs/google-cloud-auth
pub fn with_credentials<V: Into<auth::credentials::Credentials>>(mut self, v: V) -> Self {
self.config.cred = Some(v.into());
self
}
}

#[cfg(test)]
mod tests {
use super::*;
use auth::credentials::anonymous::Builder as Anonymous;

#[test]
fn defaults() {
let builder = ClientBuilder::new();
assert!(builder.config.endpoint.is_none());
assert!(builder.config.cred.is_none());
}

#[test]
fn setters() {
let builder = ClientBuilder::new()
.with_endpoint("test-endpoint.com")
.with_credentials(Anonymous::new().build());
assert_eq!(
builder.config.endpoint,
Some("test-endpoint.com".to_string())
);
assert!(builder.config.cred.is_some());
}
}
2 changes: 1 addition & 1 deletion src/pubsub/src/subscriber/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use super::stub::Stub;
use crate::Result;
use crate::generated::gapic_dataplane::stub::dynamic::Subscriber as GapicStub;
use crate::generated::gapic_dataplane::transport::Subscriber as Transport;
pub(crate) use crate::generated::gapic_dataplane::transport::Subscriber as Transport;
use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;
Expand Down