diff --git a/examples/opentelemetry_instrumentation/test_otel_instrumentation.py b/examples/opentelemetry_instrumentation/test_otel_instrumentation.py index fa4e785d..4ec3def2 100644 --- a/examples/opentelemetry_instrumentation/test_otel_instrumentation.py +++ b/examples/opentelemetry_instrumentation/test_otel_instrumentation.py @@ -6,7 +6,11 @@ from hatchet_sdk import Hatchet, Worker from hatchet_sdk.clients.admin import TriggerWorkflowOptions from hatchet_sdk.clients.events import PushEventOptions -from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor +from hatchet_sdk.opentelemetry.instrumentor import ( + HatchetInstrumentor, + create_traceparent, + inject_traceparent_into_metadata, +) trace_provider = NoOpTracerProvider() @@ -17,9 +21,7 @@ def create_additional_metadata() -> dict[str, str]: - return instrumentor.inject_traceparent_into_metadata( - {"hello": "world"}, instrumentor.create_traceparent() - ) + return inject_traceparent_into_metadata({"hello": "world"}) def create_push_options() -> PushEventOptions: diff --git a/examples/opentelemetry_instrumentation/triggers.py b/examples/opentelemetry_instrumentation/triggers.py index 3a7b8cb0..b9ab4fd6 100644 --- a/examples/opentelemetry_instrumentation/triggers.py +++ b/examples/opentelemetry_instrumentation/triggers.py @@ -4,16 +4,18 @@ from examples.opentelemetry_instrumentation.tracer import trace_provider from hatchet_sdk.clients.admin import TriggerWorkflowOptions from hatchet_sdk.clients.events import PushEventOptions -from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor +from hatchet_sdk.opentelemetry.instrumentor import ( + HatchetInstrumentor, + create_traceparent, + inject_traceparent_into_metadata, +) instrumentor = HatchetInstrumentor(tracer_provider=trace_provider) tracer = trace_provider.get_tracer(__name__) def create_additional_metadata() -> dict[str, str]: - return instrumentor.inject_traceparent_into_metadata( - {"hello": "world"}, instrumentor.create_traceparent() - ) + return inject_traceparent_into_metadata({"hello": "world"}) def create_push_options() -> PushEventOptions: diff --git a/hatchet_sdk/opentelemetry/instrumentor.py b/hatchet_sdk/opentelemetry/instrumentor.py index d13e3fc5..91474c52 100644 --- a/hatchet_sdk/opentelemetry/instrumentor.py +++ b/hatchet_sdk/opentelemetry/instrumentor.py @@ -13,6 +13,7 @@ StatusCode, TracerProvider, get_tracer, + get_tracer_provider, ) from opentelemetry.trace.propagation.tracecontext import ( TraceContextTextMapPropagator, @@ -43,49 +44,121 @@ InstrumentKwargs = TracerProvider | MeterProvider | None +OTEL_TRACEPARENT_KEY = "traceparent" -class HatchetInstrumentor(BaseInstrumentor): # type: ignore[misc] - OTEL_TRACEPARENT_KEY = "traceparent" - def __init__( - self, - tracer_provider: TracerProvider, - meter_provider: MeterProvider = NoOpMeterProvider(), - ): - self.tracer_provider = tracer_provider - self.meter_provider = meter_provider +def create_traceparent() -> str | None: + """ + Creates and returns a W3C traceparent header value using OpenTelemetry's context propagation. - super().__init__() + The traceparent header is used to propagate context information across service boundaries + in distributed tracing systems. It follows the W3C Trace Context specification. - def create_traceparent(self) -> str | None: - carrier: dict[str, str] = {} - TraceContextTextMapPropagator().inject(carrier) + :returns: A W3C-formatted traceparent header value if successful, None if the context + injection fails or no active span exists.\n + Example: `00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01` + :rtype: str | None: + """ - return carrier.get("traceparent") + carrier: dict[str, str] = {} + TraceContextTextMapPropagator().inject(carrier) - def parse_carrier_from_metadata( - self, metadata: dict[str, str] | None - ) -> Context | None: - if not metadata: - return None + return carrier.get("traceparent") - traceparent = metadata.get(self.OTEL_TRACEPARENT_KEY) - if not traceparent: - return None +def parse_carrier_from_metadata(metadata: dict[str, str] | None) -> Context | None: + """ + Parses OpenTelemetry trace context from a metadata dictionary. - return TraceContextTextMapPropagator().extract( - {self.OTEL_TRACEPARENT_KEY: traceparent} - ) + Extracts the trace context from metadata using the W3C Trace Context format, + specifically looking for the `traceparent` header. + + :param metadata: A dictionary containing metadata key-value pairs, + potentially including the `traceparent` header. Can be None. + :type metadata: dict[str, str] | None + :returns: The extracted OpenTelemetry Context object if a valid `traceparent` + is found in the metadata, otherwise None. + :rtype: Context | None + + :Example: + + >>> metadata = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"} + >>> context = parse_carrier_from_metadata(metadata) + """ + + if not metadata: + return None + + traceparent = metadata.get(OTEL_TRACEPARENT_KEY) + + if not traceparent: + return None + + return TraceContextTextMapPropagator().extract({OTEL_TRACEPARENT_KEY: traceparent}) + + +def inject_traceparent_into_metadata( + metadata: dict[str, str], traceparent: str | None = None +) -> dict[str, str]: + """ + Injects OpenTelemetry `traceparent` into a metadata dictionary. + + Takes a metadata dictionary and an optional `traceparent` string, + returning a new metadata dictionary with the `traceparent` added under the + `OTEL_TRACEPARENT_KEY`. If no `traceparent` is provided, it attempts to create one. - def inject_traceparent_into_metadata( - self, metadata: dict[str, str], traceparent: str | None - ) -> dict[str, str]: - if traceparent: - metadata[self.OTEL_TRACEPARENT_KEY] = traceparent + :param metadata: The metadata dictionary to inject the `traceparent` into. + :type metadata: dict[str, str] + :param traceparent: The `traceparent` string to inject. If None, attempts to use + the current span. + :type traceparent: str | None, optional + :returns: A new metadata dictionary containing the original metadata plus + the injected `traceparent`, if one was available or could be created. + :rtype: dict[str, str] + :Example: + + >>> metadata = {"key": "value"} + >>> new_metadata = inject_traceparent(metadata, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + >>> print(new_metadata) + {"key": "value", "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"} + """ + + if not traceparent: + traceparent = create_traceparent() + + if not traceparent: return metadata + return { + **metadata, + OTEL_TRACEPARENT_KEY: traceparent, + } + + +class HatchetInstrumentor(BaseInstrumentor): # type: ignore[misc] + def __init__( + self, + tracer_provider: TracerProvider | None = None, + meter_provider: MeterProvider | None = None, + ): + """ + Hatchet OpenTelemetry instrumentor. + + The instrumentor provides an OpenTelemetry integration for Hatchet by setting up + tracing and metrics collection. + + :param tracer_provider: TracerProvider | None: The OpenTelemetry TracerProvider to use. + If not provided, the global tracer provider will be used. + :param meter_provider: MeterProvider | None: The OpenTelemetry MeterProvider to use. + If not provided, a no-op meter provider will be used. + """ + + self.tracer_provider = tracer_provider or get_tracer_provider() + self.meter_provider = meter_provider or NoOpMeterProvider() + + super().__init__() + def instrumentation_dependencies(self) -> Collection[str]: return tuple() @@ -154,7 +227,7 @@ async def _wrap_handle_start_step_run( kwargs: Any, ) -> Exception | None: action = args[0] - traceparent = self.parse_carrier_from_metadata(action.additional_metadata) + traceparent = parse_carrier_from_metadata(action.additional_metadata) with self._tracer.start_as_current_span( "hatchet.start_step_run", diff --git a/pyproject.toml b/pyproject.toml index 29cc6578..a3016a55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "0.46.1" +version = "0.46.2" description = "" authors = ["Alexander Belanger "] readme = "README.md"