Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a6c9815
feat: rip out existing otel setup + add basic instrumentor
mrkaye97 Feb 7, 2025
39f802b
feat: initial wiring up handle start step run
mrkaye97 Feb 7, 2025
5e653ee
feat: add cancel action handler
mrkaye97 Feb 7, 2025
575b5b8
feat: wrapper for get group key run
mrkaye97 Feb 7, 2025
9d258c2
feat: add example
mrkaye97 Feb 7, 2025
7d7dc50
feat: add to example
mrkaye97 Feb 7, 2025
bd28adc
fix: use trace provider instead of global tracer
mrkaye97 Feb 7, 2025
86086d2
feat: improve example more
mrkaye97 Feb 7, 2025
d314f7b
fix: clean up client
mrkaye97 Feb 7, 2025
9c51d00
fix: otel path
mrkaye97 Feb 7, 2025
ad080b1
feat: wire up event push
mrkaye97 Feb 7, 2025
d22e48d
fix: remove all the otel config in the sdk
mrkaye97 Feb 7, 2025
842a3f3
chore: ver
mrkaye97 Feb 7, 2025
5487800
feat: instrument bulk push
mrkaye97 Feb 7, 2025
73c33c6
feat: instrument run_workflow
mrkaye97 Feb 7, 2025
9dd3b2e
feat: wrap async run workflow
mrkaye97 Feb 7, 2025
937d859
feat: instrument run_workflows + async
mrkaye97 Feb 7, 2025
c03a143
feat: more examples working
mrkaye97 Feb 8, 2025
e1be1ca
chore: lint
mrkaye97 Feb 8, 2025
ea772f4
feat: tests
mrkaye97 Feb 8, 2025
7158a51
feat: comments
mrkaye97 Feb 8, 2025
da19831
fix: py 310
mrkaye97 Feb 8, 2025
6cb3d7d
fix: tests in ci
mrkaye97 Feb 8, 2025
384ca11
fix: hack for CI
mrkaye97 Feb 8, 2025
4b1050d
fix: namespacing
mrkaye97 Feb 8, 2025
4312cbb
debug: try making test async
mrkaye97 Feb 8, 2025
1e2e466
debug: use async client
mrkaye97 Feb 8, 2025
3432776
debug: skip test
mrkaye97 Feb 8, 2025
d9f8d18
feat: move otel deps to extras
mrkaye97 Feb 9, 2025
ebb9b90
feat: throw correct error if extra is not installed
mrkaye97 Feb 9, 2025
8adc12b
fix: install all extras
mrkaye97 Feb 9, 2025
e36af14
fix: implement uninstrument correctly
mrkaye97 Feb 9, 2025
154a83d
fix: PR comments
mrkaye97 Feb 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
virtualenvs-create: true
virtualenvs-in-project: true
- name: Install dependencies
run: poetry install --no-interaction
run: poetry install --no-interaction --all-extras

- name: Generate Env File
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
virtualenvs-in-project: true

- name: Install linting tools
run: poetry install
run: poetry install --all-extras

- name: Run Black
run: poetry run black . --check --verbose --diff --color
Expand Down
7 changes: 7 additions & 0 deletions examples/opentelemetry_instrumentation/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dotenv import load_dotenv

from hatchet_sdk import Hatchet

load_dotenv()

hatchet = Hatchet(debug=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json

import pytest
from opentelemetry.trace import NoOpTracerProvider

from hatchet_sdk import Hatchet, Worker
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
from hatchet_sdk.clients.events import PushEventOptions
from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor

trace_provider = NoOpTracerProvider()

instrumentor = HatchetInstrumentor(tracer_provider=trace_provider)
instrumentor.instrument()

tracer = trace_provider.get_tracer(__name__)


def create_additional_metadata() -> dict[str, str]:
return instrumentor.inject_traceparent_into_metadata(
{"hello": "world"}, instrumentor.create_traceparent()
)


def create_push_options() -> PushEventOptions:
return {"additional_metadata": create_additional_metadata()}


@pytest.mark.parametrize("worker", ["otel"], indirect=True)
def test_push_event(hatchet: Hatchet, worker: Worker) -> None:
key = "otel:event"
payload = {"test": "test"}

with tracer.start_as_current_span("push_event"):
event = hatchet.event.push(
event_key=key,
payload=payload,
options=create_push_options(),
)

"""Assert on `endswith` to ignore namespacing"""
assert event.key.endswith(key)
assert event.payload == json.dumps(payload)


@pytest.mark.skip("Failing in CI for unknown reason")
@pytest.mark.asyncio()
@pytest.mark.parametrize("worker", ["otel"], indirect=True)
async def test_run_workflow(aiohatchet: Hatchet, worker: Worker) -> None:
with tracer.start_as_current_span("run_workflow") as span:
workflow = aiohatchet.admin.run_workflow(
"OTelWorkflow",
{"test": "test"},
options=TriggerWorkflowOptions(
additional_metadata=create_additional_metadata()
),
)

with pytest.raises(Exception, match="Workflow Errors"):
await workflow.result()
45 changes: 45 additions & 0 deletions examples/opentelemetry_instrumentation/tracer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os
from typing import cast

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import NoOpTracerProvider

trace_provider: TracerProvider | NoOpTracerProvider

if os.getenv("CI", "false") == "true":
trace_provider = NoOpTracerProvider()
else:
resource = Resource(
attributes={
SERVICE_NAME: os.getenv("HATCHET_CLIENT_OTEL_SERVICE_NAME", "test-service")
}
)

headers = dict(
[
cast(
tuple[str, str],
tuple(
os.getenv(
"HATCHET_CLIENT_OTEL_EXPORTER_OTLP_HEADERS", "foo=bar"
).split("=")
),
)
]
)

processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=os.getenv(
"HATCHET_CLIENT_OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"
),
headers=headers,
),
)

trace_provider = TracerProvider(resource=resource)

trace_provider.add_span_processor(processor)
161 changes: 161 additions & 0 deletions examples/opentelemetry_instrumentation/triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import asyncio

from examples.opentelemetry_instrumentation.client import hatchet
from examples.opentelemetry_instrumentation.tracer import trace_provider
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
from hatchet_sdk.clients.events import PushEventOptions
from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor

instrumentor = HatchetInstrumentor(tracer_provider=trace_provider)
tracer = trace_provider.get_tracer(__name__)


def create_additional_metadata() -> dict[str, str]:
return instrumentor.inject_traceparent_into_metadata(
{"hello": "world"}, instrumentor.create_traceparent()
)


def create_push_options() -> PushEventOptions:
return {"additional_metadata": create_additional_metadata()}


def push_event() -> None:
print("\npush_event")
with tracer.start_as_current_span("push_event") as span:
hatchet.event.push(
"otel:event",
{"test": "test"},
options=create_push_options(),
)


async def async_push_event() -> None:
print("\nasync_push_event")
with tracer.start_as_current_span("async_push_event") as span:
await hatchet.event.async_push(
"otel:event", {"test": "test"}, options=create_push_options()
)


def bulk_push_event() -> None:
print("\nbulk_push_event")
with tracer.start_as_current_span("bulk_push_event") as span:
hatchet.event.bulk_push(
[
{
"additional_metadata": create_additional_metadata(),
"key": "otel:event",
"payload": {"test": "test 1"},
},
{
"additional_metadata": create_additional_metadata(),
"key": "otel:event",
"payload": {"test": "test 2"},
},
],
)


async def async_bulk_push_event() -> None:
print("\nasync_bulk_push_event")
with tracer.start_as_current_span("bulk_push_event") as span:
await hatchet.event.async_bulk_push(
[
{
"additional_metadata": create_additional_metadata(),
"key": "otel:event",
"payload": {"test": "test 1"},
},
{
"additional_metadata": create_additional_metadata(),
"key": "otel:event",
"payload": {"test": "test 2"},
},
],
)


def run_workflow() -> None:
print("\nrun_workflow")
with tracer.start_as_current_span("run_workflow") as span:
hatchet.admin.run_workflow(
"OTelWorkflow",
{"test": "test"},
options=TriggerWorkflowOptions(
additional_metadata=create_additional_metadata()
),
)


async def async_run_workflow() -> None:
print("\nasync_run_workflow")
with tracer.start_as_current_span("async_run_workflow") as span:
await hatchet.admin.aio.run_workflow(
"OTelWorkflow",
{"test": "test"},
options=TriggerWorkflowOptions(
additional_metadata=create_additional_metadata()
),
)


def run_workflows() -> None:
print("\nrun_workflows")
with tracer.start_as_current_span("run_workflows") as span:
hatchet.admin.run_workflows(
[
{
"workflow_name": "OTelWorkflow",
"input": {"test": "test"},
"options": TriggerWorkflowOptions(
additional_metadata=create_additional_metadata()
),
},
{
"workflow_name": "OTelWorkflow",
"input": {"test": "test 2"},
"options": TriggerWorkflowOptions(
additional_metadata=create_additional_metadata()
),
},
],
)


async def async_run_workflows() -> None:
print("\nasync_run_workflows")
with tracer.start_as_current_span("async_run_workflows") as span:
await hatchet.admin.aio.run_workflows(
[
{
"workflow_name": "OTelWorkflow",
"input": {"test": "test"},
"options": TriggerWorkflowOptions(
additional_metadata=create_additional_metadata()
),
},
{
"workflow_name": "OTelWorkflow",
"input": {"test": "test 2"},
"options": TriggerWorkflowOptions(
additional_metadata=create_additional_metadata()
),
},
],
)


async def main() -> None:
push_event()
await async_push_event()
bulk_push_event()
await async_bulk_push_event()
run_workflow()
# await async_run_workflow()
run_workflows()
# await async_run_workflows()


if __name__ == "__main__":
asyncio.run(main())
47 changes: 47 additions & 0 deletions examples/opentelemetry_instrumentation/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from examples.opentelemetry_instrumentation.client import hatchet
from examples.opentelemetry_instrumentation.tracer import trace_provider
from hatchet_sdk import Context
from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor

HatchetInstrumentor(
tracer_provider=trace_provider,
).instrument()


@hatchet.workflow(on_events=["otel:event"])
class OTelWorkflow:
@hatchet.step()
def your_spans_are_children_of_hatchet_span(
self, context: Context
) -> dict[str, str]:
with trace_provider.get_tracer(__name__).start_as_current_span("step1"):
print("executed step")
return {
"foo": "bar",
}

@hatchet.step()
def your_spans_are_still_children_of_hatchet_span(self, context: Context) -> None:
with trace_provider.get_tracer(__name__).start_as_current_span("step2"):
raise Exception("Manually instrumented step failed failed")

@hatchet.step()
def this_step_is_still_instrumented(self, context: Context) -> dict[str, str]:
print("executed still-instrumented step")
return {
"still": "instrumented",
}

@hatchet.step()
def this_step_is_also_still_instrumented(self, context: Context) -> None:
raise Exception("Still-instrumented step failed")


def main() -> None:
worker = hatchet.worker("otel-example-worker", max_runs=1)
worker.register_workflow(OTelWorkflow())
worker.start()


if __name__ == "__main__":
main()
Loading