Triggering a dbt Cloud Job in your automated workflow with Python

Say you want to create an automated workflow that includes a dbt Cloud Job. Maybe the workflow is within an orchestration tool like Airflow, Azure Data Factory, or AWS Step Functions. Or perhaps you want a dbt Cloud Job to be part of a git change management process powered by GitHub Actions, CircleCI, or Azure DevOps. How can you do that?

Well, that’s where the dbt Cloud API comes in! You can call the trigger job to run endpoint to start a Job of your choosing. Then you can call the get run endpoint on a fixed interval, a basic but effective API polling technique, to check the Job Run status until the run finishes.

I prefer a Python script for this solution, as Python is a powerful language understood by many developers and has exceptional error handling capabilities. Below is a basic script I recommend starting with:

import enum
import os
import time

# Be sure to `pip install requests` in your Python environment
import requests


ACCOUNT_ID = <your-dbt-cloud-account-id>
JOB_ID = <your-dbt-cloud-job-id>

# Store your dbt Cloud API token securely in your workflow tool
API_KEY = os.getenv('DBT_CLOUD_API_TOKEN')


# These status codes are documented in the dbt Cloud API docs
class DbtJobRunStatus(enum.IntEnum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30


def _trigger_job() -> int:
    res = requests.post(
        url=f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/",
        headers={'Authorization': f"Token {API_KEY}"},
        data={
            # Optionally pass a description that can be viewed within the dbt Cloud API.
            # See the API docs for additional parameters that can be passed in,
            # including `schema_override`
            'cause': "Triggered by my workflow!",
        }
    )

    try:
        res.raise_for_status()
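    except requests.HTTPError:
        # Print only the tail of the token to help spot a wrong or missing credential
        print(f"API token (last four): ...{(API_KEY or '')[-4:]}")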
        raise

    response_payload = res.json()
    return response_payload['data']['id']


def _get_job_run_status(job_run_id: int) -> int:
    res = requests.get(
        url=f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/runs/{job_run_id}/",
        headers={'Authorization': f"Token {API_KEY}"},
    )

    res.raise_for_status()
    response_payload = res.json()
    return response_payload['data']['status']


def run():
    job_run_id = _trigger_job()

    print(f"job_run_id = {job_run_id}")

    # Poll every 5 seconds until the run reaches a terminal status
    while True:
        time.sleep(5)

        status = _get_job_run_status(job_run_id)

        print(f"status = {status}")

        if status == DbtJobRunStatus.SUCCESS:
            break
        elif status in (DbtJobRunStatus.ERROR, DbtJobRunStatus.CANCELLED):
            raise Exception(f"Job run {job_run_id} ended with status {DbtJobRunStatus(status).name}")


if __name__ == '__main__':
    run()
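
The polling loop in the script keeps going until the run reaches a terminal status, so a run that hangs would block the workflow indefinitely. One possible variation, sketched here under some assumptions, adds a deadline. The helper name `wait_for_run`, its parameters, and the one-hour default are hypothetical choices for illustration, not part of the dbt Cloud API. It takes any zero-argument callable that returns a status code, which keeps it independent of how the status is fetched.

```python
import enum
import time
from typing import Callable


# Same status codes as in the script above
class DbtJobRunStatus(enum.IntEnum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30


FAILURE_STATUSES = {DbtJobRunStatus.ERROR, DbtJobRunStatus.CANCELLED}


def wait_for_run(
    get_status: Callable[[], int],
    timeout_seconds: float = 3600,      # hypothetical default: give up after an hour
    poll_interval_seconds: float = 5,   # same interval as the script above
) -> DbtJobRunStatus:
    """Poll `get_status` until the run succeeds; raise on failure or timeout."""
    deadline = time.monotonic() + timeout_seconds

    while True:
        # Raises ValueError if the API returns a code not listed in the enum
        status = DbtJobRunStatus(get_status())
        print(f"status = {status.name}")

        if status == DbtJobRunStatus.SUCCESS:
            return status
        if status in FAILURE_STATUSES:
            raise RuntimeError(f"Job run ended with status {status.name}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job run did not finish within {timeout_seconds} seconds")

        time.sleep(poll_interval_seconds)
```

To use it with the script above, you could replace the `while True` loop in `run()` with `wait_for_run(lambda: _get_job_run_status(job_run_id))`. Printing `status.name` rather than the raw integer also makes the workflow logs easier to read.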

Ideas for using this script
