Say you want to create an automated workflow that includes a dbt Cloud Job. Maybe the workflow is within an orchestration tool like Airflow, Azure Data Factory, or AWS Step Functions. Or perhaps you want a dbt Cloud Job to be part of a git change management process powered by GitHub Actions, CircleCI, or Azure DevOps. How can you do that?
Well, that’s where the dbt Cloud API comes in! The Trigger Job Run endpoint lets you kick off a Job of your choosing. Then you can use the Get Run endpoint to implement a basic but effective polling technique to check the Job Run status.
I prefer a Python script for this solution: it’s a language many developers already know, and it offers solid error handling. Below is a basic script I recommend starting with:
```python
import enum
import os
import time

# Be sure to `pip install requests` in your Python environment
import requests

ACCOUNT_ID = <your-dbt-cloud-account-id>
JOB_ID = <your-dbt-cloud-job-id>

# Store your dbt Cloud API token securely in your workflow tool
API_KEY = os.getenv('DBT_CLOUD_API_TOKEN')


# These status codes are documented in the dbt Cloud API docs
class DbtJobRunStatus(enum.IntEnum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30


def _trigger_job() -> int:
    res = requests.post(
        url=f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/",
        headers={'Authorization': f"Token {API_KEY}"},
        json={
            # Optionally pass a description that can be viewed within the dbt Cloud API.
            # See the API docs for additional parameters that can be passed in,
            # including `schema_override`
            'cause': "Triggered by my workflow!",
        }
    )
    try:
        res.raise_for_status()
    except requests.exceptions.HTTPError:
        print(f"API token (last four): ...{API_KEY[-4:]}")
        raise

    response_payload = res.json()
    return response_payload['data']['id']


def _get_job_run_status(job_run_id) -> int:
    res = requests.get(
        url=f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/runs/{job_run_id}/",
        headers={'Authorization': f"Token {API_KEY}"},
    )
    res.raise_for_status()

    response_payload = res.json()
    return response_payload['data']['status']


def run():
    job_run_id = _trigger_job()
    print(f"job_run_id = {job_run_id}")

    while True:
        time.sleep(5)
        status = _get_job_run_status(job_run_id)
        print(f"status = {status}")

        if status == DbtJobRunStatus.SUCCESS:
            break
        elif status in (DbtJobRunStatus.ERROR, DbtJobRunStatus.CANCELLED):
            raise Exception("Failure!")


if __name__ == '__main__':
    run()
```
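If you run this from an orchestration tool, you may not want to poll indefinitely. Here is a minimal sketch of an alternative `run()` that gives up after a deadline. It assumes it lives in the same file as the script above; the `run_with_timeout` name and `max_wait_seconds` parameter are illustrative, not part of dbt Cloud.

```python
# A minimal sketch: poll with an upper bound on total wait time.
# Assumes the imports, constants, and helper functions from the script above are in scope.
def run_with_timeout(max_wait_seconds: int = 3600) -> None:
    job_run_id = _trigger_job()
    print(f"job_run_id = {job_run_id}")
    deadline = time.time() + max_wait_seconds

    while time.time() < deadline:
        time.sleep(5)
        status = _get_job_run_status(job_run_id)
        print(f"status = {status}")

        if status == DbtJobRunStatus.SUCCESS:
            return
        if status in (DbtJobRunStatus.ERROR, DbtJobRunStatus.CANCELLED):
            raise Exception(f"Job run {job_run_id} failed with status {status}")

    raise TimeoutError(f"Job run {job_run_id} did not finish within {max_wait_seconds} seconds")
```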
Ideas for using this script
- Triggering a Continuous Integration job when you open a pull request in Bitbucket or Azure DevOps; a sketch of this appears after this list. (Note: with GitHub and GitLab, this can be done in dbt Cloud with the Run On Pull Request feature!)
- Triggering a Production job when you merge a pull request into the `main` branch.
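For the Continuous Integration idea, here is a sketch of how `_trigger_job()` could be adapted to carry pull request context. It assumes it sits alongside the script above; `PR_NUMBER` is a hypothetical environment variable (substitute whatever your CI system actually exposes), and `schema_override` is the optional parameter mentioned in the script’s comments.

```python
# A sketch of a CI-flavoured trigger, assuming the constants from the script above.
def _trigger_ci_job() -> int:
    # PR_NUMBER is hypothetical; use the variable your CI system provides.
    pr_number = os.getenv('PR_NUMBER', 'unknown')
    res = requests.post(
        url=f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/",
        headers={'Authorization': f"Token {API_KEY}"},
        json={
            'cause': f"CI run for pull request {pr_number}",
            # `schema_override` (see the API docs) builds each PR into its own schema
            # so CI runs don't collide with production.
            'schema_override': f"ci_pr_{pr_number}",
        }
    )
    res.raise_for_status()
    return res.json()['data']['id']
```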
Related topics
- @matt.winkler describes how to use Airflow to orchestrate dbt Cloud and Fivetran.
- @DAVOS shows us how to use `curl` commands to trigger the dbt Cloud API within CircleCI.