Conversation
Example connection string:

```text
DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net
```
Check failure — code scanning / check-spelling: Unrecognized Spelling Error
```toml
container_name = "logs"

[sources.azure_logs.queue]
queue_name = "eventgrid"
```
Check failure — code scanning / check-spelling: Unrecognized Spelling Error
Pull request overview
This pull request introduces a new azure_blob source for Vector that enables reading logs from Azure Blob Storage via Event Grid notifications delivered through Azure Storage Queues.
Changes:
- New Azure Blob Storage source with queue-based event processing
- Support for compression (gzip, zstd), multiple codecs, and multiline aggregation
- Comprehensive unit and integration tests
- Documentation files and configuration examples
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/sources/azure_blob/mod.rs | Main source implementation with streaming and event processing |
| src/sources/azure_blob/queue.rs | Queue integration, blob retrieval, and Event Grid message processing |
| src/sources/azure_blob/test.rs | Unit tests for compression detection and blob processing |
| src/sources/azure_blob/integration_tests.rs | Integration tests covering various scenarios |
| src/internal_events/azure_queue.rs | Internal event definitions for metrics and logging |
| website/cue/reference/components/sources/azure_blob.cue | Component documentation and metadata |
| Cargo.toml | Dependency and feature flag additions |
| tests/integration/azure/config/*.yaml | Integration test configuration |
| testing/github-XXXXX/* | Test artifacts and documentation (should be removed) |
Force-pushed from e962d8b to 8a72d0d.
Force-pushed from 8a72d0d to fa44b4a.
@copilot review
A new `azure_blob` source for reading logs from Azure Blob Storage containers via Azure Storage Queue notifications (Event Grid). Designed for feature parity with the existing `aws_s3` source.

Key features:
- Event-driven architecture using Azure Event Grid via Storage Queue
- Connection string authentication
- Configurable compression (gzip, zstd) with auto-detection
- Configurable framing (newline-delimited, character-delimited, etc.)
- Multiline aggregation for stack traces and multi-line logs
- Event metadata enrichment (container, blob, timestamp)
- Acknowledgement support
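Pulling together the configuration fragments quoted elsewhere in this review (`container_name`, `[sources.azure_logs.queue]`, `queue_name`, connection-string auth), a complete source definition might look roughly like this; only the option names that appear in those fragments are taken from the PR, and the overall shape is an assumption:

```toml
# Hypothetical example assembled from the config fragments quoted in this
# review; anything beyond those fragments is an assumption, not the final API.
[sources.azure_logs]
type = "azure_blob"
connection_string = "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net"
container_name = "logs"

[sources.azure_logs.queue]
queue_name = "eventgrid"
```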
Pull request overview
Copilot reviewed 258 out of 292 changed files in this pull request and generated 8 comments.
```diff
 if contains_module cargo-deny; then
-    if ! cargo-deny --version 2>/dev/null | grep -q '^cargo-deny 0.18.9'; then
+    if ! cargo-deny --version 2>/dev/null | grep -q '^cargo-deny 0.19.0'; then
         cargo "${install[@]}" cargo-deny --version 0.18.9 --force --locked
```
The version gate checks for cargo-deny 0.19.0, but the install command still pins 0.18.9. This will keep reinstalling the wrong version (or fail policy expectations). Update the install line to install 0.19.0 so it matches the check.
Suggested change:

```diff
-cargo "${install[@]}" cargo-deny --version 0.18.9 --force --locked
+cargo "${install[@]}" cargo-deny --version 0.19.0 --force --locked
```
```rust
impl<T, D> Stream for LenientFramedRead<T, D>
where
    T: AsyncRead + Unpin,
    D: Decoder,
{
    type Item = Result<D::Item, D::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        match this.inner.as_mut().poll_next(cx) {
            Poll::Ready(Some(Err(DecoderError::IO(e))))
                if e.kind() == io::ErrorKind::ConnectionReset =>
            {
                let buffer = this.inner.read_buffer();

                if buffer.is_empty() {
                    // Clean RST - no partial data, treat as EOF
                    Poll::Ready(None)
                } else {
                    // Partial frame in buffer
                    Poll::Ready(Some(Err(D::Error::from(e))))
                }
            }
            other => other.map_err(|e| e.into_inner()),
        }
    }
}
```
This poll_next implementation won’t compile as written: (1) Poll<...> doesn’t provide map_err here, and (2) D::Error::from(e) requires D::Error: From<io::Error>, but that bound is not present on this Stream impl. A concrete fix is to add D::Error: From<io::Error> to the where clause and replace the last arm with an explicit mapping that converts DecoderError<D::Error> into D::Error (e.g., map over Poll/Option/Result and call into_inner() on the DecoderError).
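A minimal, self-contained sketch of the suggested shape. The `DecoderError` wrapper and `FrameError` below are hypothetical stand-ins, not Vector's actual types: with `E: From<io::Error>` available, the wrapper can always be collapsed into the decoder's own error, and the final match arm then maps explicitly through `Result` instead of calling a nonexistent `Poll::map_err`:

```rust
use std::io;

// Hypothetical stand-in for the framing wrapper's error type.
#[derive(Debug)]
enum DecoderError<E> {
    IO(io::Error),
    Inner(E),
}

impl<E: From<io::Error>> DecoderError<E> {
    // Collapsing IO errors into E is exactly why the Stream impl
    // needs the extra `D::Error: From<io::Error>` bound.
    fn into_inner(self) -> E {
        match self {
            DecoderError::IO(e) => E::from(e),
            DecoderError::Inner(e) => e,
        }
    }
}

// A decoder error type that can absorb io::Error.
#[derive(Debug, PartialEq)]
struct FrameError(String);

impl From<io::Error> for FrameError {
    fn from(e: io::Error) -> Self {
        FrameError(e.to_string())
    }
}

fn main() {
    // In the stream, the last match arm then becomes an explicit mapping, e.g.:
    //   other => other.map(|opt| opt.map(|res| res.map_err(DecoderError::into_inner))),
    let io_err: Result<(), DecoderError<FrameError>> = Err(DecoderError::IO(
        io::Error::new(io::ErrorKind::ConnectionReset, "rst"),
    ));
    let mapped: Result<(), FrameError> = io_err.map_err(DecoderError::into_inner);
    assert_eq!(mapped.unwrap_err(), FrameError("rst".to_string()));
    println!("ok");
}
```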
```rust
let mut starting_state = State::new(BTreeMap::new());
starting_state.sort_state.column = opts.sort_field;
starting_state.sort_state.reverse = opts.sort_desc;
starting_state.filter_state.column = opts.filter_field;
starting_state.filter_state.pattern = opts
    .filter_value
    .as_deref()
    .map(Regex::new)
    .and_then(Result::ok);
```
Invalid regex patterns are silently ignored (Result::ok()), which can make vector top --filter-value ... appear to do nothing without any feedback. Since this is user-provided CLI input, it should fail fast with a clear error (e.g., return a non-zero exit code and print the regex parse error), rather than disabling filtering implicitly.
Suggested change:

```diff
-let mut starting_state = State::new(BTreeMap::new());
-starting_state.sort_state.column = opts.sort_field;
-starting_state.sort_state.reverse = opts.sort_desc;
-starting_state.filter_state.column = opts.filter_field;
-starting_state.filter_state.pattern = opts
-    .filter_value
-    .as_deref()
-    .map(Regex::new)
-    .and_then(Result::ok);
+// Validate the filter regex (if provided) and fail fast on errors.
+let filter_pattern = match opts.filter_value.as_deref() {
+    Some(pattern) => match Regex::new(pattern) {
+        Ok(regex) => Some(regex),
+        Err(err) => {
+            #[allow(clippy::print_stderr)]
+            {
+                eprintln!(
+                    "Invalid regular expression for --filter-value: {pattern}\nError: {err}"
+                );
+            }
+            return exitcode::USAGE;
+        }
+    },
+    None => None,
+};
+let mut starting_state = State::new(BTreeMap::new());
+starting_state.sort_state.column = opts.sort_field;
+starting_state.sort_state.reverse = opts.sort_desc;
+starting_state.filter_state.column = opts.filter_field;
+starting_state.filter_state.pattern = filter_pattern;
```
```diff
 }

-fn increment(&mut self, group_by_key: &Option<String>, value: &Option<Cow<'_, str>>) -> bool {
+fn increment(&mut self, group_by_key: Option<String>, value: Option<&Value>) -> bool {
```
Calling value.to_string_lossy() inside increment can allocate a temporary string for every sampled event and then immediately discard it. Since this code runs in the hot path for sampling, consider hashing a borrowed representation when possible (e.g., pass a Cow<'_, str>/&str into increment, or add a fast path for string/bytes Value variants) to avoid repeated allocations.
```diff
 };
 if let Some(value) = value {
-    self.hash_within_ratio(value.as_bytes())
+    self.hash_within_ratio(value.to_string_lossy().as_bytes())
```
Suggested change:

```diff
-self.hash_within_ratio(value.to_string_lossy().as_bytes())
+if let Some(bytes) = value.as_bytes() {
+    self.hash_within_ratio(bytes)
+} else {
+    self.hash_within_ratio(value.to_string_lossy().as_bytes())
+}
```
```rust
impl FunctionTransform for NoopTransform {
    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
        if let Some(delay) = self.delay {
            std::thread::sleep(delay);
        }
        output.push(event);
    }
}
```
Using std::thread::sleep inside a FunctionTransform can block the Tokio runtime worker thread if the transform executes on it, causing unrelated async tasks/tests to stall and increasing flakiness. Since this delay is for deterministic latency tests, consider limiting delays to the TaskTransform path (which already uses tokio::time::sleep), or run the blocking sleep in a dedicated blocking context (e.g., tokio::task::block_in_place) with a comment explaining why blocking is safe here.
```rust
let result = blob_client
    .upload(
        RequestContent::from(request.blob_data.to_vec()),
        false,
        byte_size as u64,
        Some(upload_options),
    )
```
This converts request.blob_data into a new Vec<u8> (to_vec()), which forces an extra allocation + full copy of the payload for every request. If the Azure SDK supports building RequestContent from the existing buffer type (e.g., Bytes/Vec<u8> directly), prefer that to avoid the copy.
```yaml
image: docker.io/amazon/amazon-ecs-local-container-endpoints:latest
environment:
  # https://github.com/vectordotdev/vector/issues/24687
  - DOCKER_API_VERSION=1.44
```
There are now two different Docker API version workarounds in the repo (DOCKER_API_VERSION=1.44 here, vs DOCKER_API_VERSION=1.43 in scripts/test-e2e-kubernetes.sh). Consider standardizing on a single version (and referencing a single upstream issue) to reduce confusion and avoid drifting behavior between CI paths.
Suggested change:

```diff
-  - DOCKER_API_VERSION=1.44
+  - DOCKER_API_VERSION=1.43
```
Force-pushed from fa44b4a to 6fd2c39.
Pull request overview
Copilot reviewed 20 out of 21 changed files in this pull request and generated 8 comments.
```rust
let connection_string = ConnectionString::new(&connection_string)?;
let account_name = connection_string
    .account_name
    .ok_or("Account name missing in connection string")?;
```
The code unconditionally requires account_name from the connection string (ok_or(...)), but the comment below states it is not required when a custom blob_endpoint is provided. Either update the comment to match the actual requirement, or change the logic to only require account_name when it’s actually needed.
```rust
select! {
    _ = shutdown.clone() => {
        info!("Shutdown signal received, stopping Azure Blob queue polling.");
        break;
    },
```
Shutdown is only awaited in the messages.messages.is_empty() branch. If the queue is continuously non-empty, this loop will keep processing and never observe shutdown, preventing timely shutdown. Consider selecting on shutdown in the main loop (or at least between messages) so shutdown is responsive under load.
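A std-only sketch of the control-flow fix (the channel here stands in for the `ShutdownSignal`; in the real source this would be a `tokio::select!` inside the loop body): check for shutdown between every batch, not only when the queue happens to be empty:

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

// Hypothetical stand-in for the queue polling loop; `shutdown` plays the
// role of ShutdownSignal and `batches` the role of received queue messages.
fn run_poll_loop(shutdown: &Receiver<()>, batches: &[Vec<&str>]) -> usize {
    let mut processed = 0;
    for batch in batches {
        // Checking shutdown on every iteration keeps the loop responsive
        // even when the queue is never empty.
        match shutdown.try_recv() {
            Ok(()) | Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {}
        }
        processed += batch.len();
    }
    processed
}

fn main() {
    let (tx, rx) = channel();
    let batches = vec![vec!["m1", "m2"], vec!["m3"]];

    // No shutdown requested: all messages are processed.
    assert_eq!(run_poll_loop(&rx, &batches), 3);

    // Shutdown requested before the loop runs: nothing is processed.
    tx.send(()).unwrap();
    assert_eq!(run_poll_loop(&rx, &batches), 0);
    println!("ok");
}
```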
```rust
.map(move |res| {
    res.inspect_err(|err| {
        error!("Framing error for blob '{}': {}", blob_for_error, err);
    })
    .ok()
```
Framing errors are logged and then dropped via .ok(), which can lead to partially processed blobs being treated as successful and the queue message being deleted, causing data loss. The AWS S3 source records a read/framing error and avoids ack/deletion in that case; mirroring that approach here would preserve at-least-once semantics.
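One way to mirror the aws_s3 approach can be sketched like this (names and structure are illustrative, not Vector's actual types): record whether any framing error occurred while streaming the blob, and only ack (delete the queue message) when none did:

```rust
// Hypothetical sketch: count framing errors instead of dropping them with .ok().
struct BlobRead {
    framing_errors: usize,
}

impl BlobRead {
    fn new() -> Self {
        BlobRead { framing_errors: 0 }
    }

    // Each frame result is inspected; errors are counted rather than discarded,
    // so a partially processed blob is never treated as fully successful.
    fn handle_frame(&mut self, frame: Result<&str, &str>) -> Option<String> {
        match frame {
            Ok(line) => Some(line.to_string()),
            Err(err) => {
                self.framing_errors += 1;
                eprintln!("Framing error: {err}");
                None
            }
        }
    }

    // Only delete the queue message when every frame decoded cleanly,
    // preserving at-least-once delivery for the failed blob.
    fn should_ack(&self) -> bool {
        self.framing_errors == 0
    }
}

fn main() {
    let mut read = BlobRead::new();
    let frames = [Ok("a"), Err("truncated frame"), Ok("b")];
    let decoded: Vec<String> = frames.iter().filter_map(|f| read.handle_frame(*f)).collect();

    assert_eq!(decoded, vec!["a".to_string(), "b".to_string()]);
    assert!(!read.should_ack()); // message stays on the queue for redelivery
    println!("ok");
}
```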
```cue
source_type: {
    description: "The name of the source type."
    required: true
    type: string: {
        examples: ["azure_blob"]
    }
}
```
The documented output fields don’t include container and blob, but the source always enriches events with these (Legacy namespace overwrites top-level fields; Vector namespace adds source metadata). Add them to the documented output schema so users can rely on these fields like bucket/object in aws_s3.
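Following the pattern of the `source_type` entry above, the missing fields might be documented along these lines; the descriptions and example values are assumptions based on this review comment, not the actual implementation docs:

```cue
container: {
    description: "The name of the container the blob was read from."
    required: true
    type: string: {
        examples: ["logs"]
    }
}
blob: {
    description: "The name of the blob that generated the event."
    required: true
    type: string: {
        examples: ["app/2020/10/26/12.log"]
    }
}
```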
```cue
ingest_timestamp: {
    description: "The timestamp of when the blob was ingested by Vector."
    required: true
    type: string: {
        examples: ["2020-10-26T12:34:56.789Z"]
    }
}
```
ingest_timestamp is documented here as a required top-level string field, but in the implementation it’s written as a timestamp under standard Vector metadata (.metadata.vector.ingest_timestamp). Either remove this from the top-level output fields (and rely on standard metadata docs), or document it in the correct namespace/type to avoid misleading users.
Suggested change (remove the block):

```diff
-ingest_timestamp: {
-    description: "The timestamp of when the blob was ingested by Vector."
-    required: true
-    type: string: {
-        examples: ["2020-10-26T12:34:56.789Z"]
-    }
-}
```
The source automatically handles:
- Blob downloads with streaming to handle large files efficiently
- 404 errors for blobs that no longer exist
- Queue message visibility timeouts and retries
This section claims the source “automatically handles … queue message visibility timeouts and retries”, but the implementation does not set/extend message visibility (and there’s no retry/renewal logic). Please adjust the documentation to match the actual behavior or implement visibility timeout management.
Suggested change (remove the line):

```diff
-- Queue message visibility timeouts and retries
```
```diff
@@ -0,0 +1,164 @@
+# Testing Plan - Azure Blob Storage Source (#XXXXX)
```
The title still contains a #XXXXX placeholder. Update it to reference the actual issue/PR number (e.g. #13882) so the testing plan is traceable.
Suggested change:

```diff
-# Testing Plan - Azure Blob Storage Source (#XXXXX)
+# Testing Plan - Azure Blob Storage Source (#13882)
```
```toml
azure_storage_queues = { version = "0.21", default-features = false, optional = true }

# Needed to bridge with outdated version of azure_core used in azure_storage*
azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"] }
```
azure_core_for_storage is declared as a non-optional dependency, which means the older azure_core (0.21.x) and its transitive deps will be pulled into every build even when Azure features are disabled. Make this dependency optional = true and gate it behind the existing azure / sources-azure_blob features (and update src/azure/mod.rs to compile accordingly).
Suggested change:

```diff
-azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"] }
+azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"], optional = true }
```
Summary
This PR adds an Azure Blob Storage source. From the user's perspective, it is intended to work similarly to the AWS S3 source.
Vector configuration
How did you test this PR?
Change Type
Is this a breaking change?
Does this PR include user facing changes?
If not, add the `no-changelog` label to this PR.
References
Notes
`make build-licenses` was run to regenerate the license inventory.