diff --git a/lib/fluent/config.rb b/lib/fluent/config.rb
index d471114830..b1cde87843 100644
--- a/lib/fluent/config.rb
+++ b/lib/fluent/config.rb
@@ -26,7 +26,7 @@ module Config
# @param additional_config [String] config which is added to last of config body
# @param use_v1_config [Bool] config is formatted with v1 or not
# @return [Fluent::Config]
- def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_config: true, type: nil)
+ def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_config: true, type: nil, on_file_parsed: nil)
if type == :guess
config_file_ext = File.extname(config_path)
if config_file_ext == '.yaml' || config_file_ext == '.yml'
@@ -35,7 +35,7 @@ def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_c
end
if type == :yaml || type == :yml
- return Fluent::Config::YamlParser.parse(config_path)
+ return Fluent::Config::YamlParser.parse(config_path, on_file_parsed: on_file_parsed)
end
config_fname = File.basename(config_path)
@@ -49,10 +49,10 @@ def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_c
s
end
- Fluent::Config.parse(config_data, config_fname, config_basedir, use_v1_config)
+ Fluent::Config.parse(config_data, config_fname, config_basedir, use_v1_config, on_file_parsed: on_file_parsed)
end
- def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1)
+ def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1, on_file_parsed: nil)
parser = if fname =~ /\.rb$/ || syntax == :ruby
:ruby
elsif v1_config.nil?
@@ -68,7 +68,7 @@ def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1)
case parser
when :v1
require 'fluent/config/v1_parser'
- V1Parser.parse(str, fname, basepath, Kernel.binding)
+ V1Parser.parse(str, fname, basepath, Kernel.binding, on_file_parsed: on_file_parsed)
when :v0
# TODO: show deprecated message in v1
require 'fluent/config/parser'
diff --git a/lib/fluent/config/v1_parser.rb b/lib/fluent/config/v1_parser.rb
index 7666d3d8cd..af20945271 100644
--- a/lib/fluent/config/v1_parser.rb
+++ b/lib/fluent/config/v1_parser.rb
@@ -27,17 +27,18 @@ module Config
class V1Parser < LiteralParser
ELEMENT_NAME = /[a-zA-Z0-9_]+/
- def self.parse(data, fname, basepath = Dir.pwd, eval_context = nil)
+ def self.parse(data, fname, basepath = Dir.pwd, eval_context = nil, on_file_parsed: nil)
ss = StringScanner.new(data)
- ps = V1Parser.new(ss, basepath, fname, eval_context)
+ ps = V1Parser.new(ss, basepath, fname, eval_context, on_file_parsed: on_file_parsed)
ps.parse!
end
- def initialize(strscan, include_basepath, fname, eval_context)
+ def initialize(strscan, include_basepath, fname, eval_context, on_file_parsed: nil)
super(strscan, eval_context)
@include_basepath = include_basepath
@fname = fname
@logger = defined?($log) ? $log : nil
+ @on_file_parsed = on_file_parsed
end
def parse!
@@ -138,6 +139,8 @@ def parse_element(root_element, elem_name, attrs = {}, elems = [])
end
end
+ @on_file_parsed&.call(File.expand_path(File.join(@include_basepath, @fname))) if root_element
+
return attrs, elems
end
@@ -166,7 +169,7 @@ def eval_include(attrs, elems, uri)
data = File.read(entry)
data.force_encoding('UTF-8')
ss = StringScanner.new(data)
- V1Parser.new(ss, basepath, fname, @eval_context).parse_element(true, nil, attrs, elems)
+ V1Parser.new(ss, basepath, fname, @eval_context, on_file_parsed: @on_file_parsed).parse_element(true, nil, attrs, elems)
}
else
require 'open-uri'
@@ -175,7 +178,7 @@ def eval_include(attrs, elems, uri)
data = u.open { |f| f.read }
data.force_encoding('UTF-8')
ss = StringScanner.new(data)
- V1Parser.new(ss, basepath, fname, @eval_context).parse_element(true, nil, attrs, elems)
+ V1Parser.new(ss, basepath, fname, @eval_context, on_file_parsed: @on_file_parsed).parse_element(true, nil, attrs, elems)
end
rescue SystemCallError => e
cpe = ConfigParseError.new("include error #{uri} - #{e}")
diff --git a/lib/fluent/config/yaml_parser.rb b/lib/fluent/config/yaml_parser.rb
index e000bfa446..d9ec19295d 100644
--- a/lib/fluent/config/yaml_parser.rb
+++ b/lib/fluent/config/yaml_parser.rb
@@ -21,7 +21,7 @@
module Fluent
module Config
module YamlParser
- def self.parse(path)
+ def self.parse(path, on_file_parsed: nil)
context = Kernel.binding
unless context.respond_to?(:use_nil)
@@ -48,7 +48,7 @@ def self.parse(path)
end
end
- s = Fluent::Config::YamlParser::Loader.new(context).load(Pathname.new(path))
+ s = Fluent::Config::YamlParser::Loader.new(context, on_file_parsed: on_file_parsed).load(Pathname.new(path))
Fluent::Config::YamlParser::Parser.new(s).build.to_element
end
end
diff --git a/lib/fluent/config/yaml_parser/loader.rb b/lib/fluent/config/yaml_parser/loader.rb
index 19a5fb1ad4..2cb07bc946 100644
--- a/lib/fluent/config/yaml_parser/loader.rb
+++ b/lib/fluent/config/yaml_parser/loader.rb
@@ -30,9 +30,10 @@ class Loader
FLUENT_STR_TAG = 'tag:fluent/s'.freeze
SHOVEL = '<<'.freeze
- def initialize(context = Kernel.binding)
+ def initialize(context = Kernel.binding, on_file_parsed: nil)
@context = context
@current_path = nil
+ @on_file_parsed = on_file_parsed
end
# @param [String] path
@@ -55,9 +56,13 @@ def load(path)
Fluent::Config::YamlParser::FluentValue::StringValue.new(val, @context)
end
- path.open do |f|
+ config = path.open do |f|
visitor.accept(Psych.parse(f))
end
+
+ @on_file_parsed&.call(File.expand_path(path.to_s))
+
+ config
end
def eval_include(path, parent)
diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb
index 0b431c4a61..1857f3b277 100644
--- a/lib/fluent/supervisor.rb
+++ b/lib/fluent/supervisor.rb
@@ -18,6 +18,7 @@
require 'open3'
require 'pathname'
require 'find'
+require 'set'
require 'fluent/config'
require 'fluent/counter'
@@ -796,18 +797,20 @@ def configure(supervisor: false)
$log.warn('the value "-" for `inline_config` is deprecated. See https://github.com/fluent/fluentd/issues/2711')
@inline_config = STDIN.read
end
+ parsed_files = Set.new
@conf = Fluent::Config.build(
config_path: @config_path,
encoding: @conf_encoding,
additional_config: @inline_config,
use_v1_config: @use_v1_config,
type: @config_file_type,
+ on_file_parsed: ->(path) { parsed_files << path },
)
@system_config = build_system_config(@conf)
$log.info :supervisor, 'parsing config file is succeeded', path: @config_path
- build_additional_configurations do |additional_conf|
+ build_additional_configurations(parsed_files) do |additional_conf|
@conf += additional_conf
end
@@ -854,6 +857,7 @@ def setup_global_logger(supervisor: false)
additional_config: @inline_config,
use_v1_config: @use_v1_config,
type: @config_file_type,
+ on_file_parsed: nil,
)
system_config = build_system_config(conf)
@@ -1088,15 +1092,17 @@ def reload_config
$log.debug('worker got SIGUSR2')
begin
+ parsed_files = Set.new
conf = Fluent::Config.build(
config_path: @config_path,
encoding: @conf_encoding,
additional_config: @inline_config,
use_v1_config: @use_v1_config,
type: @config_file_type,
+ on_file_parsed: ->(path) { parsed_files << path },
)
- build_additional_configurations do |additional_conf|
+ build_additional_configurations(parsed_files) do |additional_conf|
conf += additional_conf
end
@@ -1206,7 +1212,7 @@ def build_system_config(conf)
system_config
end
- def build_additional_configurations
+ def build_additional_configurations(parsed_files)
if @system_config.config_include_dir&.empty?
$log.info :supervisor, 'configuration include directory is disabled'
return
@@ -1218,11 +1224,17 @@ def build_additional_configurations
next unless supported_suffixes.include?(File.extname(path))
# NOTE: both types of normal config (.conf) and YAML will be loaded.
# Thus, it does not care whether @config_path is .conf or .yml.
+ if parsed_files.include?(path)
+ $log.info :supervisor, 'skip auto loading, it was already loaded', path: path
+ next
+ end
+
$log.info :supervisor, 'loading additional configuration file', path: path
yield Fluent::Config.build(config_path: path,
encoding: @conf_encoding,
use_v1_config: @use_v1_config,
- type: :guess)
+ type: :guess,
+ on_file_parsed: nil)
end
rescue Errno::ENOENT
$log.info :supervisor, 'inaccessible include directory was specified', path: @system_config.config_include_dir
diff --git a/test/test_config.rb b/test/test_config.rb
index 019a534758..b370fb78bc 100644
--- a/test/test_config.rb
+++ b/test/test_config.rb
@@ -520,5 +520,118 @@ def write_config(path, data, encoding: 'utf-8')
assert_equal('value', c['key'])
assert_equal('value2', c['key2'])
end
+
+ sub_test_case 'on_file_parsed' do
+ test "calling order on normal configuration files" do
+ write_config("#{TMP_DIR}/build/common_param.conf", <<~EOS)
+ flush_interval 5s
+ total_limit_size 100m
+ chunk_limit_size 1m
+ EOS
+ write_config("#{TMP_DIR}/build/server.conf", <<~EOS)
+
+ host 127.0.0.1
+ port 24224
+
+ EOS
+ write_config("#{TMP_DIR}/build/forward.conf", <<~EOS)
+
+ @type forward
+
+ @include server.conf
+
+ EOS
+ write_config("#{TMP_DIR}/build/inline.conf", <<~EOS)
+
+ @type stdout
+ tag test
+
+ EOS
+ write_config("#{TMP_DIR}/build/fluent.conf", <<~EOS)
+
+ @type file
+
+ @include common_param.conf
+
+
+
+ @type stdout
+
+ @include common_param.conf
+
+
+
+ @include forward.conf
+ EOS
+
+ # parsed_files contains file paths in the order of file parsed
+ parsed_files = []
+ Fluent::Config.build(
+ config_path: "#{TMP_DIR}/build/fluent.conf",
+ additional_config: "@include inline.conf",
+ on_file_parsed: ->(path) { parsed_files << path },
+ )
+
+ assert_equal(
+ [
+ "#{TMP_DIR}/build/common_param.conf",
+ "#{TMP_DIR}/build/common_param.conf",
+ "#{TMP_DIR}/build/server.conf",
+ "#{TMP_DIR}/build/forward.conf",
+ "#{TMP_DIR}/build/inline.conf",
+ "#{TMP_DIR}/build/fluent.conf"
+ ],
+ parsed_files
+ )
+ end
+
+ test "calling order on YAML configuration files" do
+ write_config("#{TMP_DIR}/build/common_buffer.yaml", <<~EOS)
+ - buffer:
+ flush_interval: 5s
+ total_limit_size: 100m
+ chunk_limit_size: 1m
+ EOS
+ write_config("#{TMP_DIR}/build/forward.yaml", <<~EOS)
+ - match:
+ $tag: test.*
+ server:
+ host: 127.0.0.1
+ port: 24224
+ EOS
+ write_config("#{TMP_DIR}/build/fluent.yaml", <<~EOS)
+ config:
+ - match:
+ $tag: sample.*
+ $type: file
+ <<: !include common_buffer.yaml
+ - match:
+ $tag: debug.*
+ $type: stdout
+ <<: !include common_buffer.yaml
+ - !include forward.yaml
+ EOS
+
+ # parsed_files contains file paths in the order of file parsed
+ # `additional_config` does not support YAML config
+ parsed_files = []
+ Fluent::Config.build(
+ config_path: "#{TMP_DIR}/build/fluent.yaml",
+ type: :yaml,
+ on_file_parsed: ->(path) { parsed_files << path },
+ )
+
+ assert_equal(
+ [
+ "#{TMP_DIR}/build/common_buffer.yaml",
+ "#{TMP_DIR}/build/common_buffer.yaml",
+ "#{TMP_DIR}/build/forward.yaml",
+ "#{TMP_DIR}/build/fluent.yaml"
+ ],
+ parsed_files
+ )
+ end
+
+ end
end
end
diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb
index 6b489dc771..353a12b758 100644
--- a/test/test_supervisor.rb
+++ b/test/test_supervisor.rb
@@ -1085,7 +1085,8 @@ def test_stop_parallel_old_supervisor_after_delay
stub.proxy(Fluent::Config).build
stub(Fluent::Config).build(config_path: "/etc/fluent/fluent.conf", encoding: "utf-8",
additional_config: anything, use_v1_config: anything,
- type: anything) { config_element('ROOT', '', {}, [c]) }
+ type: anything,
+ on_file_parsed: anything) { config_element('ROOT', '', {}, [c]) }
sources.each do |type|
config = <<~EOF
@@ -1111,7 +1112,8 @@ def test_stop_parallel_old_supervisor_after_delay
stub.proxy(Fluent::Config).build
stub(Fluent::Config).build(config_path: "/etc/fluent/fluent.conf", encoding: "utf-8",
additional_config: anything, use_v1_config: anything,
- type: anything) { config_element('ROOT', '', {}, [c]) }
+ type: anything,
+ on_file_parsed: anything) { config_element('ROOT', '', {}, [c]) }
sources.each do |type|
config = <<~EOF
config:
@@ -1139,7 +1141,8 @@ def test_stop_parallel_old_supervisor_after_delay
stub.proxy(Fluent::Config).build
stub(Fluent::Config).build(config_path: "/etc/fluent/fluent.conf", encoding: "utf-8",
additional_config: anything, use_v1_config: anything,
- type: anything) { config_element('ROOT', '', {}, [c]) }
+ type: anything,
+ on_file_parsed: anything) { config_element('ROOT', '', {}, [c]) }
sources.each do |type|
if yaml
config = <<~EOF
@@ -1165,6 +1168,295 @@ def test_stop_parallel_old_supervisor_after_delay
expected = [c].concat(sources.collect { |type| {"@type" => type} })
assert_equal(expected, supervisor.instance_variable_get(:@conf).elements)
end
+
+ test "prevent duplicate loading" do
+ write_config("#{@config_include_dir}/system.conf", <<~EOF)
+
+ config_include_dir #{@config_include_dir}
+
+ EOF
+ write_config("#{@config_include_dir}/forward.conf", <<~EOF)
+
+ @type forward
+
+ @include #{@config_include_dir}/common_param.conf
+
+
+ host 127.0.0.1
+ port 24224
+
+
+ EOF
+ write_config("#{@config_include_dir}/common_param.conf", <<~EOF)
+ flush_interval 5s
+ EOF
+ write_config("#{@config_include_dir}/obsolete_plugins.conf", <<~EOF)
+
+ @type obsolete_plugins
+
+ EOF
+
+ write_config("#{@tmp_dir}/fluent.conf", <<~EOF)
+
+ @type file
+
+ @include #{@config_include_dir}/common_param.conf
+
+
+
+ @include #{@config_include_dir}/forward.conf
+ @include #{@config_include_dir}/system.conf
+ EOF
+
+ supervisor = Fluent::Supervisor.new({ config_path: "#{@tmp_dir}/fluent.conf" })
+ stub(supervisor).setup_global_logger { create_debug_dummy_logger }
+
+ supervisor.configure(supervisor: true)
+ elements = supervisor.instance_variable_get(:@conf).elements
+ assert_equal(4, elements.size)
+
+ assert_equal('match', elements[0].name)
+ assert_equal('file', elements[0]['@type'])
+ assert_equal('buffer', elements[0].elements[0].name)
+ assert_equal('5s', elements[0].elements[0]['flush_interval'])
+
+ assert_equal('match', elements[1].name)
+ assert_equal('forward', elements[1]['@type'])
+ assert_equal('buffer', elements[1].elements[0].name)
+ assert_equal('5s', elements[1].elements[0]['flush_interval'])
+
+ assert_equal('system', elements[2].name)
+ assert_equal(@config_include_dir, elements[2]['config_include_dir'])
+
+ assert_equal('source', elements[3].name)
+ assert_equal('obsolete_plugins', elements[3]['@type'])
+
+ skipped_files = %W[
+ #{@config_include_dir}/common_param.conf
+ #{@config_include_dir}/forward.conf
+ #{@config_include_dir}/system.conf
+ ]
+ loaded_files = %W[
+ #{@config_include_dir}/obsolete_plugins.conf
+ ]
+
+ logs_line = $log.out.logs.join
+ skipped_files.each do |path|
+ assert { logs_line.include?("skip auto loading, it was already loaded path=\"#{path}\"") }
+ end
+ loaded_files.each do |path|
+ assert { logs_line.include?("loading additional configuration file path=\"#{path}\"") }
+ end
+
+ # reload
+ $log.out.reset
+ supervisor.__send__(:reload_config)
+ sleep 0.2 # wait to finish reloading
+
+ reload_elements = supervisor.instance_variable_get(:@conf).elements
+ assert_equal(elements, reload_elements)
+
+ logs_line = $log.out.logs.join
+ skipped_files.each do |path|
+ assert { logs_line.include?("skip auto loading, it was already loaded path=\"#{path}\"") }
+ end
+ loaded_files.each do |path|
+ assert { logs_line.include?("loading additional configuration file path=\"#{path}\"") }
+ end
+ ensure
+ $log.out.reset if $log&.out&.respond_to?(:reset)
+ end
+
+ test "do not load additional configuration when loaded all files with @include" do
+ write_config("#{@config_include_dir}/forward.conf", <<~EOF)
+
+ @type forward
+
+ host 127.0.0.1
+ port 24224
+
+
+ EOF
+ write_config("#{@config_include_dir}/obsolete_plugins.conf", <<~EOF)
+
+ @type obsolete_plugins
+
+ EOF
+
+ write_config("#{@tmp_dir}/fluent.conf", <<~EOF)
+
+ config_include_dir #{@config_include_dir}
+
+
+
+ @type file
+
+
+ @include #{@config_include_dir}/*.conf
+ EOF
+
+ supervisor = Fluent::Supervisor.new({ config_path: "#{@tmp_dir}/fluent.conf" })
+ stub(supervisor).setup_global_logger { create_debug_dummy_logger }
+
+ supervisor.configure(supervisor: true)
+ elements = supervisor.instance_variable_get(:@conf).elements
+ assert_equal(4, elements.size)
+
+ assert_equal('system', elements[0].name)
+ assert_equal(@config_include_dir, elements[0]['config_include_dir'])
+
+ assert_equal('match', elements[1].name)
+ assert_equal('file', elements[1]['@type'])
+
+ assert_equal('match', elements[2].name)
+ assert_equal('forward', elements[2]['@type'])
+
+ assert_equal('source', elements[3].name)
+ assert_equal('obsolete_plugins', elements[3]['@type'])
+
+ # no additional load, all files were skipped
+ skipped_files = %W[
+ #{@config_include_dir}/forward.conf
+ #{@config_include_dir}/obsolete_plugins.conf
+ ]
+
+ logs_line = $log.out.logs.join
+ skipped_files.each do |path|
+ assert { logs_line.include?("skip auto loading, it was already loaded path=\"#{path}\"") }
+ end
+ assert_not_match(/loading additional configuration file/, logs_line)
+ ensure
+ $log.out.reset if $log&.out&.respond_to?(:reset)
+ end
+
+ test "can load partial config loaded config_include_dir feature by even if already loaded" do
+ write_config("#{@config_include_dir}/system.conf", <<~EOF)
+
+ config_include_dir #{@config_include_dir}
+
+ EOF
+ write_config("#{@config_include_dir}/forward.conf", <<~EOF)
+
+ @type forward
+
+ @include #{@config_include_dir}/common_param.conf
+
+
+ host 127.0.0.1
+ port 24224
+
+
+ EOF
+ write_config("#{@config_include_dir}/common_param.conf", <<~EOF)
+ flush_interval 5s
+ EOF
+ write_config("#{@config_include_dir}/obsolete_plugins.conf", <<~EOF)
+
+ @type obsolete_plugins
+
+ EOF
+
+ write_config("#{@tmp_dir}/fluent.conf", <<~EOF)
+
+ @type file
+
+ @include #{@config_include_dir}/common_param.conf
+
+
+
+ @include #{@config_include_dir}/system.conf
+ EOF
+
+ supervisor = Fluent::Supervisor.new({ config_path: "#{@tmp_dir}/fluent.conf" })
+ stub(supervisor).setup_global_logger { create_debug_dummy_logger }
+
+ supervisor.configure(supervisor: true)
+ elements = supervisor.instance_variable_get(:@conf).elements
+ assert_equal(4, elements.size)
+
+ assert_equal('match', elements[0].name)
+ assert_equal('file', elements[0]['@type'])
+ assert_equal('buffer', elements[0].elements[0].name)
+ assert_equal('5s', elements[0].elements[0]['flush_interval'])
+
+ assert_equal('system', elements[1].name)
+ assert_equal(@config_include_dir, elements[1]['config_include_dir'])
+
+ # include forward.conf using config_include_dir feature
+ assert_equal('match', elements[2].name)
+ assert_equal('forward', elements[2]['@type'])
+ assert_equal('buffer', elements[2].elements[0].name)
+ assert_equal('5s', elements[2].elements[0]['flush_interval'])
+
+ assert_equal('source', elements[3].name)
+ assert_equal('obsolete_plugins', elements[3]['@type'])
+
+ skipped_files = %W[
+ #{@config_include_dir}/common_param.conf
+ #{@config_include_dir}/system.conf
+ ]
+ loaded_files = %W[
+ #{@config_include_dir}/forward.conf
+ #{@config_include_dir}/obsolete_plugins.conf
+ ]
+
+ logs_line = $log.out.logs.join
+ skipped_files.each do |path|
+ assert { logs_line.include?("skip auto loading, it was already loaded path=\"#{path}\"") }
+ end
+ loaded_files.each do |path|
+ assert { logs_line.include?("loading additional configuration file path=\"#{path}\"") }
+ end
+ ensure
+ $log.out.reset if $log&.out&.respond_to?(:reset)
+ end
+
+ test "can load config files even if disable config_include_dir" do
+ write_config("#{@config_include_dir}/forward.conf", <<~EOF)
+
+ @type forward
+
+ host 127.0.0.1
+ port 24224
+
+
+ EOF
+
+ write_config("#{@tmp_dir}/fluent.conf", <<~EOF)
+
+ config_include_dir ""
+
+
+
+ @type file
+
+
+ @include #{@config_include_dir}/*.conf
+ EOF
+
+ supervisor = Fluent::Supervisor.new({ config_path: "#{@tmp_dir}/fluent.conf" })
+ stub(supervisor).setup_global_logger { create_debug_dummy_logger }
+
+ supervisor.configure(supervisor: true)
+ elements = supervisor.instance_variable_get(:@conf).elements
+ assert_equal(3, elements.size)
+
+ assert_equal('system', elements[0].name)
+ assert_equal("", elements[0]['config_include_dir'])
+
+ assert_equal('match', elements[1].name)
+ assert_equal('file', elements[1]['@type'])
+
+ assert_equal('match', elements[2].name)
+ assert_equal('forward', elements[2]['@type'])
+
+ # There is no logs for additional loading
+ logs_line = $log.out.logs.join
+ assert_not_match(/skip auto loading, it was already loaded/, logs_line)
+ assert_not_match(/loading additional configuration file/, logs_line)
+ ensure
+ $log.out.reset if $log&.out&.respond_to?(:reset)
+ end
end
def create_debug_dummy_logger