I am using the DbtCloudRunJobOperator in a managed (Google Cloud Composer) Apache Airflow instance to trigger jobs in dbt Cloud. While the dbt Cloud jobs themselves run successfully and complete without any issues, the DbtCloudRunJobOperator fails to recognize the successful completion and reports the task as failed. This issue seems to occur consistently, regardless of the job configuration or dbt Cloud project settings.
Airflow version: 2.7.3
Composer version: 2.7.1
dbt Cloud API version: Latest
dbt Cloud provider package (apache-airflow-providers-dbt-cloud) version: 3.8.0
from datetime import timedelta

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

trigger_dbt_job = DbtCloudRunJobOperator(
    task_id="trigger_dbt_job",
    job_id=job_id,
    wait_for_termination=True,  # Flag to wait on the job run's termination.
    check_interval=30,  # The interval in seconds between each check for termination.
    execution_timeout=timedelta(minutes=60),  # The maximum amount of time to wait for the job to complete.
    steps_override=steps_override,
    retries=config.get("max_retries", 1),  # Use max_retries from the config if present, otherwise 1.
)
I expected the DbtCloudRunJobOperator to recognize and report when a dbt Cloud job completes successfully. The operator is configured to wait for the job's termination, so it should mark the task as "success" once the dbt job finishes. The Airflow UI should then show the task as completed successfully, allowing subsequent tasks in the DAG to proceed as planned.
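To narrow down whether the problem is in the dbt Cloud API response or in the operator itself, I can also query the status the operator polls and see what the Airflow side actually receives. A rough debugging sketch (assuming the default dbt_cloud_default connection; get_job_run_status is the hook method I believe the operator and sensor rely on, and 123456 is a placeholder run id from a run that "failed" in Airflow):

from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus

def check_run_status(run_id: int) -> None:
    # Query the same run-status endpoint the operator polls and print the raw status code.
    hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")
    status = hook.get_job_run_status(run_id=run_id)
    print(f"dbt Cloud reports status code {status} for run {run_id}")
    print(
        f"SUCCESS would be {DbtCloudJobRunStatus.SUCCESS.value}, "
        f"ERROR {DbtCloudJobRunStatus.ERROR.value}, "
        f"CANCELLED {DbtCloudJobRunStatus.CANCELLED.value}"
    )

check_run_status(123456)  # placeholder run id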
Trying to understand the issue a bit more before suggesting something.
Q: Does the dbt Cloud job complete before the timeout duration?
Just in case you are not aware of it, there is an alternative approach described in the documentation for long-running dbt jobs: wait_for_termination can be set to False to perform an asynchronous wait (typically paired with the **DbtCloudJobRunSensor**). Setting wait_for_termination to False is a good approach for long-running dbt Cloud jobs.
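A minimal sketch of that pattern, roughly following the provider documentation (the job id and task ids here are illustrative):

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Trigger the run and return immediately; the run id is pushed to XCom.
trigger = DbtCloudRunJobOperator(
    task_id="trigger_dbt_job_async",
    job_id=12345,  # illustrative job id
    wait_for_termination=False,
)

# Poll dbt Cloud separately until the run reaches a terminal state.
wait = DbtCloudJobRunSensor(
    task_id="wait_for_dbt_job",
    run_id=trigger.output,  # resolves the run id pushed by the trigger task
    timeout=60 * 60,
)

trigger >> wait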
Q: Does the dbt Cloud job complete before the timeout duration?
A: Yes, it does.
I have indeed seen the documentation. What qualifies as a long-running job? Our jobs take at most 5 minutes.
I have set up the DbtCloudRunJobOperator + DbtCloudJobRunSensor combination. However, the DbtCloudJobRunSensor also fails inexplicably, even though the job finishes in a success state.
trigger_task = DbtCloudRunJobOperator(
    task_id=f"trigger_{config_file_name_underscore}",
    job_id=job_id,
    wait_for_termination=False,
    steps_override=steps_override,
    retries=config.get("max_retries", 5),
    reuse_existing_run=False,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,  # Allow progressively longer waits between retries by using an exponential backoff algorithm on the retry delay (delay will be converted into seconds).
    max_retry_delay=timedelta(hours=5),
    owner=config.get("creator", "data_engineers"),
    doc_md="When failed, check the logs in DBT Cloud for more information. You can safely retry the task if the error was transient. To retry, click on the task and then click on the 'Clear Task' button.",
)
sensor_task = DbtCloudJobRunSensor(
    task_id=f"job_completion_{config_file_name_underscore}",
    run_id="{{ ti.xcom_pull(task_ids='" + trigger_task.task_id + "', key='return_value') }}",
    poke_interval=60,
    exponential_backoff=True,  # Allow progressively longer waits between pokes by using an exponential backoff algorithm.
    mode="poke",
    retries=100,
    soft_fail=True,  # Set to True to mark the task as SKIPPED on failure. This way the DAG can continue.
    owner=config.get("creator", "data_engineers"),
    doc_md="When failed, check the logs in DBT Cloud for more information. You can safely retry the task if the preceding (trigger) task has successfully run and the job was successful in DBT Cloud. To retry, click on the task and then click on the 'Clear Task' button.",
)
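For reference, the two tasks also need to be chained so the sensor only starts polling once the trigger has pushed the run id to XCom; a sketch of that wiring, plus a terser way to pass the run id via the trigger's output (assuming both tasks live in the same DAG), is:

# Ensure the sensor runs only after the trigger has pushed the run id.
trigger_task >> sensor_task

# Alternative to the hand-built xcom_pull template: let Airflow resolve the
# trigger's return_value (the run id) directly.
# run_id=trigger_task.output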