Hello! I’m really excited to be able to use Python models, but I’ve been running into issues with incremental ones.
If I have a working model with `materialized = "table"` and I change it to `materialized = "incremental"`, shouldn't that work fine even before adding in the correct `if dbt.is_incremental:` logic?
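(For concreteness, the logic I'd eventually add is the incremental-guard pattern from the dbt docs, roughly the sketch below. It's adapted from their example, so the upstream model name, the `updated_at` column, and whether `session.sql` resolves against BigQuery from a Python model are all assumptions on my part.)

```python
def model(dbt, session):
    dbt.config(materialized="incremental")
    df = dbt.ref("upstream_model")  # hypothetical upstream model

    if dbt.is_incremental:
        # only keep rows newer than the max already in the target table
        max_ts = session.sql(f"select max(updated_at) from {dbt.this}").collect()[0][0]
        df = df.filter(df.updated_at > max_ts)

    return df
```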
For some reason I get this error when I just change table → incremental for a working model:
File "/tmp/d87435d6-edb3-4afa-84e7-04dae648adcf/query_consistency.py", line 5
create or replace table graph-mainnet.internal_metrics.query_consistency__dbt_tmp
^
IndentationError: unexpected indent
Any thoughts here? Has anyone else had issues with Python incremental models? I'm on BigQuery.
The code below works fine (and stops working when I go from table to incremental):
```python
import requests
import pandas as pd
import json
import time


def model(dbt, session):
    dbt.config(materialized="table")

    # ENTER THE SCHEMA TYPE YOU WANT TO GET ALL DATA FOR
    schema_type = 'dex-amm'

    # fetch the data from the deployment file
    response = requests.get('https://raw.githubusercontent.com/messari/subgraphs/master/deployment/deployment.json')
    subgraphs = response.json()

    # create query
    query = '''{
        financialsDailySnapshots(orderBy: timestamp, orderDirection: desc, first: 365) {
            cumulativeVolumeUSD
            dailyProtocolSideRevenueUSD
            totalValueLockedUSD
            cumulativeTotalRevenueUSD
            dailyTotalRevenueUSD
            dailyVolumeUSD
            timestamp
        }
    }'''

    base_url = 'https://api.thegraph.com/subgraphs/name/messari/'
    data = []
    for project in subgraphs:
        for deployment in subgraphs[project]['deployments']:
            schema = subgraphs[project]['schema']
            status = subgraphs[project]['deployments'][deployment]['status']
            if status != 'prod' or schema != schema_type:
                continue
            if len(data) >= 4:  # check if we've reached the subgraphs limit
                break
            try:  # need this because not all have a hosted-service field
                slug = subgraphs[project]['deployments'][deployment]['services']['hosted-service']['slug']
            except KeyError:
                print(f"KeyError: no hosted-service slug for '{project}' / '{deployment}'")
                continue  # skip this deployment rather than reusing a stale slug
            response = requests.post(base_url + slug, json={'query': query})
            time.sleep(1)
            if response.ok:
                response_json = response.json()
                headers = response.headers
                timestamp_query = headers.get('Date')
                try:
                    data.append((project, deployment, timestamp_query, response_json['data']['financialsDailySnapshots']))
                    print(f"Got data for: {slug}")
                except KeyError:
                    print(f"KeyError: unable to extract data from '{slug}' for '{project}'")
            else:
                print(f'Request failed for {base_url + slug} with status {response.status_code}')
        if len(data) >= 4:  # check if we've reached the subgraphs limit
            break

    # create dataframe
    df = pd.DataFrame(data, columns=['project', 'deployment', 'timestamp_query', 'data'])

    # adjust df: one row per daily snapshot, snapshot fields expanded into columns
    df = df.explode('data')
    df = pd.concat([df.drop(['data'], axis=1), df['data'].apply(pd.Series)], axis=1)

    # convert timestamp
    df['timestamp_query'] = df['timestamp_query'].drop_duplicates().reset_index(drop=True)
    df['timestamp_query'] = pd.to_datetime(df['timestamp_query'], format='%a, %d %b %Y %H:%M:%S %Z')

    # identifier
    df['product'] = 'hosted_service'

    # return result
    return df
```
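In case it clarifies what I'm aiming for: once the table → incremental switch compiles, I'd expect to slot the guard into this model roughly as below. `build_snapshots_df()` is a hypothetical helper standing in for the fetch/transform logic above, and reading `dbt.this` through the Spark BigQuery connector is my assumption about how the Dataproc session exposes the existing table.

```python
def model(dbt, session):
    dbt.config(materialized="incremental")
    df = build_snapshots_df()  # hypothetical helper: the fetch/transform logic above

    if dbt.is_incremental:
        # read the existing target table (assumption: Spark BigQuery connector on Dataproc)
        existing = session.read.format("bigquery").option("table", str(dbt.this)).load()
        max_ts = existing.agg({"timestamp_query": "max"}).collect()[0][0]
        # keep only snapshots fetched after the latest already-loaded query timestamp
        df = df[df["timestamp_query"] > max_ts]

    return df
```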