diff --git a/.gitignore b/.gitignore index 2fd5c21..6757fd5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ build/* dist/* docs_src/_build/ +examples/config.yaml poetry.lock tamr_cloud_sdk.egg-info __pycache__/ diff --git a/docs_src/examples.md b/docs_src/examples.md index 63904ff..69cea8a 100644 --- a/docs_src/examples.md +++ b/docs_src/examples.md @@ -2,8 +2,14 @@ ## Jobs -Fetch a list of jobs: +Fetch a list of jobs and check for statuses: -```{literalinclude} ../examples/list_jobs.py +```{literalinclude} ../examples/jobs/list_jobs.py +:language: python +``` + +Poll a job until complete: + +```{literalinclude} ../examples/jobs/poll_job.py :language: python ``` \ No newline at end of file diff --git a/examples/config.template.yaml b/examples/config.template.yaml new file mode 100644 index 0000000..904be78 --- /dev/null +++ b/examples/config.template.yaml @@ -0,0 +1,2 @@ +tamr_cloud_host: +tamr_api_key: \ No newline at end of file diff --git a/examples/jobs/list_jobs.py b/examples/jobs/list_jobs.py new file mode 100644 index 0000000..3c756fb --- /dev/null +++ b/examples/jobs/list_jobs.py @@ -0,0 +1,60 @@ +"""Example script for fetching jobs from the Jobs API.""" + +import logging +import os +import sys + +import yaml + +from tamr.api.v1beta1.jobs_pb2 import JobState +from tamr_sdk.api_client import TamrApiClient + +PAGE_SIZE = 25 +NUM_PAGES_TO_CHECK = 10 +STATUS_STRS = ["PENDING", "RUNNING"] +STATUSES = [JobState.Value(s) for s in STATUS_STRS] + +if __name__ == "__main__": + # Set up logging + logger = logging.getLogger() + logger.setLevel("INFO") + logger.addHandler(logging.StreamHandler(sys.stdout)) + + # Read Tamr Cloud configurations from file + dir_path = os.path.dirname(os.path.realpath(__file__)) + config_path = os.path.join(dir_path, "..", "config.yaml") + with open(config_path) as stream: + config = yaml.safe_load(stream) + + # Initialize Tamr Cloud client + tamr_client = TamrApiClient( + config["tamr_cloud_host"], + [("x-api-key", config["tamr_api_key"])], + grpc_stack_trace=True, + ) + jobs_client = tamr_client.jobs() + logger.info("Client initialization complete.") + + # Find most recent job with status in `STATUSES` + next_page_token = None + + for p in range(NUM_PAGES_TO_CHECK): + list_jobs_resp = jobs_client.list_jobs( + page_token=next_page_token, page_size=PAGE_SIZE + ) + + matches = [j.status.state in STATUSES for j in list_jobs_resp.jobs] + + if any(matches): + first_matching_job = list_jobs_resp.jobs[matches.index(True)] + logger.info( + f"Most recent job with status in {STATUS_STRS} is {first_matching_job}." + ) + break + + next_page_token = list_jobs_resp.next_page_token + if p == NUM_PAGES_TO_CHECK - 1: + logger.info( + f"No job with status in {STATUS_STRS} found in recent " + + f"{NUM_PAGES_TO_CHECK * PAGE_SIZE} jobs." + ) diff --git a/examples/jobs/poll_job.py b/examples/jobs/poll_job.py new file mode 100644 index 0000000..b6e736e --- /dev/null +++ b/examples/jobs/poll_job.py @@ -0,0 +1,92 @@ +"""Example script for polling a job from the Jobs API.""" + +import datetime +import logging +import os +import sys +import time + +import yaml + +from tamr.api.v1beta1.jobs_pb2 import Job, JobState +from tamr_sdk.api_client import TamrApiClient +from tamr_sdk.jobs.jobs_client import JobsClient + +JOB_ID = "job_****************" + + +def calculate_runtime(job: Job) -> datetime.timedelta: + """Calculates runtime for a job. + + Assumes current status of job is "DONE". If job is not done, returns the + duration between start of job and beginning of job's current status. + + Args: + job: Job object + + Returns: + time delta object representing job duration + """ + stop = job.status.state_start_time + start = job.status_history[-1].state_start_time + diff = datetime.timedelta( + seconds=stop.seconds + stop.nanos * 1e-9 - start.seconds - start.nanos * 1e-9 + ) + return diff + + +def poll_job( + *, + jobs_client: JobsClient, + job_id: str, + logger: logging.Logger, + polling_interval_sec: int = 5, +) -> None: + """Poll job and return runtime when finished. + + Args: + jobs_client: Client instance associated with Tamr Cloud jobs service + job_id: job_id string (e.g. 'job_*********') + logger: logging instance + polling_interval_sec: how frequently to re-check job status + """ + while True: + # Check job status + job = jobs_client.get_job(job_id) + state = job.status.state + # Print info if complete + if state == JobState.DONE: + runtime = calculate_runtime(job) + logger.info(f"Job '{job_id}' finished in {runtime}.") + if job.status.error.message: + logger.warning( + f"Job '{job_id}' raised error: {job.status.error.message}." + ) + break + + # Wait before checking again + time.sleep(polling_interval_sec) + + +if __name__ == "__main__": + # Set up logging + logger = logging.getLogger() + logger.setLevel("INFO") + logger.addHandler(logging.StreamHandler(sys.stdout)) + + # Read Tamr Cloud configurations from file + dir_path = os.path.dirname(os.path.realpath(__file__)) + config_path = os.path.join(dir_path, "..", "config.yaml") + with open(config_path) as stream: + config = yaml.safe_load(stream) + + # Initialize Tamr Cloud jobs client + tamr_client = TamrApiClient( + config["tamr_cloud_host"], + [("x-api-key", config["tamr_api_key"])], + grpc_stack_trace=True, + ) + jobs_client = tamr_client.jobs() + logger.info("Client initialization complete.") + + poll_job(jobs_client=jobs_client, job_id=JOB_ID, logger=logger) diff --git a/examples/list_jobs.py b/examples/list_jobs.py deleted file mode 100644 index 1e5f600..0000000 --- a/examples/list_jobs.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Example script for fetching jobs from the Jobs API.""" - -import time - -from tamr_sdk.api_client import TamrApiClient - -start = time.time() -tamr_client = TamrApiClient( - "", [("x-api-key", "")], grpc_stack_trace=True -) -one = time.time() - - -jobs = tamr_client.jobs() -two = time.time() - -list_jobs_resp = jobs.list_jobs() -three = time.time() - -get_job_resp = jobs.get_job(job_id="") -four = time.time() - -print(get_job_resp) -print( - f"APIClient took {one -start}; JobsClient took {two-one} and api call took {three-two} and second is {four-three}" -) diff --git a/requirements.txt b/requirements.txt index 38545a0..58b6676 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ google-api-python-client==2.126.0 grpcio==1.62.2 +pyyaml typing_extensions diff --git a/requirements_dev.txt b/requirements_dev.txt index e4c8577..31dec65 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -6,4 +6,5 @@ sphinx==7.3.7 sphinx-autodoc-typehints==2.1.0 sphinx-rtd-theme==2.0.0 types-protobuf +types-PyYAML types-setuptools