diff --git a/source/plugins/ruby/CustomMetricsUtils.rb b/source/plugins/ruby/CustomMetricsUtils.rb index eb5470056..fb981041d 100644 --- a/source/plugins/ruby/CustomMetricsUtils.rb +++ b/source/plugins/ruby/CustomMetricsUtils.rb @@ -20,7 +20,7 @@ def check_custom_metrics_availability return true end - if enable_custom_metrics.nil? || enable_custom_metrics.to_s.downcase == 'false' + if enable_custom_metrics.nil? || enable_custom_metrics.to_s.empty? || enable_custom_metrics.to_s.downcase == 'false' return false end diff --git a/source/plugins/ruby/KubernetesApiClient.rb b/source/plugins/ruby/KubernetesApiClient.rb index 38f373fd6..dccfd4c85 100644 --- a/source/plugins/ruby/KubernetesApiClient.rb +++ b/source/plugins/ruby/KubernetesApiClient.rb @@ -10,6 +10,9 @@ class KubernetesApiClient require "time" require "ipaddress" require "jwt" + require "zlib" + require "stringio" + require 'yajl' require_relative "oms_common" require_relative "constants" @@ -864,18 +867,187 @@ def getResourcesAndContinuationTokenV2(uri, api_group: nil) resourceInventory = nil responseCode = nil begin - @Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2 : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}" - responseCode, resourceInfo = getKubeResourceInfoV2(uri, api_group: api_group) - @Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2 : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}" - if !responseCode.nil? && responseCode == "200" && !resourceInfo.nil? - @Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2:Start:Parsing data for #{uri} using JSON @ #{Time.now.utc.iso8601}" - resourceInventory = JSON.parse(resourceInfo.body) - @Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2:End:Parsing data for #{uri} using JSON @ #{Time.now.utc.iso8601}" - resourceInfo = nil + resource_path = getResourceUri(uri, api_group) + if resource_path.nil? + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: resource path nil for #{uri}" + return continuationToken, resourceInventory, responseCode end - if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?) - continuationToken = resourceInventory["metadata"]["continue"] + parsed_items = [] + metadata_continue = nil + resource_version = nil + parse_mode = "stream" + total_uncompressed_bytes = 0 + total_compressed_bytes = 0 + started_at = Time.now.utc + + begin + parsed_uri = URI.parse(resource_path) + if !File.exist?(@@CaFile) + raise "#{@@CaFile} doesnt exist" + end + + Net::HTTP.start(parsed_uri.host, parsed_uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER, :open_timeout => 20, :read_timeout => 40) do |http| + kubeApiRequest = Net::HTTP::Get.new(parsed_uri.request_uri) + kubeApiRequest['Authorization'] = 'Bearer ' + getTokenStr + kubeApiRequest['User-Agent'] = getUserAgent() + kubeApiRequest['Accept-Encoding'] = 'gzip' + kubeApiRequest['Accept'] = 'application/json' + + @Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2(stream): Requesting #{uri} (api_group=#{api_group}) @ #{started_at.iso8601}" + + http.request(kubeApiRequest) do |response| + responseCode = response.code + unless responseCode == '200' + parse_mode = 'error' + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Non-success code #{responseCode} for #{uri}" + # Send telemetry for non-success response codes + @@K8sApiResponseTelemetryTimeTracker = ApplicationInsightsUtility.sendAPIResponseTelemetry(responseCode, uri, "K8sAPIStatus", @@K8sApiResponseCodeHash, @@K8sApiResponseTelemetryTimeTracker) + break + end + + # Decide whether to stream or fallback to full parse based on Content-Length (if small, cheaper to full-parse) + content_length = nil + begin + content_length = Integer(response['Content-Length']) if response['Content-Length'] + rescue; end + small_threshold = 256 * 1024 # 256KB + + if content_length && content_length <= small_threshold + # Read whole (possibly compressed) body then use faster parser for small payloads + body_buf = +"" # mutable string + response.read_body { |c| body_buf << c } + total_compressed_bytes = body_buf.bytesize + if response['Content-Encoding'] == 'gzip' + begin + body_buf = Zlib::GzipReader.new(StringIO.new(body_buf)).read + parse_mode = 'full_gzip' + rescue => gzerr + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: gzip decompress(small) failed: #{gzerr}; using compressed body (parse will likely fail)" + end + else + parse_mode = 'full_plain' + end + total_uncompressed_bytes = body_buf.bytesize + resourceInventory = JSON.parse(body_buf) + else + # Streaming path - CRITICAL: Create parser ONCE outside the read_body loop + parse_mode = 'stream' + is_gzip = (response['Content-Encoding'] == 'gzip') + inflater = nil + yajl_parser = nil + begin + if is_gzip + # Use Inflate with gzip window bits for streaming + inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 32) + parse_mode = 'stream_gzip' + end + + # Create Yajl parser ONCE and reuse for all chunks + yajl_parser = Yajl::Parser.new + + # Set up the parser callback to extract items and continuation token + yajl_parser.on_parse_complete = lambda do |obj| + if obj.is_a?(Hash) + if obj.key?('items') && obj['items'].is_a?(Array) + # Force deep copy via JSON round-trip to avoid Yajl object reference issues + begin + serialized_items = JSON.generate(obj['items']) + deep_copied_items = JSON.parse(serialized_items) + parsed_items.concat(deep_copied_items) + rescue => json_err + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: JSON round-trip failed: #{json_err}, using shallow copy" + parsed_items.concat(obj['items']) + end + end + if obj.key?('metadata') && obj['metadata'].is_a?(Hash) + metadata_continue = obj['metadata']['continue'] if obj['metadata'].key?('continue') + resource_version = obj['metadata']['resourceVersion'] if obj['metadata'].key?('resourceVersion') + end + end + end + + # Stream and parse chunks + chunk_count = 0 + response.read_body do |compressed_chunk| + chunk_count += 1 + total_compressed_bytes += compressed_chunk.bytesize + + decompressed = if is_gzip + begin + inflater.inflate(compressed_chunk) + rescue Zlib::Error => zerr + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: gzip inflate failed at chunk #{chunk_count}: #{zerr}" + raise + end + else + compressed_chunk + end + + total_uncompressed_bytes += decompressed.bytesize + + # Feed decompressed chunk to the parser + # Yajl can handle incomplete JSON and will buffer internally + begin + yajl_parser << decompressed + rescue Yajl::ParseError => perr + # Only log parse errors, don't break the stream - might be incomplete chunk + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Yajl parse error at chunk #{chunk_count}: #{perr}" + end + + # Yield control periodically to allow other threads to run (every 10 chunks) + Thread.pass if chunk_count % 10 == 0 + end # read_body + + # Finalize the parsing - this triggers on_parse_complete callback + yajl_parser.parse("") rescue nil + + # Build minimal inventory structure + resourceInventory = { + 'metadata' => { 'continue' => metadata_continue, 'resourceVersion' => resource_version }, + 'items' => parsed_items + } + + @Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2: Successfully parsed #{parsed_items.length} items in #{chunk_count} chunks" + + rescue => stream_err + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Stream processing error: #{stream_err}" + raise + ensure + # Always clean up inflater resources, regardless of success or failure + if inflater + inflater.finish rescue nil + inflater.close rescue nil + end + end + end # streaming path + end # http.request + end # Net::HTTP.start + rescue => inner_err + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: streaming fetch/parse failed for #{uri}: #{inner_err}; falling back to legacy getKubeResourceInfoV2" + parse_mode = 'fallback' + begin + # Fallback to legacy path + fallbackResponseCode, resourceInfo = getKubeResourceInfoV2(uri, api_group: api_group) + responseCode = fallbackResponseCode + if fallbackResponseCode == '200' && resourceInfo && resourceInfo.body && !resourceInfo.body.empty? + resourceInventory = JSON.parse(resourceInfo.body) + # Set continuationToken from fallback response + if resourceInventory && resourceInventory['metadata'] + continuationToken = resourceInventory['metadata']['continue'] + end + end + rescue => legacy_err + @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: legacy fallback also failed: #{legacy_err}" + ApplicationInsightsUtility.sendExceptionTelemetry(legacy_err) + end + end + + # Derive continuation token if not already set + if continuationToken.nil? && resourceInventory && resourceInventory['metadata'] + continuationToken = resourceInventory['metadata']['continue'] if resourceInventory['metadata'].key?('continue') end + duration_ms = ((Time.now.utc - started_at) * 1000).round(1) + @Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2: mode=#{parse_mode} code=#{responseCode} items=#{resourceInventory && resourceInventory['items'] ? resourceInventory['items'].length : 'n/a'} cont=#{continuationToken.nil? ? 'nil' : continuationToken.empty? ? 'empty' : 'set'} compBytes=#{total_compressed_bytes} uncompBytes=#{total_uncompressed_bytes} ms=#{duration_ms} uri=#{uri}" rescue => errorStr @Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2:Failed in get resources for #{uri} and continuation token: #{errorStr}" ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) diff --git a/source/plugins/ruby/in_kube_podinventory.rb b/source/plugins/ruby/in_kube_podinventory.rb index 9ba02327c..c0091a46a 100644 --- a/source/plugins/ruby/in_kube_podinventory.rb +++ b/source/plugins/ruby/in_kube_podinventory.rb @@ -689,7 +689,9 @@ def getPodInventoryRecords(item, serviceRecords, batchTime = Time.utc.iso8601) records.push(record) end #container status block end - @mdmPodRecordItems.push(mdmPodRecord.dup) + if CustomMetricsUtils.check_custom_metrics_availability + @mdmPodRecordItems.push(mdmPodRecord.dup) + end records.each do |record| if !record.nil?