
Ldrozdz93/azure blob storage source #1

Open

ldrozdz93 wants to merge 1 commit into master from ldrozdz93/azure-blob-storage-source

Conversation

@ldrozdz93 (Owner) commented Jan 20, 2026

Summary

This PR adds an Azure Blob Storage source. From the user's perspective, it is intended to work in a similar manner to the AWS S3 source.

Vector configuration

```yaml
sources:
  azure_blob:
    type: azure_blob
    connection_string: REDACTED
    container_name: logs
    queue:
      queue_name: eventgrid

sinks:
  console:
    type: console
    inputs:
      - azure_blob
    encoding:
      codec: json
```

How did you test this PR?

  • Unit tests.
  • Integration tests.
  • Manual tests, described in steps.md.

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • make build-licenses was run to regenerate the license inventory.
  • historical context: This Azure source was originally implemented for Vector 0.38 and ran in our product's MVP for a few months. It supported only a tiny subset of features, precisely what we needed. During that time, I rebased it onto the latest Vector versions. We then dropped the Azure Blobs approach in our product. I picked this up again a few months ago, added the intended feature parity with the AWS S3 source, updated the docs, extended the tests, and generally did everything required to contribute this to open source.
  • This is my first open-source Vector contribution. I'd appreciate feedback on both the code and the process.

Example connection string:

```text
DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net
```

Check failure (Code scanning / check-spelling): Unrecognized Spelling Error: myaccount is not a recognized word. (unrecognized-spelling)

```toml
container_name = "logs"

[sources.azure_logs.queue]
queue_name = "eventgrid"
```

Check failure (Code scanning / check-spelling): Unrecognized Spelling Error: eventgrid is not a recognized word. (unrecognized-spelling)

Copilot AI left a comment

Pull request overview

This pull request introduces a new azure_blob source for Vector that enables reading logs from Azure Blob Storage via Event Grid notifications delivered through Azure Storage Queues.
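
For orientation, here is a hedged sketch of the Event Grid notification shape such a source consumes from the storage queue. The struct names are illustrative (not the PR's types); the field names follow the public Event Grid schema for Microsoft.Storage.BlobCreated events.

```rust
use serde::Deserialize;

/// Illustrative only: a minimal projection of the Event Grid
/// `Microsoft.Storage.BlobCreated` event delivered via the storage queue.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct BlobCreatedEvent {
    /// For example "/blobServices/default/containers/logs/blobs/app.log".
    subject: String,
    /// "Microsoft.Storage.BlobCreated" for the events this source acts on.
    event_type: String,
    data: BlobCreatedData,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct BlobCreatedData {
    /// Full URL of the created blob.
    url: String,
    /// Blob size in bytes.
    content_length: u64,
}
```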

Changes:

  • New Azure Blob Storage source with queue-based event processing
  • Support for compression (gzip, zstd), multiple codecs, and multiline aggregation
  • Comprehensive unit and integration tests
  • Documentation files and configuration examples

Reviewed changes

Copilot reviewed 20 out of 21 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| src/sources/azure_blob/mod.rs | Main source implementation with streaming and event processing |
| src/sources/azure_blob/queue.rs | Queue integration, blob retrieval, and Event Grid message processing |
| src/sources/azure_blob/test.rs | Unit tests for compression detection and blob processing |
| src/sources/azure_blob/integration_tests.rs | Integration tests covering various scenarios |
| src/internal_events/azure_queue.rs | Internal event definitions for metrics and logging |
| website/cue/reference/components/sources/azure_blob.cue | Component documentation and metadata |
| Cargo.toml | Dependency and feature flag additions |
| tests/integration/azure/config/*.yaml | Integration test configuration |
| testing/github-XXXXX/* | Test artifacts and documentation (should be removed) |


@ldrozdz93 (Owner, Author) commented:

@copilot review

@ldrozdz93 ldrozdz93 requested a review from Copilot February 20, 2026 07:44
A new `azure_blob` source for reading logs from Azure Blob Storage
containers via Azure Storage Queue notifications (Event Grid).

Designed for feature parity with the existing `aws_s3` source.

Key features:
- Event-driven architecture using Azure Event Grid via Storage Queue
- Connection string authentication
- Configurable compression (gzip, zstd) with auto-detection (see the sketch after this list)
- Configurable framing (newline-delimited, character-delimited, etc.)
- Multiline aggregation for stack traces and multi-line logs
- Event metadata enrichment (container, blob, timestamp)
- Acknowledgement support
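
The auto-detection mentioned in the compression bullet is typically done by sniffing the blob's leading magic bytes. A minimal sketch of that approach follows; this is an assumption about the mechanism, not necessarily the PR's exact code.

```rust
/// Illustrative magic-byte sniffing; the real source may also consult
/// blob metadata such as content-encoding.
#[derive(Debug, PartialEq)]
enum Compression {
    Gzip,
    Zstd,
    None,
}

fn detect_compression(header: &[u8]) -> Compression {
    match header {
        // gzip streams start with 0x1f 0x8b.
        [0x1f, 0x8b, ..] => Compression::Gzip,
        // zstd frames start with the magic number 0xFD2FB528 (little-endian on disk).
        [0x28, 0xb5, 0x2f, 0xfd, ..] => Compression::Zstd,
        _ => Compression::None,
    }
}
```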

Copilot AI left a comment

Pull request overview

Copilot reviewed 258 out of 292 changed files in this pull request and generated 8 comments.



```diff
 if contains_module cargo-deny; then
-  if ! cargo-deny --version 2>/dev/null | grep -q '^cargo-deny 0.18.9'; then
+  if ! cargo-deny --version 2>/dev/null | grep -q '^cargo-deny 0.19.0'; then
     cargo "${install[@]}" cargo-deny --version 0.18.9 --force --locked
```

Copilot AI commented Feb 20, 2026:

The version gate checks for cargo-deny 0.19.0, but the install command still pins 0.18.9. This will keep reinstalling the wrong version (or fail policy expectations). Update the install line to install 0.19.0 so it matches the check.

Suggested change:

```diff
-cargo "${install[@]}" cargo-deny --version 0.18.9 --force --locked
+cargo "${install[@]}" cargo-deny --version 0.19.0 --force --locked
```

Comment on lines 181 to 207
```rust
impl<T, D> Stream for LenientFramedRead<T, D>
where
    T: AsyncRead + Unpin,
    D: Decoder,
{
    type Item = Result<D::Item, D::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        match this.inner.as_mut().poll_next(cx) {
            Poll::Ready(Some(Err(DecoderError::IO(e))))
                if e.kind() == io::ErrorKind::ConnectionReset =>
            {
                let buffer = this.inner.read_buffer();

                if buffer.is_empty() {
                    // Clean RST - no partial data, treat as EOF
                    Poll::Ready(None)
                } else {
                    // Partial frame in buffer
                    Poll::Ready(Some(Err(D::Error::from(e))))
                }
            }
            other => other.map_err(|e| e.into_inner()),
        }
    }
}
```

Copilot AI commented Feb 20, 2026:

This poll_next implementation won’t compile as written: (1) Poll<...> doesn’t provide map_err here, and (2) D::Error::from(e) requires D::Error: From<io::Error>, but that bound is not present on this Stream impl. A concrete fix is to add D::Error: From<io::Error> to the where clause and replace the last arm with an explicit mapping that converts DecoderError<D::Error> into D::Error (e.g., map over Poll/Option/Result and call into_inner() on the DecoderError).

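A minimal sketch of the fix the comment describes, assuming `DecoderError::into_inner` yields the wrapped `D::Error` (as the original `e.into_inner()` call implies); this is an illustration, not the PR's final code:

```rust
impl<T, D> Stream for LenientFramedRead<T, D>
where
    T: AsyncRead + Unpin,
    D: Decoder,
    D::Error: From<io::Error>, // required by D::Error::from(e) below
{
    type Item = Result<D::Item, D::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        match this.inner.as_mut().poll_next(cx) {
            Poll::Ready(Some(Err(DecoderError::IO(e))))
                if e.kind() == io::ErrorKind::ConnectionReset =>
            {
                if this.inner.read_buffer().is_empty() {
                    // Clean RST with no partial data: treat as EOF.
                    Poll::Ready(None)
                } else {
                    // Partial frame in buffer: surface the error.
                    Poll::Ready(Some(Err(D::Error::from(e))))
                }
            }
            // Poll itself has no map_err; map through Poll -> Option -> Result.
            other => other.map(|opt| opt.map(|res| res.map_err(|e| e.into_inner()))),
        }
    }
}
```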
Comment on lines 58 to 66
```rust
let mut starting_state = State::new(BTreeMap::new());
starting_state.sort_state.column = opts.sort_field;
starting_state.sort_state.reverse = opts.sort_desc;
starting_state.filter_state.column = opts.filter_field;
starting_state.filter_state.pattern = opts
    .filter_value
    .as_deref()
    .map(Regex::new)
    .and_then(Result::ok);
```

Copilot AI commented Feb 20, 2026:

Invalid regex patterns are silently ignored (Result::ok()), which can make vector top --filter-value ... appear to do nothing without any feedback. Since this is user-provided CLI input, it should fail fast with a clear error (e.g., return a non-zero exit code and print the regex parse error), rather than disabling filtering implicitly.

Suggested change:

```diff
-let mut starting_state = State::new(BTreeMap::new());
-starting_state.sort_state.column = opts.sort_field;
-starting_state.sort_state.reverse = opts.sort_desc;
-starting_state.filter_state.column = opts.filter_field;
-starting_state.filter_state.pattern = opts
-    .filter_value
-    .as_deref()
-    .map(Regex::new)
-    .and_then(Result::ok);
+// Validate the filter regex (if provided) and fail fast on errors.
+let filter_pattern = match opts.filter_value.as_deref() {
+    Some(pattern) => match Regex::new(pattern) {
+        Ok(regex) => Some(regex),
+        Err(err) => {
+            #[allow(clippy::print_stderr)]
+            {
+                eprintln!(
+                    "Invalid regular expression for --filter-value: {pattern}\nError: {err}"
+                );
+            }
+            return exitcode::USAGE;
+        }
+    },
+    None => None,
+};
+let mut starting_state = State::new(BTreeMap::new());
+starting_state.sort_state.column = opts.sort_field;
+starting_state.sort_state.reverse = opts.sort_desc;
+starting_state.filter_state.column = opts.filter_field;
+starting_state.filter_state.pattern = filter_pattern;
```

```diff
 }

-    fn increment(&mut self, group_by_key: &Option<String>, value: &Option<Cow<'_, str>>) -> bool {
+    fn increment(&mut self, group_by_key: Option<String>, value: Option<&Value>) -> bool {
```

Copilot AI commented Feb 20, 2026:

Calling value.to_string_lossy() inside increment can allocate a temporary string for every sampled event and then immediately discard it. Since this code runs in the hot path for sampling, consider hashing a borrowed representation when possible (e.g., pass a Cow<'_, str>/&str into increment, or add a fast path for string/bytes Value variants) to avoid repeated allocations.

```diff
 };
 if let Some(value) = value {
-    self.hash_within_ratio(value.as_bytes())
+    self.hash_within_ratio(value.to_string_lossy().as_bytes())
```

Copilot AI commented Feb 20, 2026:

Calling value.to_string_lossy() inside increment can allocate a temporary string for every sampled event and then immediately discard it. Since this code runs in the hot path for sampling, consider hashing a borrowed representation when possible (e.g., pass a Cow<'_, str>/&str into increment, or add a fast path for string/bytes Value variants) to avoid repeated allocations.

Suggested change:

```diff
-self.hash_within_ratio(value.to_string_lossy().as_bytes())
+if let Some(bytes) = value.as_bytes() {
+    self.hash_within_ratio(bytes)
+} else {
+    self.hash_within_ratio(value.to_string_lossy().as_bytes())
+}
```

Comment on lines 94 to 101
```rust
impl FunctionTransform for NoopTransform {
    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
        if let Some(delay) = self.delay {
            std::thread::sleep(delay);
        }
        output.push(event);
    }
}
```

Copilot AI commented Feb 20, 2026:

Using std::thread::sleep inside a FunctionTransform can block the Tokio runtime worker thread if the transform executes on it, causing unrelated async tasks/tests to stall and increasing flakiness. Since this delay is for deterministic latency tests, consider limiting delays to the TaskTransform path (which already uses tokio::time::sleep), or run the blocking sleep in a dedicated blocking context (e.g., tokio::task::block_in_place) with a comment explaining why blocking is safe here.

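A minimal sketch of the `block_in_place` variant suggested above, assuming the delay must remain on the `FunctionTransform` path:

```rust
impl FunctionTransform for NoopTransform {
    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
        if let Some(delay) = self.delay {
            // Blocking is deliberate here (deterministic latency tests).
            // block_in_place hands the worker's queued tasks to other threads
            // so the sleep does not stall unrelated async work. Note that it
            // panics on a current-thread runtime, so this only suits
            // multi-threaded test runtimes.
            tokio::task::block_in_place(|| std::thread::sleep(delay));
        }
        output.push(event);
    }
}
```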
Comment on lines 52 to 58
```rust
let result = blob_client
    .upload(
        RequestContent::from(request.blob_data.to_vec()),
        false,
        byte_size as u64,
        Some(upload_options),
    )
```

Copilot AI commented Feb 20, 2026:

This converts request.blob_data into a new Vec<u8> (to_vec()), which forces an extra allocation + full copy of the payload for every request. If the Azure SDK supports building RequestContent from the existing buffer type (e.g., Bytes/Vec<u8> directly), prefer that to avoid the copy.

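If the `azure_core` version in use provides a `From<Bytes>` conversion for `RequestContent` (an assumption to verify against the SDK, not something this PR confirms), the copy can be avoided entirely, since cloning `Bytes` only bumps a reference count:

```rust
// Hypothetical: assumes `impl From<Bytes> for RequestContent<_>` exists in
// the azure_core version in use, and that `request.blob_data` is `Bytes`.
let body = RequestContent::from(request.blob_data.clone()); // refcount bump, no payload copy
```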
```yaml
image: docker.io/amazon/amazon-ecs-local-container-endpoints:latest
environment:
  # https://github.com/vectordotdev/vector/issues/24687
  - DOCKER_API_VERSION=1.44
```

Copilot AI commented Feb 20, 2026:

There are now two different Docker API version workarounds in the repo (DOCKER_API_VERSION=1.44 here, vs DOCKER_API_VERSION=1.43 in scripts/test-e2e-kubernetes.sh). Consider standardizing on a single version (and referencing a single upstream issue) to reduce confusion and avoid drifting behavior between CI paths.

Suggested change:

```diff
-  - DOCKER_API_VERSION=1.44
+  - DOCKER_API_VERSION=1.43
```

@ldrozdz93 ldrozdz93 force-pushed the ldrozdz93/azure-blob-storage-source branch from fa44b4a to 6fd2c39 on February 20, 2026 10:28
@ldrozdz93 ldrozdz93 changed the base branch from master to blob-storage-rebase February 20, 2026 10:30
@ldrozdz93 ldrozdz93 changed the base branch from blob-storage-rebase to master February 20, 2026 10:30
@ldrozdz93 ldrozdz93 requested a review from Copilot February 20, 2026 10:31

Copilot AI left a comment

Pull request overview

Copilot reviewed 20 out of 21 changed files in this pull request and generated 8 comments.



Comment on lines +17 to +21
```rust
let connection_string = ConnectionString::new(&connection_string)?;
let account_name = connection_string
    .account_name
    .ok_or("Account name missing in connection string")?;
```

Copilot AI commented Feb 20, 2026:

The code unconditionally requires account_name from the connection string (ok_or(...)), but the comment below states it is not required when a custom blob_endpoint is provided. Either update the comment to match the actual requirement, or change the logic to only require account_name when it’s actually needed.

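A hedged sketch of the second option, requiring `account_name` only when no custom endpoint is configured; `blob_endpoint` is a field name assumed from the comment, not shown in the snippet:

```rust
let connection_string = ConnectionString::new(&connection_string)?;
// Hypothetical: with a custom blob_endpoint the account name may be absent
// from the connection string, so only enforce it for the default endpoint.
let account_name = match self.blob_endpoint.as_deref() {
    Some(_) => connection_string.account_name,
    None => Some(
        connection_string
            .account_name
            .ok_or("Account name missing in connection string")?,
    ),
};
```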
Comment on lines +117 to +121
```rust
select! {
    _ = shutdown.clone() => {
        info!("Shutdown signal received, stopping Azure Blob queue polling.");
        break;
    },
```

Copilot AI commented Feb 20, 2026:

Shutdown is only awaited in the messages.messages.is_empty() branch. If the queue is continuously non-empty, this loop will keep processing and never observe shutdown, preventing timely shutdown. Consider selecting on shutdown in the main loop (or at least between messages) so shutdown is responsive under load.

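A minimal sketch of a shutdown-responsive loop; `queue_client.get_messages()` and `process_message` are names assumed from the snippet, not the PR's exact API:

```rust
// Hypothetical restructuring: race shutdown against every receive, not just
// the empty-queue branch, so shutdown stays responsive under sustained load.
loop {
    tokio::select! {
        _ = shutdown.clone() => {
            info!("Shutdown signal received, stopping Azure Blob queue polling.");
            break;
        }
        result = queue_client.get_messages() => {
            for message in result?.messages {
                process_message(message).await?;
            }
        }
    }
}
```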
Comment on lines +377 to +381
```rust
.map(move |res| {
    res.inspect_err(|err| {
        error!("Framing error for blob '{}': {}", blob_for_error, err);
    })
    .ok()
```

Copilot AI commented Feb 20, 2026:

Framing errors are logged and then dropped via .ok(), which can lead to partially processed blobs being treated as successful and the queue message being deleted, causing data loss. The AWS S3 source records a read/framing error and avoids ack/deletion in that case; mirroring that approach here would preserve at-least-once semantics.

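A sketch of the S3-style handling described above, with hypothetical names: record the first framing error, stop reading, and surface it so the queue message is not acked or deleted:

```rust
// Hypothetical sketch mirroring the aws_s3 source's read_error pattern.
// Assumes: use futures::{future, StreamExt}; FramingError and ProcessingError
// are placeholder types standing in for whatever the source actually uses.
let mut read_error: Option<FramingError> = None;
let frames: Vec<Bytes> = {
    FramedRead::new(reader, decoder)
        .map(|res| match res {
            Ok(frame) => Some(frame),
            Err(err) => {
                error!("Framing error for blob '{}': {}", blob_for_error, err);
                read_error = Some(err);
                None // terminate the stream below instead of swallowing the error
            }
        })
        .take_while(|frame| future::ready(frame.is_some()))
        .filter_map(future::ready)
        .collect()
        .await
};
// Only delete the queue message after a fully successful read.
if read_error.is_some() {
    return Err(ProcessingError::Framing); // message becomes visible again and is retried
}
```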
Comment on lines +65 to +71
```cue
source_type: {
    description: "The name of the source type."
    required: true
    type: string: {
        examples: ["azure_blob"]
    }
}
```

Copilot AI commented Feb 20, 2026:

The documented output fields don’t include container and blob, but the source always enriches events with these (Legacy namespace overwrites top-level fields; Vector namespace adds source metadata). Add them to the documented output schema so users can rely on these fields like bucket/object in aws_s3.

Comment on lines +72 to +78
```cue
ingest_timestamp: {
    description: "The timestamp of when the blob was ingested by Vector."
    required: true
    type: string: {
        examples: ["2020-10-26T12:34:56.789Z"]
    }
}
```

Copilot AI commented Feb 20, 2026:

ingest_timestamp is documented here as a required top-level string field, but in the implementation it’s written as a timestamp under standard Vector metadata (.metadata.vector.ingest_timestamp). Either remove this from the top-level output fields (and rely on standard metadata docs), or document it in the correct namespace/type to avoid misleading users.

Suggested change:

```diff
-ingest_timestamp: {
-    description: "The timestamp of when the blob was ingested by Vector."
-    required: true
-    type: string: {
-        examples: ["2020-10-26T12:34:56.789Z"]
-    }
-}
```

The source automatically handles:
- Blob downloads with streaming to handle large files efficiently
- 404 errors for blobs that no longer exist
- Queue message visibility timeouts and retries

Copilot AI commented Feb 20, 2026:

This section claims the source “automatically handles … queue message visibility timeouts and retries”, but the implementation does not set/extend message visibility (and there’s no retry/renewal logic). Please adjust the documentation to match the actual behavior or implement visibility timeout management.

Suggested change:

```diff
-- Queue message visibility timeouts and retries
```

@@ -0,0 +1,164 @@
# Testing Plan - Azure Blob Storage Source (#XXXXX)

Copilot AI commented Feb 20, 2026:

The title still contains a #XXXXX placeholder. Update it to reference the actual issue/PR number (e.g. #13882) so the testing plan is traceable.

Suggested change:

```diff
-# Testing Plan - Azure Blob Storage Source (#XXXXX)
+# Testing Plan - Azure Blob Storage Source (#13882)
```

```toml
azure_storage_queues = { version = "0.21", default-features = false, optional = true }

# Needed to bridge with outdated version of azure_core used in azure_storage*
azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"] }
```

Copilot AI commented Feb 20, 2026:

azure_core_for_storage is declared as a non-optional dependency, which means the older azure_core (0.21.x) and its transitive deps will be pulled into every build even when Azure features are disabled. Make this dependency optional = true and gate it behind the existing azure / sources-azure_blob features (and update src/azure/mod.rs to compile accordingly).

Suggested change:

```diff
-azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"] }
+azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"], optional = true }
```


Development

Successfully merging this pull request may close these issues:

Add Azure Blob Storage source