diff --git a/dingolytics/ingest/vector.py b/dingolytics/ingest/vector.py index 6a2c448c..acf8dda6 100644 --- a/dingolytics/ingest/vector.py +++ b/dingolytics/ingest/vector.py @@ -13,11 +13,14 @@ enabled: true """ -VECTOR_HTTP_INPUT = "http_server" +VECTOR_HTTP_INPUT = "http_input" VECTOR_HTTP_ROUTER = "http_router" VECTOR_HTTP_PATH_KEY = "_path_" VECTOR_SINK_PREFIX = "sink-" VECTOR_INTERNAL_INPUT = "vector_internal_logs" +VECTOR_IP_ADDR_FIELD = "ip_addr_v4" +VECTOR_IP_ADDR_HEADER = "x-real-ip" +VECTOR_IP_ADDR_REMAP = "ip_addr_remap" # VECTOR_INTERNAL_REMAP = "vector_internal_remap" @@ -78,13 +81,14 @@ class VectorInternalSource(VectorSection): class VectorHTTPSource(VectorSection): - type: str = VECTOR_HTTP_INPUT + type: str = "http_server" address: str = "0.0.0.0:8180" method = "POST" path = "/ingest" path_key: str = VECTOR_HTTP_PATH_KEY strict_path: bool = False decoding: dict = {"codec": "json"} + headers: list = ["x-real-ip"] class VectorConsoleSink(VectorSection): @@ -109,9 +113,16 @@ class VectorClickHouseSink(VectorSection): endpoint: str = "http://clickhouse:8123" +class VectorIPAddressTransform(VectorSection): + type: str = "remap" + inputs: list = [VECTOR_HTTP_INPUT] + source: str = f'.{VECTOR_IP_ADDR_FIELD} = del(."{VECTOR_IP_ADDR_HEADER}")' + + class VectorRouteTransform(VectorSection): type: str = "route" - inputs: list = [VECTOR_HTTP_INPUT] + # inputs: list = [VECTOR_HTTP_INPUT] # use original input + inputs: list = [VECTOR_IP_ADDR_REMAP] # use remapped input route: dict = {} _path_key: str = VECTOR_HTTP_PATH_KEY @@ -144,6 +155,7 @@ def add_defaults(self) -> None: self.add_source(VectorInternalSource(key=VECTOR_INTERNAL_INPUT)) self.add_source(VectorHTTPSource(key=VECTOR_HTTP_INPUT)) self.add_sink(VectorConsoleSink(key="console")) + self.add_transform(VectorIPAddressTransform(key=VECTOR_IP_ADDR_REMAP)) # self.add_transform(VectorInternalTransform(key=VECTOR_INTERNAL_REMAP)) def add_section(self, item: VectorSection, group_key: str) -> None: diff --git a/dingolytics/tests/test_ingest_config.py b/dingolytics/tests/test_ingest_config.py index 90c0f2b6..2ab261e4 100644 --- a/dingolytics/tests/test_ingest_config.py +++ b/dingolytics/tests/test_ingest_config.py @@ -32,5 +32,5 @@ def test_update_vector_config(self, _): # After adding streams 3 sinks are created vector_config = update_vector_config(streams, clean=False) self.assertEqual(len(vector_config.config["sources"]), 2) - self.assertEqual(len(vector_config.config["transforms"]), 1) + self.assertEqual(len(vector_config.config["transforms"]), 2) self.assertEqual(len(vector_config.config["sinks"]), 3) diff --git a/redash/query_runner/clickhouse.py b/redash/query_runner/clickhouse.py index b41768be..5a197ca9 100644 --- a/redash/query_runner/clickhouse.py +++ b/redash/query_runner/clickhouse.py @@ -110,7 +110,7 @@ def run_query( results = self._clickhouse_query( query, session_id, session_check=True ) - data = json_dumps(results) + data = json_dumps(results, default=str) error = None except Exception as exc: data = None