I’m trying to run a python model to do a data transformation that is much easier with Pandas than SQL. When I run the python code in Snowflake, the Snowpark dataframe is returned properly. When I run the model as a dbt model in the dbt Cloud IDE, the table builds as expected. However, when I try to run the job in dbt Cloud as part of our CI job to get it merged into the main branch, the model returns an error. Our CI command is a simple dbt run --full-refresh --select state:modified+
The code is as follows:
import snowflake.snowpark.functions as F
import pandas as pd
def model(dbt, session):
dbt.config(
materialized="table",
packages=["pandas==1.5.3"],
schema='test'
)
# Your code goes here, inside the "main" handler.
conversions = dbt.ref('<table_name>')
conv_df = conversions.select('A', 'B', 'TIME_PERIOD', 'C', 'D', 'E', 'F').collect()
conv_df = pd.DataFrame(conv_df)
conv_df['TIME_PERIOD'] = pd.to_datetime(conv_df['TIME_PERIOD'])
min_date = conv_df['TIME_PERIOD'].min()
max_date = session.sql('select current_date()').collect()[0][0]
daterange = pd.date_range(min_date, max_date, freq='D')
resampled = (
conv_df
.set_index('TIME_PERIOD')
.groupby(['A','B', 'C','D',])[['E','F']]
.resample('D')
.sum()
)
resampled = (
resampled
.reset_index(['A','B', 'C','D',])
.groupby(['A','B', 'C','D',])['E', 'F;]
.apply(lambda x: x.reindex(daterange, fill_value=0))
)
resampled.index.rename(['A', 'B', 'C', 'D','TIME_PERIOD'], inplace=True)
resampled = resampled.reset_index()
resampled['TIME_PERIOD'] = pd.to_datetime(resampled['TIME_PERIOD'],yearfirst=True).dt.tz_localize('UTC')
df = session.create_dataframe(resampled)
# work around for custom schema name error found in different post
session.use_database(dbt.this.database)
session.use_schema(dbt.this.schema)
return df
And the error is:
10:53:39 File "pandas/_libs/index.pyx", line 165, in pandas._libs.index.IndexEngine.get_loc
10:53:39 File "pandas/_libs/hashtable_class_helper.pxi", line 5745, in pandas._libs.hashtable.PyObjectHashTable.get_item
10:53:39 File "pandas/_libs/hashtable_class_helper.pxi", line 5753, in pandas._libs.hashtable.PyObjectHashTable.get_item
10:53:39 KeyError: 'TIME_PERIOD'
10:53:39
10:53:39 The above exception was the direct cause of the following exception:
10:53:39
10:53:39 Traceback (most recent call last):
10:53:39 File "_udf_code.py", line 135, in main
10:53:39 File "_udf_code.py", line 25, in model
10:53:39 File "/usr/lib/python_udf/edfa6f38f4b4116d7991a94348dbf203754adb1287fd9fe99430b0992dfaeade/lib/python3.8/site-packages/pandas/core/frame.py", line 3807, in __getitem__
10:53:39 indexer = self.columns.get_loc(key)
10:53:39 File "/usr/lib/python_udf/edfa6f38f4b4116d7991a94348dbf203754adb1287fd9fe99430b0992dfaeade/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 3804, in get_loc
10:53:39 raise KeyError(key) from err
10:53:39 KeyError: 'TIME_PERIOD'
The error code would suggest a Pandas error however the Pandas version is fixed at 1.5.3 both in the dbt code and there are no issues when running in Snowflake or via the IDE.
Can anyone suggest anything I may be overlooking when trying to debug this as I can’t currently seem to find any way to fix it?
Thanks in advance
UPDATE 17th August
This model builds if I bypass the CI commands and build straight into our main database so it would seem to be something with the CI command dbt run --full-refresh --select state:modified+