This repository was archived by the owner on Feb 20, 2025. It is now read-only.
forked from hatchet-dev/hatchet-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconftest.py
More file actions
106 lines (83 loc) · 2.68 KB
/
conftest.py
File metadata and controls
106 lines (83 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import logging
import os
import subprocess
import time
from io import BytesIO
from threading import Thread
from typing import AsyncGenerator, Callable, Generator, cast
import psutil
import pytest
import pytest_asyncio
from hatchet_sdk import ClientConfig, Hatchet
from hatchet_sdk.loader import ClientTLSConfig
@pytest.fixture(scope="session", autouse=True)
def token() -> str:
    """Create a Hatchet API token through docker compose and export it.

    Runs ``/hatchet/hatchet-admin token create`` inside the ``setup-config``
    compose service, stores the printed token in the
    ``HATCHET_CLIENT_TOKEN`` environment variable and returns it.

    Returns:
        The token printed on the command's stdout, stripped of whitespace.

    Raises:
        RuntimeError: If the command exits with a non-zero status or prints
            no token. Failing here avoids exporting an empty token that
            would only surface later as an unrelated client error.
    """
    result = subprocess.run(
        [
            "docker",
            "compose",
            "run",
            "--no-deps",
            "setup-config",
            "/hatchet/hatchet-admin",
            "token",
            "create",
            "--config",
            "/hatchet/config",
            "--tenant-id",
            "707d0855-80ab-4e1f-a156-f1c4546cbf52",
        ],
        capture_output=True,
        text=True,
    )

    api_token = result.stdout.strip()

    if result.returncode != 0 or not api_token:
        raise RuntimeError(
            "Failed to create Hatchet client token "
            f"(exit code {result.returncode}): {result.stderr.strip()}"
        )

    os.environ["HATCHET_CLIENT_TOKEN"] = api_token

    return api_token
@pytest_asyncio.fixture(scope="session")
async def aiohatchet(token: str) -> AsyncGenerator[Hatchet, None]:
    """Yield a session-scoped ``Hatchet`` client for async tests.

    The client is built in debug mode from the ``token`` fixture, with the
    TLS strategy set to ``"none"``.
    """
    tls_settings = ClientTLSConfig(strategy="none")
    client_config = ClientConfig(token=token, tls_config=tls_settings)

    yield Hatchet(debug=True, config=client_config)
@pytest.fixture(scope="session")
def hatchet(token: str) -> Hatchet:
    """Return a session-scoped ``Hatchet`` client for synchronous tests.

    The client is built in debug mode from the ``token`` fixture, with the
    TLS strategy set to ``"none"``.
    """
    tls_settings = ClientTLSConfig(strategy="none")
    client_config = ClientConfig(token=token, tls_config=tls_settings)
    client = Hatchet(debug=True, config=client_config)

    return client
@pytest.fixture()
def worker(
    request: pytest.FixtureRequest,
) -> Generator[subprocess.Popen[bytes], None, None]:
    """Run an example worker in the background for the duration of a test.

    The example name is read from ``request.param`` (presumably supplied via
    indirect parametrization -- confirm against the tests) and launched as
    ``poetry run <example>`` with a copy of the current environment. The
    worker's stdout and stderr are forwarded line by line to ``logging``.

    Yields:
        The ``subprocess.Popen`` handle of the running worker.

    Raises:
        Exception: If the worker has already exited once the startup wait
            is over.
    """
    example = cast(str, request.param)
    command = ["poetry", "run", example]

    logging.info(f"Starting background worker: {' '.join(command)}")

    proc = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ.copy()
    )

    def log_output(pipe: BytesIO, log_func: Callable[[str], None]) -> None:
        # readline returns b"" at EOF, which ends the iteration.
        for line in iter(pipe.readline, b""):
            # errors="replace" keeps the thread alive on non-UTF-8 output.
            log_func(line.decode(errors="replace").strip())

    # Start draining the pipes right away so startup output is logged and the
    # worker cannot block on a full pipe buffer while we wait below.
    Thread(target=log_output, args=(proc.stdout, logging.info), daemon=True).start()
    Thread(target=log_output, args=(proc.stderr, logging.error), daemon=True).start()

    # Give the worker time to come up before checking on it.
    time.sleep(5)

    # Check if the process is still running. This must happen after the
    # startup wait: polling immediately after Popen would almost never catch
    # a worker that crashes during startup.
    if proc.poll() is not None:
        raise Exception(f"Worker failed to start with return code {proc.returncode}")

    yield proc

    logging.info("Cleaning up background worker")

    try:
        parent = psutil.Process(proc.pid)
        children = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        # The worker already exited and was reaped; nothing left to stop.
        return

    # Ask children first, then the parent, to shut down gracefully.
    for p in [*children, parent]:
        try:
            p.terminate()
        except psutil.NoSuchProcess:
            # Exited between enumeration and terminate().
            continue

    _, alive = psutil.wait_procs([parent] + children, timeout=3)

    for p in alive:
        logging.warning(f"Force killing process {p.pid}")
        try:
            p.kill()
        except psutil.NoSuchProcess:
            continue