diff --git a/lib/fluent/config.rb b/lib/fluent/config.rb index d471114830..b1cde87843 100644 --- a/lib/fluent/config.rb +++ b/lib/fluent/config.rb @@ -26,7 +26,7 @@ module Config # @param additional_config [String] config which is added to last of config body # @param use_v1_config [Bool] config is formatted with v1 or not # @return [Fluent::Config] - def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_config: true, type: nil) + def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_config: true, type: nil, on_file_parsed: nil) if type == :guess config_file_ext = File.extname(config_path) if config_file_ext == '.yaml' || config_file_ext == '.yml' @@ -35,7 +35,7 @@ def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_c end if type == :yaml || type == :yml - return Fluent::Config::YamlParser.parse(config_path) + return Fluent::Config::YamlParser.parse(config_path, on_file_parsed: on_file_parsed) end config_fname = File.basename(config_path) @@ -49,10 +49,10 @@ def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_c s end - Fluent::Config.parse(config_data, config_fname, config_basedir, use_v1_config) + Fluent::Config.parse(config_data, config_fname, config_basedir, use_v1_config, on_file_parsed: on_file_parsed) end - def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1) + def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1, on_file_parsed: nil) parser = if fname =~ /\.rb$/ || syntax == :ruby :ruby elsif v1_config.nil? @@ -68,7 +68,7 @@ def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1) case parser when :v1 require 'fluent/config/v1_parser' - V1Parser.parse(str, fname, basepath, Kernel.binding) + V1Parser.parse(str, fname, basepath, Kernel.binding, on_file_parsed: on_file_parsed) when :v0 # TODO: show deprecated message in v1 require 'fluent/config/parser' diff --git a/lib/fluent/config/v1_parser.rb b/lib/fluent/config/v1_parser.rb index 7666d3d8cd..af20945271 100644 --- a/lib/fluent/config/v1_parser.rb +++ b/lib/fluent/config/v1_parser.rb @@ -27,17 +27,18 @@ module Config class V1Parser < LiteralParser ELEMENT_NAME = /[a-zA-Z0-9_]+/ - def self.parse(data, fname, basepath = Dir.pwd, eval_context = nil) + def self.parse(data, fname, basepath = Dir.pwd, eval_context = nil, on_file_parsed: nil) ss = StringScanner.new(data) - ps = V1Parser.new(ss, basepath, fname, eval_context) + ps = V1Parser.new(ss, basepath, fname, eval_context, on_file_parsed: on_file_parsed) ps.parse! end - def initialize(strscan, include_basepath, fname, eval_context) + def initialize(strscan, include_basepath, fname, eval_context, on_file_parsed: nil) super(strscan, eval_context) @include_basepath = include_basepath @fname = fname @logger = defined?($log) ? $log : nil + @on_file_parsed = on_file_parsed end def parse! @@ -138,6 +139,8 @@ def parse_element(root_element, elem_name, attrs = {}, elems = []) end end + @on_file_parsed&.call(File.expand_path(File.join(@include_basepath, @fname))) if root_element + return attrs, elems end @@ -166,7 +169,7 @@ def eval_include(attrs, elems, uri) data = File.read(entry) data.force_encoding('UTF-8') ss = StringScanner.new(data) - V1Parser.new(ss, basepath, fname, @eval_context).parse_element(true, nil, attrs, elems) + V1Parser.new(ss, basepath, fname, @eval_context, on_file_parsed: @on_file_parsed).parse_element(true, nil, attrs, elems) } else require 'open-uri' @@ -175,7 +178,7 @@ def eval_include(attrs, elems, uri) data = u.open { |f| f.read } data.force_encoding('UTF-8') ss = StringScanner.new(data) - V1Parser.new(ss, basepath, fname, @eval_context).parse_element(true, nil, attrs, elems) + V1Parser.new(ss, basepath, fname, @eval_context, on_file_parsed: @on_file_parsed).parse_element(true, nil, attrs, elems) end rescue SystemCallError => e cpe = ConfigParseError.new("include error #{uri} - #{e}") diff --git a/lib/fluent/config/yaml_parser.rb b/lib/fluent/config/yaml_parser.rb index e000bfa446..d9ec19295d 100644 --- a/lib/fluent/config/yaml_parser.rb +++ b/lib/fluent/config/yaml_parser.rb @@ -21,7 +21,7 @@ module Fluent module Config module YamlParser - def self.parse(path) + def self.parse(path, on_file_parsed: nil) context = Kernel.binding unless context.respond_to?(:use_nil) @@ -48,7 +48,7 @@ def self.parse(path) end end - s = Fluent::Config::YamlParser::Loader.new(context).load(Pathname.new(path)) + s = Fluent::Config::YamlParser::Loader.new(context, on_file_parsed: on_file_parsed).load(Pathname.new(path)) Fluent::Config::YamlParser::Parser.new(s).build.to_element end end diff --git a/lib/fluent/config/yaml_parser/loader.rb b/lib/fluent/config/yaml_parser/loader.rb index 19a5fb1ad4..2cb07bc946 100644 --- a/lib/fluent/config/yaml_parser/loader.rb +++ b/lib/fluent/config/yaml_parser/loader.rb @@ -30,9 +30,10 @@ class Loader FLUENT_STR_TAG = 'tag:fluent/s'.freeze SHOVEL = '<<'.freeze - def initialize(context = Kernel.binding) + def initialize(context = Kernel.binding, on_file_parsed: nil) @context = context @current_path = nil + @on_file_parsed = on_file_parsed end # @param [String] path @@ -55,9 +56,13 @@ def load(path) Fluent::Config::YamlParser::FluentValue::StringValue.new(val, @context) end - path.open do |f| + config = path.open do |f| visitor.accept(Psych.parse(f)) end + + @on_file_parsed&.call(File.expand_path(path.to_s)) + + config end def eval_include(path, parent) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 0b431c4a61..1857f3b277 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -18,6 +18,7 @@ require 'open3' require 'pathname' require 'find' +require 'set' require 'fluent/config' require 'fluent/counter' @@ -796,18 +797,20 @@ def configure(supervisor: false) $log.warn('the value "-" for `inline_config` is deprecated. See https://github.com/fluent/fluentd/issues/2711') @inline_config = STDIN.read end + parsed_files = Set.new @conf = Fluent::Config.build( config_path: @config_path, encoding: @conf_encoding, additional_config: @inline_config, use_v1_config: @use_v1_config, type: @config_file_type, + on_file_parsed: ->(path) { parsed_files << path }, ) @system_config = build_system_config(@conf) $log.info :supervisor, 'parsing config file is succeeded', path: @config_path - build_additional_configurations do |additional_conf| + build_additional_configurations(parsed_files) do |additional_conf| @conf += additional_conf end @@ -854,6 +857,7 @@ def setup_global_logger(supervisor: false) additional_config: @inline_config, use_v1_config: @use_v1_config, type: @config_file_type, + on_file_parsed: nil, ) system_config = build_system_config(conf) @@ -1088,15 +1092,17 @@ def reload_config $log.debug('worker got SIGUSR2') begin + parsed_files = Set.new conf = Fluent::Config.build( config_path: @config_path, encoding: @conf_encoding, additional_config: @inline_config, use_v1_config: @use_v1_config, type: @config_file_type, + on_file_parsed: ->(path) { parsed_files << path }, ) - build_additional_configurations do |additional_conf| + build_additional_configurations(parsed_files) do |additional_conf| conf += additional_conf end @@ -1206,7 +1212,7 @@ def build_system_config(conf) system_config end - def build_additional_configurations + def build_additional_configurations(parsed_files) if @system_config.config_include_dir&.empty? $log.info :supervisor, 'configuration include directory is disabled' return @@ -1218,11 +1224,17 @@ def build_additional_configurations next unless supported_suffixes.include?(File.extname(path)) # NOTE: both types of normal config (.conf) and YAML will be loaded. # Thus, it does not care whether @config_path is .conf or .yml. + if parsed_files.include?(path) + $log.info :supervisor, 'skip auto loading, it was already loaded', path: path + next + end + $log.info :supervisor, 'loading additional configuration file', path: path yield Fluent::Config.build(config_path: path, encoding: @conf_encoding, use_v1_config: @use_v1_config, - type: :guess) + type: :guess, + on_file_parsed: nil) end rescue Errno::ENOENT $log.info :supervisor, 'inaccessible include directory was specified', path: @system_config.config_include_dir diff --git a/test/test_config.rb b/test/test_config.rb index 019a534758..b370fb78bc 100644 --- a/test/test_config.rb +++ b/test/test_config.rb @@ -520,5 +520,118 @@ def write_config(path, data, encoding: 'utf-8') assert_equal('value', c['key']) assert_equal('value2', c['key2']) end + + sub_test_case 'on_file_parsed' do + test "calling order on normal configuration files" do + write_config("#{TMP_DIR}/build/common_param.conf", <<~EOS) + flush_interval 5s + total_limit_size 100m + chunk_limit_size 1m + EOS + write_config("#{TMP_DIR}/build/server.conf", <<~EOS) + + host 127.0.0.1 + port 24224 + + EOS + write_config("#{TMP_DIR}/build/forward.conf", <<~EOS) + + @type forward + + @include server.conf + + EOS + write_config("#{TMP_DIR}/build/inline.conf", <<~EOS) + + @type stdout + tag test + + EOS + write_config("#{TMP_DIR}/build/fluent.conf", <<~EOS) + + @type file + + @include common_param.conf + + + + @type stdout + + @include common_param.conf + + + + @include forward.conf + EOS + + # parsed_files contains file paths in the order of file parsed + parsed_files = [] + Fluent::Config.build( + config_path: "#{TMP_DIR}/build/fluent.conf", + additional_config: "@include inline.conf", + on_file_parsed: ->(path) { parsed_files << path }, + ) + + assert_equal( + [ + "#{TMP_DIR}/build/common_param.conf", + "#{TMP_DIR}/build/common_param.conf", + "#{TMP_DIR}/build/server.conf", + "#{TMP_DIR}/build/forward.conf", + "#{TMP_DIR}/build/inline.conf", + "#{TMP_DIR}/build/fluent.conf" + ], + parsed_files + ) + end + + test "calling order on YAML configuration files" do + write_config("#{TMP_DIR}/build/common_buffer.yaml", <<~EOS) + - buffer: + flush_interval: 5s + total_limit_size: 100m + chunk_limit_size: 1m + EOS + write_config("#{TMP_DIR}/build/forward.yaml", <<~EOS) + - match: + $tag: test.* + server: + host: 127.0.0.1 + port: 24224 + EOS + write_config("#{TMP_DIR}/build/fluent.yaml", <<~EOS) + config: + - match: + $tag: sample.* + $type: file + <<: !include common_buffer.yaml + - match: + $tag: debug.* + $type: stdout + <<: !include common_buffer.yaml + - !include forward.yaml + EOS + + # parsed_files contains file paths in the order of file parsed + # `additional_config` does not support YAML config + parsed_files = [] + Fluent::Config.build( + config_path: "#{TMP_DIR}/build/fluent.yaml", + type: :yaml, + on_file_parsed: ->(path) { parsed_files << path }, + ) + + assert_equal( + [ + "#{TMP_DIR}/build/common_buffer.yaml", + "#{TMP_DIR}/build/common_buffer.yaml", + "#{TMP_DIR}/build/forward.yaml", + "#{TMP_DIR}/build/fluent.yaml" + ], + parsed_files + ) + end + + end end end diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 6b489dc771..353a12b758 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -1085,7 +1085,8 @@ def test_stop_parallel_old_supervisor_after_delay stub.proxy(Fluent::Config).build stub(Fluent::Config).build(config_path: "/etc/fluent/fluent.conf", encoding: "utf-8", additional_config: anything, use_v1_config: anything, - type: anything) { config_element('ROOT', '', {}, [c]) } + type: anything, + on_file_parsed: anything) { config_element('ROOT', '', {}, [c]) } sources.each do |type| config = <<~EOF @@ -1111,7 +1112,8 @@ def test_stop_parallel_old_supervisor_after_delay stub.proxy(Fluent::Config).build stub(Fluent::Config).build(config_path: "/etc/fluent/fluent.conf", encoding: "utf-8", additional_config: anything, use_v1_config: anything, - type: anything) { config_element('ROOT', '', {}, [c]) } + type: anything, + on_file_parsed: anything) { config_element('ROOT', '', {}, [c]) } sources.each do |type| config = <<~EOF config: @@ -1139,7 +1141,8 @@ def test_stop_parallel_old_supervisor_after_delay stub.proxy(Fluent::Config).build stub(Fluent::Config).build(config_path: "/etc/fluent/fluent.conf", encoding: "utf-8", additional_config: anything, use_v1_config: anything, - type: anything) { config_element('ROOT', '', {}, [c]) } + type: anything, + on_file_parsed: anything) { config_element('ROOT', '', {}, [c]) } sources.each do |type| if yaml config = <<~EOF @@ -1165,6 +1168,295 @@ def test_stop_parallel_old_supervisor_after_delay expected = [c].concat(sources.collect { |type| {"@type" => type} }) assert_equal(expected, supervisor.instance_variable_get(:@conf).elements) end + + test "prevent duplicate loading" do + write_config("#{@config_include_dir}/system.conf", <<~EOF) + + config_include_dir #{@config_include_dir} + + EOF + write_config("#{@config_include_dir}/forward.conf", <<~EOF) + + @type forward + + @include #{@config_include_dir}/common_param.conf + + + host 127.0.0.1 + port 24224 + + + EOF + write_config("#{@config_include_dir}/common_param.conf", <<~EOF) + flush_interval 5s + EOF + write_config("#{@config_include_dir}/obsolete_plugins.conf", <<~EOF) + + @type obsolete_plugins + + EOF + + write_config("#{@tmp_dir}/fluent.conf", <<~EOF) + + @type file + + @include #{@config_include_dir}/common_param.conf + + + + @include #{@config_include_dir}/forward.conf + @include #{@config_include_dir}/system.conf + EOF + + supervisor = Fluent::Supervisor.new({ config_path: "#{@tmp_dir}/fluent.conf" }) + stub(supervisor).setup_global_logger { create_debug_dummy_logger } + + supervisor.configure(supervisor: true) + elements = supervisor.instance_variable_get(:@conf).elements + assert_equal(4, elements.size) + + assert_equal('match', elements[0].name) + assert_equal('file', elements[0]['@type']) + assert_equal('buffer', elements[0].elements[0].name) + assert_equal('5s', elements[0].elements[0]['flush_interval']) + + assert_equal('match', elements[1].name) + assert_equal('forward', elements[1]['@type']) + assert_equal('buffer', elements[1].elements[0].name) + assert_equal('5s', elements[1].elements[0]['flush_interval']) + + assert_equal('system', elements[2].name) + assert_equal(@config_include_dir, elements[2]['config_include_dir']) + + assert_equal('source', elements[3].name) + assert_equal('obsolete_plugins', elements[3]['@type']) + + skipped_files = %W[ + #{@config_include_dir}/common_param.conf + #{@config_include_dir}/forward.conf + #{@config_include_dir}/system.conf + ] + loaded_files = %W[ + #{@config_include_dir}/obsolete_plugins.conf + ] + + logs_line = $log.out.logs.join + skipped_files.each do |path| + assert { logs_line.include?("skip auto loading, it was already loaded path=\"#{path}\"") } + end + loaded_files.each do |path| + assert { logs_line.include?("loading additional configuration file path=\"#{path}\"") } + end + + # reload + $log.out.reset + supervisor.__send__(:reload_config) + sleep 0.2 # wait to finish reloading + + reload_elements = supervisor.instance_variable_get(:@conf).elements + assert_equal(elements, reload_elements) + + logs_line = $log.out.logs.join + skipped_files.each do |path| + assert { logs_line.include?("skip auto loading, it was already loaded path=\"#{path}\"") } + end + loaded_files.each do |path| + assert { logs_line.include?("loading additional configuration file path=\"#{path}\"") } + end + ensure + $log.out.reset if $log&.out&.respond_to?(:reset) + end + + test "do not load additional configuration when loaded all files with @include" do + write_config("#{@config_include_dir}/forward.conf", <<~EOF) + + @type forward + + host 127.0.0.1 + port 24224 + + + EOF + write_config("#{@config_include_dir}/obsolete_plugins.conf", <<~EOF) + + @type obsolete_plugins + + EOF + + write_config("#{@tmp_dir}/fluent.conf", <<~EOF) + + config_include_dir #{@config_include_dir} + + + + @type file + + + @include #{@config_include_dir}/*.conf + EOF + + supervisor = Fluent::Supervisor.new({ config_path: "#{@tmp_dir}/fluent.conf" }) + stub(supervisor).setup_global_logger { create_debug_dummy_logger } + + supervisor.configure(supervisor: true) + elements = supervisor.instance_variable_get(:@conf).elements + assert_equal(4, elements.size) + + assert_equal('system', elements[0].name) + assert_equal(@config_include_dir, elements[0]['config_include_dir']) + + assert_equal('match', elements[1].name) + assert_equal('file', elements[1]['@type']) + + assert_equal('match', elements[2].name) + assert_equal('forward', elements[2]['@type']) + + assert_equal('source', elements[3].name) + assert_equal('obsolete_plugins', elements[3]['@type']) + + # no additional load, all files were skipped + skipped_files = %W[ + #{@config_include_dir}/forward.conf + #{@config_include_dir}/obsolete_plugins.conf + ] + + logs_line = $log.out.logs.join + skipped_files.each do |path| + assert { logs_line.include?("skip auto loading, it was already loaded path=\"#{path}\"") } + end + assert_not_match(/loading additional configuration file/, logs_line) + ensure + $log.out.reset if $log&.out&.respond_to?(:reset) + end + + test "can load partial config loaded config_include_dir feature by even if already loaded" do + write_config("#{@config_include_dir}/system.conf", <<~EOF) + + config_include_dir #{@config_include_dir} + + EOF + write_config("#{@config_include_dir}/forward.conf", <<~EOF) + + @type forward + + @include #{@config_include_dir}/common_param.conf + + + host 127.0.0.1 + port 24224 + + + EOF + write_config("#{@config_include_dir}/common_param.conf", <<~EOF) + flush_interval 5s + EOF + write_config("#{@config_include_dir}/obsolete_plugins.conf", <<~EOF) + + @type obsolete_plugins + + EOF + + write_config("#{@tmp_dir}/fluent.conf", <<~EOF) + + @type file + + @include #{@config_include_dir}/common_param.conf + + + + @include #{@config_include_dir}/system.conf + EOF + + supervisor = Fluent::Supervisor.new({ config_path: "#{@tmp_dir}/fluent.conf" }) + stub(supervisor).setup_global_logger { create_debug_dummy_logger } + + supervisor.configure(supervisor: true) + elements = supervisor.instance_variable_get(:@conf).elements + assert_equal(4, elements.size) + + assert_equal('match', elements[0].name) + assert_equal('file', elements[0]['@type']) + assert_equal('buffer', elements[0].elements[0].name) + assert_equal('5s', elements[0].elements[0]['flush_interval']) + + assert_equal('system', elements[1].name) + assert_equal(@config_include_dir, elements[1]['config_include_dir']) + + # include forward.conf using config_include_dir feature + assert_equal('match', elements[2].name) + assert_equal('forward', elements[2]['@type']) + assert_equal('buffer', elements[2].elements[0].name) + assert_equal('5s', elements[2].elements[0]['flush_interval']) + + assert_equal('source', elements[3].name) + assert_equal('obsolete_plugins', elements[3]['@type']) + + skipped_files = %W[ + #{@config_include_dir}/common_param.conf + #{@config_include_dir}/system.conf + ] + loaded_files = %W[ + #{@config_include_dir}/forward.conf + #{@config_include_dir}/obsolete_plugins.conf + ] + + logs_line = $log.out.logs.join + skipped_files.each do |path| + assert { logs_line.include?("skip auto loading, it was already loaded path=\"#{path}\"") } + end + loaded_files.each do |path| + assert { logs_line.include?("loading additional configuration file path=\"#{path}\"") } + end + ensure + $log.out.reset if $log&.out&.respond_to?(:reset) + end + + test "can load config files even if disable config_include_dir" do + write_config("#{@config_include_dir}/forward.conf", <<~EOF) + + @type forward + + host 127.0.0.1 + port 24224 + + + EOF + + write_config("#{@tmp_dir}/fluent.conf", <<~EOF) + + config_include_dir "" + + + + @type file + + + @include #{@config_include_dir}/*.conf + EOF + + supervisor = Fluent::Supervisor.new({ config_path: "#{@tmp_dir}/fluent.conf" }) + stub(supervisor).setup_global_logger { create_debug_dummy_logger } + + supervisor.configure(supervisor: true) + elements = supervisor.instance_variable_get(:@conf).elements + assert_equal(3, elements.size) + + assert_equal('system', elements[0].name) + assert_equal("", elements[0]['config_include_dir']) + + assert_equal('match', elements[1].name) + assert_equal('file', elements[1]['@type']) + + assert_equal('match', elements[2].name) + assert_equal('forward', elements[2]['@type']) + + # There is no logs for additional loading + logs_line = $log.out.logs.join + assert_not_match(/skip auto loading, it was already loaded/, logs_line) + assert_not_match(/loading additional configuration file/, logs_line) + ensure + $log.out.reset if $log&.out&.respond_to?(:reset) + end end def create_debug_dummy_logger