Running dbt in Composer using a KubernetesPodOperator

Quick disclaimer: We use GCP and the solution is based on GCP only

We’ve been using dbt for a while now and have had a few deployment choices. I’d like to explain how we got the Airflow KubernetesPodOperator option working, and also briefly explain why we chose it.

Let’s start with the why.

Cloud Build Triggers and Cloud Scheduler

This was our first choice and we still have a few models running using this strategy. It works well if you have no experience with Composer or Airflow.
It works well if you have a set schedule to run your dbt models, but it can’t trigger loads based on non-dbt ingestion. It works well if you don’t need any smarts in the scheduling process, such as checking that the previous load has completed before you start another load. It works well if you don’t exceed the limits of Cloud Scheduler.
Plan B…

Composer using a virtual environment

Why run in a virtual environment and not install dbt as a PyPI package? The dependencies of the dbt PyPI package conflict with Composer’s, so installing dbt directly in Composer fails. The conflicts also vary depending on the version of Composer.
This choice bypassed all the issues that Cloud Scheduler had. We can trigger the dbt DAG via a Pub/Sub message or schedule it. The current implementation creates a temporary virtual environment with dbt installed for each dbt job. The virtual environment is destroyed when the dbt job finishes.
Why stop using this option? Again the answer is simple: security. Running dbt jobs in a virtual environment created in Composer poses a security risk, and Google support is void if you install software into a virtual environment in a way that bypasses security checks.

And that is how we ended up with Plan C.

Composer using a KubernetesPodOperator

Since the dbt models are containerised with Docker, dependency conflicts are no longer a problem.
As the KubernetesPodOperator automatically spins up the pods to run the Docker image, we do not need to manage the infrastructure.
This option does require monitoring Composer resource usage, and scaling up or out when needed. However, we were already doing this as part of support.

To-Do List:

  1. Create a base Docker image Artifact containing dbt, Python, git and gcloud.
  2. Create a Docker image Artifact of your dbt repo.
  3. Set up Workload Identity.
  4. Use the KubernetesPodOperator in your DAG.

Build a base dbt Artifact

We decided to use a separate Docker image that contains all the “installs” needed to execute a dbt command. This gives us a central image for updating versions, and Docker images built on top of this base image build much faster.

To enable this, we created a base_dbt_docker repo with the following files:

  1. A docker file
  2. A requirements.txt file
  3. A yaml file to create a docker image of your repo in the Artifact repository

Docker file

Feel free to try alpine instead of ubuntu. I had issues with getting gcloud and python installed in alpine.

FROM ubuntu:20.04
COPY . /app/

RUN apt-get update 
# Install gcloud
RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
RUN apt-get install apt-transport-https ca-certificates gnupg curl -y
RUN curl https://packages.cloud.google.com/apt/doc/apt-key.gpg  | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
RUN apt-get update && apt-get install google-cloud-sdk -y
# Install git
RUN apt-get install -y git
# Install Python
RUN apt-get install --no-install-recommends -y python3.8 python3-pip 
# Install dbt
RUN python3 -m pip install -r /app/requirements.txt

Requirements file

This is a simple file containing the version of dbt to install:

dbt>=0.18.1

YAML file

The naming format for an Artifact is LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY/IMAGE. For more information, have a look at this link. Our location is australia-southeast1, please adjust accordingly.
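To make the naming format concrete, here is a minimal Python sketch that assembles an image path from its four parts. The function name and the example values (my-project, base-dbt, base-dbt-image) are hypothetical and only for illustration; substitute your own location, project, repository and image.

```python
def artifact_image_path(location: str, project_id: str, repository: str, image: str) -> str:
    """Build an image path in the LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY/IMAGE format."""
    return f"{location}-docker.pkg.dev/{project_id}/{repository}/{image}"


if __name__ == "__main__":
    # Hypothetical project, repository and image names.
    print(artifact_image_path("australia-southeast1", "my-project", "base-dbt", "base-dbt-image"))
```

The same string is what the docker tag and docker push steps in the yaml below build from the substitution variables.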

steps:
- name: 'gcr.io/cloud-builders/docker'
  entrypoint: 'bash'
  args: ['-c', 'docker build -t ${_ARTIFACT_NAME} .']

- name: 'gcr.io/cloud-builders/docker'
  entrypoint: 'bash'
  args: ['-c', 'docker tag ${_ARTIFACT_NAME} australia-southeast1-docker.pkg.dev/${_PROJECT_ID}/${_ARTIFACT_NAME}/${_IMAGE_NAME}']
  
- name: 'gcr.io/cloud-builders/docker'
  entrypoint: 'bash'
  args: ['-c', 'docker push australia-southeast1-docker.pkg.dev/${_PROJECT_ID}/${_ARTIFACT_NAME}/${_IMAGE_NAME}']

substitutions:
  _PROJECT_ID: $PROJECT_ID

Create a Cloud Build trigger to use this yaml to build the Artifact.

Remember that each time there is a change in the repo, you need to rebuild the Artifact by running the Cloud Build trigger again. You’ll need to rebuild the dbt project Artifact too.

Build a dbt project Artifact

You will need the following files in your repo:

  1. A docker file
  2. A shell script that will execute your dbt commands
  3. A yaml file to create a docker image of your repo in the Artifact repository

Docker file

Note that the docker file points to the base dbt Artifact you just created; see the FROM line.

ARG project_id
ARG base_artifact
ARG base_image

FROM australia-southeast1-docker.pkg.dev/${project_id}/${base_artifact}/${base_image}
COPY . /app/
RUN echo "dbt via docker"

RUN chmod +x /app/dbt_run.sh
WORKDIR /app/
ENTRYPOINT ["/app/dbt_run.sh"]

Shell script

The .sh file executes the dbt commands and currently caters for dbt run and dbt debug.

#!/bin/bash
mode=$1
dbt_target=$2
dbt_models=$3
dbt_vars=$4
full_refresh=$5

cd /app/

# Variables are quoted so that an empty or missing argument doesn't break the test
if [ "$mode" = "run" ]; then
    echo "dbt Mode is run"
    if [ "$full_refresh" = "True" ]; then
        echo "Doing a full refresh"
        dbt run --target="$dbt_target" --models="$dbt_models" --vars="$dbt_vars" --profiles-dir=/app/profiles_dir --full-refresh
    else
        echo "Not doing a full refresh"
        dbt run --target="$dbt_target" --models="$dbt_models" --vars="$dbt_vars" --profiles-dir=/app/profiles_dir
    fi
elif [ "$mode" = "debug" ]; then
    echo "dbt Mode is debug"
    dbt debug --profiles-dir=/app/profiles_dir --target="$dbt_target"
else
    echo "Incorrect dbt Mode. Nothing to do"
fi

# capture the exit code from the dbt command
# so that the exit code of any later command doesn't get used by KubernetesPodOperator
exit_code=$?

# rethrowing the exit code to KubernetesPodOperator
exit $exit_code 

Note the correlation between the COPY . /app/ in the docker file and the cd /app/ in the .sh file.
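If the branching in the shell script is hard to follow, the same logic can be sketched in Python. This is only an illustration of how the five positional arguments map to a dbt command, not something that ships in the image; the function name build_dbt_command is hypothetical.

```python
from typing import List, Optional

PROFILES_DIR = "/app/profiles_dir"  # same path the shell script passes to --profiles-dir


def build_dbt_command(mode: str, dbt_target: str, dbt_models: str = "",
                      dbt_vars: str = "", full_refresh: str = "False") -> Optional[List[str]]:
    """Mirror the shell script: return the dbt command for a mode, or None if the mode is unknown."""
    if mode == "run":
        cmd = [
            "dbt", "run",
            f"--target={dbt_target}",
            f"--models={dbt_models}",
            f"--vars={dbt_vars}",
            f"--profiles-dir={PROFILES_DIR}",
        ]
        # The script compares against the literal string "True"
        if full_refresh == "True":
            cmd.append("--full-refresh")
        return cmd
    if mode == "debug":
        return ["dbt", "debug", f"--profiles-dir={PROFILES_DIR}", f"--target={dbt_target}"]
    # Any other mode: nothing to do, as in the script's else branch
    return None


if __name__ == "__main__":
    print(build_dbt_command("run", "projectzone", "tag:test2", '{from_date: "", to_date: ""}', "True"))
```

Note that the argument order matters: whatever calls the script has to pass mode, target, models, vars and the full refresh flag in exactly that sequence.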

YAML file

steps:
- name: 'gcr.io/cloud-builders/docker'
  entrypoint: 'bash'
  args: ['-c', 'docker build --build-arg "project_id=${_PROJECT_ID}" --build-arg "base_artifact=${_BASE_ARTIFACT}" --build-arg "base_image=${_BASE_IMAGE}" -t ${_REPO_NAME} .']

- name: 'gcr.io/cloud-builders/docker'
  entrypoint: 'bash'
  args: ['-c', 'docker tag ${_REPO_NAME} australia-southeast1-docker.pkg.dev/${_PROJECT_ID}/${_REPO_NAME}/${_IMAGE_NAME}']
  
- name: 'gcr.io/cloud-builders/docker'
  entrypoint: 'bash'
  args: ['-c', 'docker push australia-southeast1-docker.pkg.dev/${_PROJECT_ID}/${_REPO_NAME}/${_IMAGE_NAME}']

substitutions:
  _PROJECT_ID: $PROJECT_ID

Create a Cloud Build trigger to use this yaml to build the Artifact.

Remember that each time there is a change in the repo, you need to rebuild the Artifact by running the Cloud Build trigger again.

Workload Identity

What is Workload Identity? It is basically a way to make a Kubernetes service account have the same credentials as a Google service account. Or as Google puts it, “Workload Identity is the recommended way to access Google Cloud services from applications running within GKE due to its improved security properties and manageability.”
Here is the link to Google’s documentation. It gives an in-depth explanation of Workload Identity and has an easy-to-follow set of steps to get Workload Identity up and running. Below is a condensed version of the steps we took.

Steps (these steps can be done with Terraform too):

  1. Update the cluster to enable Workload Identity. (Another option is to create a new cluster.)
    gcloud container clusters update <cluster_name> --zone australia-southeast1-a --project=<project_id> --workload-pool=<gcp_project_id>.svc.id.goog
  2. Add a new node pool to the cluster with Workload Identity enabled. (Another option is to modify an existing node pool, but that is not recommended.)
    gcloud container node-pools create <node_pool_name> --zone australia-southeast1-a --project=<project_id> --cluster=<cluster_name> --workload-metadata=GKE_METADATA --machine-type=<pick_a_type default is e2-medium>
  3. Authenticate to Google Cloud
    – Configure kubectl to communicate with the cluster
    gcloud container clusters get-credentials <cluster_name> --zone australia-southeast1-a --project=<project_id>
    – Create the namespace to use for the Kubernetes service account
    kubectl create namespace <k8s_namespace>
    – Create the Kubernetes service account
    kubectl create serviceaccount --namespace <k8s_namespace> <k8s_sa>
    – Allow the Kubernetes service account to impersonate the Google service account by creating an IAM policy binding between the two
    gcloud iam service-accounts add-iam-policy-binding --role roles/iam.workloadIdentityUser --member "serviceAccount:<gcp_project_id>.svc.id.goog[<k8s_namespace>/<k8s_sa>]" <gcp_sa> --project=<project_id>
    – Add the iam.gke.io/gcp-service-account annotation to the Kubernetes service account
    kubectl annotate serviceaccount --namespace <k8s_namespace> <k8s_sa> iam.gke.io/gcp-service-account=<gcp_sa>
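The part of step 3 that is easiest to get wrong is the --member string, which ties the Kubernetes namespace and service account to the workload pool. Here is a minimal Python sketch that builds that string and the four namespace, service account, binding and annotation commands as argument lists. The function names and the example project and Google service account are hypothetical, and the sketch assumes the cluster and the Google service account live in the same project.

```python
import shlex
from typing import List


def workload_identity_member(gcp_project_id: str, k8s_namespace: str, k8s_sa: str) -> str:
    """Return the IAM member string that identifies a Kubernetes service account."""
    return f"serviceAccount:{gcp_project_id}.svc.id.goog[{k8s_namespace}/{k8s_sa}]"


def binding_commands(gcp_project_id: str, k8s_namespace: str,
                     k8s_sa: str, gcp_sa: str) -> List[List[str]]:
    """Return the commands from step 3 (after get-credentials) as argument lists."""
    member = workload_identity_member(gcp_project_id, k8s_namespace, k8s_sa)
    return [
        ["kubectl", "create", "namespace", k8s_namespace],
        ["kubectl", "create", "serviceaccount", "--namespace", k8s_namespace, k8s_sa],
        ["gcloud", "iam", "service-accounts", "add-iam-policy-binding",
         "--role", "roles/iam.workloadIdentityUser",
         "--member", member, gcp_sa, f"--project={gcp_project_id}"],
        ["kubectl", "annotate", "serviceaccount", "--namespace", k8s_namespace, k8s_sa,
         f"iam.gke.io/gcp-service-account={gcp_sa}"],
    ]


if __name__ == "__main__":
    # Hypothetical project and Google service account; namespace and k8s SA match the DAG below.
    for cmd in binding_commands("my-project", "kube-pod", "ap-kube-pod-sa",
                                "dbt-runner@my-project.iam.gserviceaccount.com"):
        print(shlex.join(cmd))  # shlex.join needs Python 3.8+
```

Printing the commands rather than running them keeps the sketch safe to try; review the output before pasting it into a shell.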

KubernetesPodOperator

Here is the link to Google’s documentation.
We use the same mechanism to trigger and schedule this DAG as we did for the virtual environment DAG. I won’t explain that mechanism here, but as a final step, here are the minimum requirements for your KubernetesPodOperator DAG.

from airflow import DAG
from airflow import models
# Make sure to add below
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'dummy',
    'retries': 0,
}

docker_dag = DAG(
    dag_id='dbt_docker_test',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(1),
    catchup=False,
    tags=['dbt run model'],
)

quay_k8s = KubernetesPodOperator(
    # k8s namespace created earlier
    namespace='kube-pod', 
    # k8s service account name created earlier
    service_account_name='ap-kube-pod-sa', 
    # Ensures that the right node-pool is used
    # (the contrib operator's argument is node_selectors; the label key has no URL scheme)
    node_selectors={'cloud.google.com/gke-nodepool': 'k8sworkload-pool'}, 
    # Ensures that cache is always refreshed
    image_pull_policy='Always', 
    # Artifact image of dbt repo
    image='australia-southeast1-docker.pkg.dev/<gcp-project-id>/<artifact>/<artifact-image>', 
    # matches the ENTRYPOINT in the docker file (the .sh file)
    cmds=['/app/dbt_run.sh'], 
    # matches sequence of arguments in .sh file (mode,dbt_target,dbt_models,dbt_vars,full_refresh)
    arguments=['run', 'projectzone', 'tag:test2', '{from_date: "", to_date: ""}','False'], 
    name="run-dbt-in-pod",
    task_id="run-dbt-in-pod",
    get_logs=True,
    dag=docker_dag,    
    )

The key to making sure that the DAG uses the k8s service account is the service_account_name argument. The documentation suggests using Kubernetes secrets, but that doesn’t work for our purposes.
We also found that the Artifact image isn’t always updated and that a cached version is used. Setting image_pull_policy='Always' solves this issue.
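Because the arguments list has to match the positional order in the .sh file, it can help to build it with a small helper rather than by hand. The sketch below is one way to do that; the function name build_pod_arguments is hypothetical. It serialises the dbt vars with json.dumps, on the assumption that dbt will accept the resulting JSON string for --vars since JSON is also valid YAML.

```python
import json
from typing import Dict, List


def build_pod_arguments(mode: str, dbt_target: str, dbt_models: str,
                        dbt_vars: Dict[str, str], full_refresh: bool) -> List[str]:
    """Return arguments in the order the script reads them:
    mode, dbt_target, dbt_models, dbt_vars, full_refresh."""
    return [
        mode,
        dbt_target,
        dbt_models,
        json.dumps(dbt_vars),  # JSON string, passed through to --vars
        str(full_refresh),     # "True" or "False", which is what the script compares against
    ]


if __name__ == "__main__":
    print(build_pod_arguments("run", "projectzone", "tag:test2",
                              {"from_date": "", "to_date": ""}, False))
```

The returned list can be passed straight to the arguments parameter of the KubernetesPodOperator.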

And there you have it. Follow these steps, put the DAG in the correct folder in Composer and you’ll have dbt running using a KubernetesPodOperator.

Questions?
Feel free to ask and I’ll respond as best I can.

Thank you!

I’d like to thank my fellow contributors in getting this solution to work.
@ryanyuan - Composer, Airflow and general GCP maestro
@Phil_Xu - kubernetes maestro
