dbt Fusion and Deferred Schema Cache

Intro

I have recently been able to produce the full deferred schema cache that dbt Fusion uses for its analysis from the existing catalog.json artifact that dbt Core produces with the docs generate command. I did this as a proof of concept, and because I need the cached schemas to get an error-free compile in dbt Fusion. I think something like this should be a native part of dbt Fusion, integrated with the existing defer options. The rest of this post goes into my reasoning, how I got it working, and the code I used to make it work.

The Why

In my dbt project we use only dbt Core, and I do not have access to every source and model that is defined, but dbt Fusion needs schema information for all of those nodes so that it can properly analyze the SQL and let the dbt extension work its magic. We do have service accounts with access to all of the nodes, and those service accounts produce our docs site and run our CI and production jobs. So if I want the extension to be able to work its magic, I need all of that schema information, and since I may not be able to query it at development time, I need a way to use the service accounts to get it.

The How

When I first encountered this, it felt similar to the problem that defer solves: I don't have that table locally, so use that one over there. We already use the manifest.json produced by our service accounts when generating our dbt docs site as the defer target during local development. For the schemas it would be more like: I can't query that table, but here is a source of the information that is needed. The manifest.json artifact does not contain the column data types explicitly, but the catalog.json artifact has the full schema with column data types, and it was already being produced and stored in the same location as the manifest.json.
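
To make that concrete, here is a minimal sketch of where that information lives in catalog.json. The field names follow the dbt catalog artifact (database, schema, and relation name under metadata, column types under columns); the values themselves are made up for illustration.

use serde_json::json;

fn main() {
    // One entry under "nodes" (or "sources") in catalog.json, trimmed down.
    let node = json!({
        "metadata": { "database": "ANALYTICS", "schema": "marts", "name": "dim_customers" },
        "columns": {
            "CUSTOMER_ID": { "type": "NUMBER(38,0)", "index": 1 },
            "FULL_NAME":   { "type": "VARCHAR(255)", "index": 2 }
        }
    });

    // The column data type is the piece manifest.json does not carry.
    let col_type = node["columns"]["CUSTOMER_ID"]["type"].as_str().unwrap();
    println!("{}", col_type); // NUMBER(38,0)
}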

So getting the information was going to be simple enough, but dbt Fusion expects it in a specific format, specifically a schema-only Parquet file in a specific directory structure. Another problem is that the Parquet files Fusion produces use custom structures, and those are not, at least as far as I could find, part of the public Fusion repository. So it became a game of guess and check to reverse engineer a Parquet file that dbt Fusion would accept. Here I have to thank Sophie Li on the dbt Slack for poking at this and getting me pointed in the right direction.
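
For reference, the directory layout the full script below writes is target/db/schemas/DATABASE/schema/table/output.parquet. A minimal sketch of the path construction, matching what the script does:

use std::path::PathBuf;

// Database name is kept as it appears in the catalog; schema and table are lowercased.
fn schema_cache_path(base: &str, database: &str, schema: &str, table: &str) -> PathBuf {
    PathBuf::from(base)
        .join(database)
        .join(schema.to_lowercase())
        .join(table.to_lowercase())
        .join("output.parquet")
}

fn main() {
    let p = schema_cache_path("target/db/schemas", "ANALYTICS", "MARTS", "DIM_CUSTOMERS");
    println!("{}", p.display()); // target/db/schemas/ANALYTICS/marts/dim_customers/output.parquet
}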

In addition to that, I was not getting where I wanted with Python, so I switched over to Rust and had to get that all set up. I was trying to minimize the differences between my Parquet files and the ones produced by dbt Fusion.

The Outcome

With all of that in place, I am able to copy the catalog.json artifact produced by one of our service accounts into my local development environment and then run the Rust script to create all of the Parquet files that dbt Fusion needs to match the defer target. dbt Fusion accepts the Parquet files, and all of the warnings about missing schemas are gone when I run a full compile with a defer target.
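
In the same spirit as the build-and-run comments in the scripts below, the local workflow looks roughly like this. The catalog path is the one the script reads from, the defer flags are the standard dbt --defer/--state options, and the angle-bracket paths are placeholders you would replace with wherever your service-account artifacts land:

// # Copy the service-account catalog to where build_reference_db expects it
// cp <service-account-artifacts>/catalog.json reference_state/catalog.json
//
// # Generate the zero-record Parquet files under target/db/schemas
// cargo run --release --bin build_reference_db
//
// # Compile against the defer target; the missing-schema warnings should be gone
// dbt compile --defer --state <directory-with-the-deferred-manifest.json>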

So as a proof of concept it works. What I would like is for this to just work natively as part of the defer option. The state flag already points to a directory and already expects other dbt artifacts, so while I may be myopic about this particular structure for the solution, it just seems to fit. I am open to whatever the structure ends up being, but it feels like this should be an option in dbt Fusion as a whole.

The Code

I do not yet have a repository set up for this code, as it just lives in my Fusion test branch at the moment, but I am including it as part of this discussion so that others can see what was done and maybe make use of it for their own needs. In full transparency, I used generative AI tools to help produce and modify the code; I know generally what all parts of it are doing, but I am by no means an expert in Rust and I do not know if this is the best way to do this. My project uses Snowflake, so all of the code is geared toward that, and it may not work with everything in Snowflake since I have only tested it on my project. If you have questions, please reach out to me on the dbt Community Slack and I will do what I can to answer them.

  • build_reference_db - Generates zero-record Parquet files from dbt catalog metadata
  • read_parquet - Inspects and analyzes Parquet file metadata and structure
build_reference_db
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{BufReader, Write};
use std::path::Path;
use std::sync::Arc;

use arrow::array::{ArrayRef, FixedSizeListArray, PrimitiveArray, StringArray, TimestampNanosecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use serde_json::{Map, Value};

//# Build and run
//cargo build --release
//cargo run --release --bin build_reference_db
//
//# Or for development
//cargo run --bin build_reference_db
//
//# Run tests
//cargo test
//

/// Maps Snowflake data types to Arrow data types
fn map_snowflake_to_arrow_type(snowflake_type: &str) -> DataType {
    let snowflake_type = snowflake_type.to_uppercase();
    
    // Handle parameterized types with Rust-optimized mappings
    if snowflake_type.starts_with("VARCHAR") 
        || snowflake_type.starts_with("STRING") 
        || snowflake_type.starts_with("TEXT") {
        DataType::Utf8
    } else if snowflake_type.starts_with("NUMBER") || snowflake_type.starts_with("DECIMAL") {
        // Extract precision and scale if available
        if let Some(start) = snowflake_type.find('(') {
            if let Some(end) = snowflake_type.find(')') {
                let params_str = &snowflake_type[start + 1..end];
                let params: Vec<&str> = params_str.split(',').collect();
                
                if params.len() == 2 {
                    if let (Ok(precision), Ok(scale)) = (params[0].trim().parse::<u8>(), params[1].trim().parse::<i8>()) {
                        return DataType::Decimal128(precision, scale);
                    }
                } else if params.len() == 1 {
                    if let Ok(precision) = params[0].trim().parse::<u8>() {
                        return DataType::Decimal128(precision, 0);
                    }
                }
            }
        }
        // Default NUMBER/DECIMAL without explicit precision and scale
        DataType::Decimal128(18, 0)
    } else if snowflake_type.starts_with("TIMESTAMP_NTZ") {
        DataType::FixedSizeList(Arc::new(Field::new_list_field(
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        ).with_name("timestamp_ntz:9")), 1)       
    } else if snowflake_type.starts_with("TIMESTAMP_LTZ") || snowflake_type.starts_with("TIMESTAMP_TZ") {
        // Create nested structure for TIMESTAMP_TZ with timestamp and offset_minutes
        let timestamp_tz_struct = DataType::Struct(vec![
            Field::new("timestamp", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
            Field::new("offset_minutes", DataType::Int32, false),
        ].into());
        
        DataType::FixedSizeList(Arc::new(Field::new_list_field(
            timestamp_tz_struct,
            true,
        ).with_name("timestamp_tz:9")), 1)
    } else if snowflake_type.starts_with("TIMESTAMP") {
        DataType::Timestamp(TimeUnit::Nanosecond, None)
    } else if snowflake_type.starts_with("DATE") {
        DataType::Date32
    } else if snowflake_type.starts_with("TIME") {
        DataType::Time64(TimeUnit::Nanosecond)
    } else if snowflake_type.starts_with("BOOLEAN") {
        DataType::Boolean
    } else if snowflake_type.starts_with("BINARY") || snowflake_type.starts_with("VARBINARY") {
        DataType::Binary
    } else if snowflake_type.starts_with("FLOAT") || snowflake_type.starts_with("DOUBLE") {
        DataType::Float64
    } else if snowflake_type.starts_with("INTEGER") || snowflake_type.starts_with("BIGINT") {
        DataType::Int64
    } else if snowflake_type.starts_with("SMALLINT") {
        DataType::Int16
    } else if snowflake_type.starts_with("TINYINT") {
        DataType::Int8
    //} else if snowflake_type.starts_with("ARRAY") {
    //    // Rust-optimized fixed size list type
    //    DataType::FixedSizeList(Arc::new(Field::new_list_field(
    //        DataType::Utf8,
    //        true,
    //    ).with_name("array")), 1)
    } else if snowflake_type.starts_with("OBJECT") || snowflake_type.starts_with("VARIANT") || snowflake_type.starts_with("ARRAY") {
        DataType::FixedSizeList(Arc::new(Field::new_list_field(
            DataType::Utf8,
            true,
        ).with_name("variant")), 1)
    } else if snowflake_type.starts_with("GEOGRAPHY") || snowflake_type.starts_with("GEOMETRY") {
        DataType::Utf8 // Store as WKT string
    } else {
        // Default fallback
        DataType::Utf8
    }
}

/// Creates an empty array for the given data type
fn create_empty_array(data_type: &DataType) -> Result<ArrayRef, Box<dyn std::error::Error>> {
    match data_type {
        DataType::Utf8 => Ok(Arc::new(StringArray::from(Vec::<Option<String>>::new()))),
        DataType::Int64 => Ok(Arc::new(PrimitiveArray::<arrow::datatypes::Int64Type>::from(Vec::<Option<i64>>::new()))),
        DataType::Int32 => Ok(Arc::new(PrimitiveArray::<arrow::datatypes::Int32Type>::from(Vec::<Option<i32>>::new()))),
        DataType::Int16 => Ok(Arc::new(PrimitiveArray::<arrow::datatypes::Int16Type>::from(Vec::<Option<i16>>::new()))),
        DataType::Int8 => Ok(Arc::new(PrimitiveArray::<arrow::datatypes::Int8Type>::from(Vec::<Option<i8>>::new()))),
        DataType::Float64 => Ok(Arc::new(PrimitiveArray::<arrow::datatypes::Float64Type>::from(Vec::<Option<f64>>::new()))),
        DataType::Boolean => Ok(Arc::new(arrow::array::BooleanArray::from(Vec::<Option<bool>>::new()))),
        DataType::Date32 => Ok(Arc::new(PrimitiveArray::<arrow::datatypes::Date32Type>::from(Vec::<Option<i32>>::new()))),
        // TIME maps to Time64(Nanosecond) above; keep the microsecond arm for completeness
        DataType::Time64(TimeUnit::Nanosecond) => Ok(Arc::new(PrimitiveArray::<arrow::datatypes::Time64NanosecondType>::from(Vec::<Option<i64>>::new()))),
        DataType::Time64(TimeUnit::Microsecond) => Ok(Arc::new(PrimitiveArray::<arrow::datatypes::Time64MicrosecondType>::from(Vec::<Option<i64>>::new()))),
        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
            let array = TimestampNanosecondArray::from(Vec::<Option<i64>>::new());
            if tz.is_some() {
                Ok(Arc::new(array.with_timezone_opt(tz.clone())))
            } else {
                Ok(Arc::new(array))
            }
        },
        DataType::Binary => {
            // Create empty binary array using the builder pattern
            let mut builder = arrow::array::BinaryBuilder::new();
            Ok(Arc::new(builder.finish()))
        },
        DataType::Decimal128(precision, scale) => {
            Ok(Arc::new(arrow::array::Decimal128Array::from(Vec::<Option<i128>>::new())
                .with_precision_and_scale(*precision, *scale)?))
        },
        DataType::FixedSizeList(field, size) => {
            // Create empty fixed size list array
            let inner_array = create_empty_array(field.data_type())?;
            let fixed_size_list_array = FixedSizeListArray::new(
                Arc::new(field.as_ref().clone()),
                *size,
                inner_array,
                None,
            );
            Ok(Arc::new(fixed_size_list_array))
        },
        DataType::Struct(fields) => {
            // Create empty struct array
            let mut struct_arrays: Vec<ArrayRef> = Vec::new();
            for field in fields.iter() {
                let empty_array = create_empty_array(field.data_type())?;
                struct_arrays.push(empty_array);
            }
            Ok(Arc::new(arrow::array::StructArray::new(
                fields.clone(),
                struct_arrays,
                None,
            )))
        },
        _ => Err(format!("Unsupported data type: {:?}", data_type).into()),
    }
}

/// Creates a zero-record parquet file with the given schema
fn create_rust_optimized_parquet(
    schema: Schema,
    output_path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
    // Create empty arrays for each field
    let mut arrays: Vec<ArrayRef> = Vec::new();
    for field in schema.fields() {
        let empty_array = create_empty_array(field.data_type())?;
        arrays.push(empty_array);
    }
    
    // Create empty record batch
    let record_batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
    
    // Create parquet file with optimized settings
    let file = File::create(output_path)?;
    let props = WriterProperties::builder()
        .set_compression(parquet::basic::Compression::SNAPPY)
        .set_dictionary_enabled(true)
        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Chunk)
        .build();
    
    let mut writer = ArrowWriter::try_new(file, record_batch.schema(), Some(props))?;
    writer.write(&record_batch)?;
    writer.close()?;
    
    Ok(())
}

/// Processes the dbt catalog.json and generates zero-record parquet files
fn process_dbt_catalog_and_schemas(
    catalog_path: &str,
    output_base_path: &str,
    log_file: &mut File,
) -> Result<usize, Box<dyn std::error::Error>> {
    // Create output directory if it doesn't exist
    fs::create_dir_all(output_base_path)?;
    
    // Load the catalog.json
    let file = File::open(catalog_path)?;
    let reader = BufReader::new(file);
    let catalog_data: Value = serde_json::from_reader(reader)?;
    
    // Extract nodes and sources from the catalog
    let empty_map = Map::new();
    let nodes = catalog_data.get("nodes").and_then(|v| v.as_object()).unwrap_or(&empty_map);
    let sources = catalog_data.get("sources").and_then(|v| v.as_object()).unwrap_or(&empty_map);
    
    // Combine nodes and sources for processing
    let mut all_resources = HashMap::new();
    for (key, value) in nodes.iter().chain(sources.iter()) {
        all_resources.insert(key.clone(), value.clone());
    }
    
    let mut files_created = 0;
    
    for (unique_id, resource_info) in all_resources.iter() {
        // We are interested in models and sources that have columns defined
        if let Some(columns) = resource_info.get("columns").and_then(|v| v.as_object()) {
            if !columns.is_empty() {
                // Determine the database, schema and table name
                let metadata = resource_info.get("metadata").and_then(|v| v.as_object());
                let database_name = metadata
                    .and_then(|m| m.get("database"))
                    .and_then(|v| v.as_str())
                    .unwrap_or("UNKNOWN_DATABASE");
                let schema_name = metadata
                    .and_then(|m| m.get("schema"))
                    .and_then(|v| v.as_str());
                let table_name = metadata
                    .and_then(|m| m.get("name"))
                    .and_then(|v| v.as_str());
                
                if schema_name.is_none() || table_name.is_none() {
                    writeln!(log_file, "Skipping resource {}: Missing schema or table name in catalog metadata.", unique_id)?;
                    continue;
                }
                
                let schema_name = schema_name.unwrap();
                let table_name = table_name.unwrap();
                
                // Create directory structure: DATABASE_NAME/schema_name/table_name/
                let table_output_dir = Path::new(output_base_path)
                    .join(database_name)
                    .join(schema_name.to_lowercase())
                    .join(table_name.to_lowercase());
                fs::create_dir_all(&table_output_dir)?;
                
                // Output file is always named 'output.parquet' within the table directory
                let output_parquet_file = table_output_dir.join("output.parquet");
                
                // Build Arrow schema from catalog columns
                let mut fields = Vec::new();
                for (col_name, col_info) in columns.iter() {
                    let data_type = col_info
                        .get("type")
                        .and_then(|v| v.as_str())
                        .unwrap_or("STRING");
                    let arrow_type = map_snowflake_to_arrow_type(data_type);
                    fields.push(Field::new(col_name, arrow_type, true));
                }
                
                if !fields.is_empty() {
                    let schema = Schema::new(fields);
                    
                    match create_rust_optimized_parquet(schema, &output_parquet_file) {
                        Ok(_) => {
                            writeln!(
                                log_file,
                                "Generated Rust-native zero-record Parquet for {}.{}.{} with {} columns at {}",
                                database_name,
                                schema_name,
                                table_name,
                                columns.len(),
                                output_parquet_file.display()
                            )?;
                            files_created += 1;
                        }
                        Err(e) => {
                            writeln!(
                                log_file,
                                "Error creating Rust-native parquet for {}.{}.{}: {}",
                                database_name, schema_name, table_name, e
                            )?;
                        }
                    }
                } else {
                    writeln!(
                        log_file,
                        "No columns found for {}.{}.{}, skipping Parquet generation.",
                        database_name, schema_name, table_name
                    )?;
                }
            }
        }
    }
    
    Ok(files_created)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let catalog_file_path = "reference_state/catalog.json";
    let output_directory = "target/db/schemas"; // This will be created in the script's execution directory
    let log_file_path = "target/build_reference_db.log";
    
    // Create target directory if it doesn't exist
    fs::create_dir_all("target")?;
    
    // Create log file
    let mut log_file = File::create(log_file_path)?;
    writeln!(log_file, "Starting build_reference_db process...")?;
    writeln!(log_file, "Catalog file: {}", catalog_file_path)?;
    writeln!(log_file, "Output directory: {}", output_directory)?;
    writeln!(log_file, "Log file: {}", log_file_path)?;
    writeln!(log_file, "---")?;
    
    let files_created = process_dbt_catalog_and_schemas(catalog_file_path, output_directory, &mut log_file)?;
    
    writeln!(log_file, "---")?;
    writeln!(log_file, "Process completed. Total files created: {}", files_created)?;
    
    // Only output the count to stdout
    println!("Process completed. Total files created: {} Output directory: {}", files_created,output_directory);
    
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::TimeUnit;
    
    #[test]
    fn test_basic_snowflake_to_arrow_type_mapping() {
        // String types
        assert_eq!(map_snowflake_to_arrow_type("VARCHAR"), DataType::Utf8);
        assert_eq!(map_snowflake_to_arrow_type("STRING"), DataType::Utf8);
        assert_eq!(map_snowflake_to_arrow_type("TEXT"), DataType::Utf8);
        assert_eq!(map_snowflake_to_arrow_type("VARCHAR(255)"), DataType::Utf8);
        
        // Integer types
        assert_eq!(map_snowflake_to_arrow_type("INTEGER"), DataType::Int64);
        assert_eq!(map_snowflake_to_arrow_type("BIGINT"), DataType::Int64);
        assert_eq!(map_snowflake_to_arrow_type("SMALLINT"), DataType::Int16);
        assert_eq!(map_snowflake_to_arrow_type("TINYINT"), DataType::Int8);
        
        // Floating point types
        assert_eq!(map_snowflake_to_arrow_type("FLOAT"), DataType::Float64);
        assert_eq!(map_snowflake_to_arrow_type("DOUBLE"), DataType::Float64);
        
        // Boolean type
        assert_eq!(map_snowflake_to_arrow_type("BOOLEAN"), DataType::Boolean);
        
        // Date and time types
        assert_eq!(map_snowflake_to_arrow_type("DATE"), DataType::Date32);
        assert_eq!(map_snowflake_to_arrow_type("TIME"), DataType::Time64(TimeUnit::Nanosecond));
        assert_eq!(map_snowflake_to_arrow_type("TIMESTAMP"), DataType::Timestamp(TimeUnit::Nanosecond, None));
        
        // Binary types
        assert_eq!(map_snowflake_to_arrow_type("BINARY"), DataType::Binary);
        assert_eq!(map_snowflake_to_arrow_type("VARBINARY"), DataType::Binary);
        
        // Geospatial types (stored as strings)
        assert_eq!(map_snowflake_to_arrow_type("GEOGRAPHY"), DataType::Utf8);
        assert_eq!(map_snowflake_to_arrow_type("GEOMETRY"), DataType::Utf8);
    }
    
    #[test]
    fn test_number_type_mapping() {
        // Default NUMBER without parameters
        assert_eq!(map_snowflake_to_arrow_type("NUMBER"), DataType::Decimal128(18, 0));
        assert_eq!(map_snowflake_to_arrow_type("DECIMAL"), DataType::Decimal128(18, 0));
        
        // NUMBER with precision only
        assert_eq!(map_snowflake_to_arrow_type("NUMBER(10)"), DataType::Decimal128(10, 0));
        
        // NUMBER with precision and scale
        assert_eq!(map_snowflake_to_arrow_type("NUMBER(18,2)"), DataType::Decimal128(18, 2));
        assert_eq!(map_snowflake_to_arrow_type("DECIMAL(10,3)"), DataType::Decimal128(10, 3));
    }
    
    #[test]
    fn test_timestamp_ntz_type_mapping() {
        let timestamp_ntz_type = map_snowflake_to_arrow_type("TIMESTAMP_NTZ");
        match &timestamp_ntz_type {
            DataType::FixedSizeList(field, size) => {
                assert_eq!(field.data_type(), &DataType::Timestamp(TimeUnit::Nanosecond, None));
                assert_eq!(*size, 1);
                assert_eq!(field.name(), "timestamp_ntz:9");
            }
            _ => panic!("Expected FixedSizeList type for TIMESTAMP_NTZ, got {:?}", timestamp_ntz_type),
        }
    }
    
    #[test]
    fn test_timestamp_tz_type_mapping() {
        let timestamp_tz_type = map_snowflake_to_arrow_type("TIMESTAMP_TZ");
        match &timestamp_tz_type {
            DataType::FixedSizeList(field, size) => {
                assert_eq!(*size, 1);
                assert_eq!(field.name(), "timestamp_tz:9");
                
                match field.data_type() {
                    DataType::Struct(struct_fields) => {
                        assert_eq!(struct_fields.len(), 2);
                        assert_eq!(struct_fields[0].name(), "timestamp");
                        assert_eq!(struct_fields[0].data_type(), &DataType::Timestamp(TimeUnit::Nanosecond, None));
                        assert_eq!(struct_fields[1].name(), "offset_minutes");
                        assert_eq!(struct_fields[1].data_type(), &DataType::Int32);
                    }
                    _ => panic!("Expected Struct type inside FixedSizeList for TIMESTAMP_TZ"),
                }
            }
            _ => panic!("Expected FixedSizeList type for TIMESTAMP_TZ, got {:?}", timestamp_tz_type),
        }
        
        // Test TIMESTAMP_LTZ as well
        let timestamp_ltz_type = map_snowflake_to_arrow_type("TIMESTAMP_LTZ");
        assert_eq!(timestamp_ltz_type, timestamp_tz_type);
    }
    
    #[test]
    fn test_variant_and_array_type_mapping() {
        // Test ARRAY type
        let array_type = map_snowflake_to_arrow_type("ARRAY");
        match &array_type {
            DataType::FixedSizeList(field, size) => {
                assert_eq!(field.data_type(), &DataType::Utf8);
                assert_eq!(*size, 1);
                assert_eq!(field.name(), "variant");
            }
            _ => panic!("Expected FixedSizeList type for ARRAY, got {:?}", array_type),
        }
        
        // Test OBJECT and VARIANT types
        let object_type = map_snowflake_to_arrow_type("OBJECT");
        let variant_type = map_snowflake_to_arrow_type("VARIANT");
        assert_eq!(object_type, array_type);
        assert_eq!(variant_type, array_type);
    }
    
    #[test]
    fn test_case_insensitive_mapping() {
        // Test that type mapping is case insensitive
        assert_eq!(map_snowflake_to_arrow_type("varchar"), DataType::Utf8);
        assert_eq!(map_snowflake_to_arrow_type("VARCHAR"), DataType::Utf8);
        assert_eq!(map_snowflake_to_arrow_type("VarChar"), DataType::Utf8);
        
        assert_eq!(map_snowflake_to_arrow_type("boolean"), DataType::Boolean);
        assert_eq!(map_snowflake_to_arrow_type("BOOLEAN"), DataType::Boolean);
        assert_eq!(map_snowflake_to_arrow_type("Boolean"), DataType::Boolean);
    }
    
    #[test]
    fn test_unknown_type_fallback() {
        // Test that unknown types fall back to Utf8
        assert_eq!(map_snowflake_to_arrow_type("UNKNOWN_TYPE"), DataType::Utf8);
        assert_eq!(map_snowflake_to_arrow_type("CUSTOM_TYPE"), DataType::Utf8);
        assert_eq!(map_snowflake_to_arrow_type(""), DataType::Utf8);
    }
    
    #[test]
    fn test_create_empty_array_basic_types() -> Result<(), Box<dyn std::error::Error>> {
        // Test string array creation
        let string_array = create_empty_array(&DataType::Utf8)?;
        assert_eq!(string_array.len(), 0);
        assert_eq!(string_array.data_type(), &DataType::Utf8);
        
        // Test integer array creation
        let int64_array = create_empty_array(&DataType::Int64)?;
        assert_eq!(int64_array.len(), 0);
        assert_eq!(int64_array.data_type(), &DataType::Int64);
        
        // Test boolean array creation
        let bool_array = create_empty_array(&DataType::Boolean)?;
        assert_eq!(bool_array.len(), 0);
        assert_eq!(bool_array.data_type(), &DataType::Boolean);
        
        Ok(())
    }
    
    #[test]
    fn test_create_empty_array_complex_types() -> Result<(), Box<dyn std::error::Error>> {
        // Test decimal array creation
        let decimal_array = create_empty_array(&DataType::Decimal128(18, 2))?;
        assert_eq!(decimal_array.len(), 0);
        assert_eq!(decimal_array.data_type(), &DataType::Decimal128(18, 2));
        
        // Test timestamp array creation
        let timestamp_array = create_empty_array(&DataType::Timestamp(TimeUnit::Nanosecond, None))?;
        assert_eq!(timestamp_array.len(), 0);
        assert_eq!(timestamp_array.data_type(), &DataType::Timestamp(TimeUnit::Nanosecond, None));
        
        Ok(())
    }
}
read_parquet
use std::env;
use std::fs::File;
use std::path::Path;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::printer;

//# Build and run
//cd transform/snowflake-dbt
//cargo build --release
//cargo run --release --bin read_parquet -- <path/to/output.parquet>
//
//# Or for development
//cargo run --bin read_parquet -- <path/to/output.parquet>
//
//# Run tests
//cargo test
//

fn print_parquet_metadata(filename: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Check if file exists
    if !Path::new(filename).exists() {
        return Err(format!("Error: Parquet file '{}' not found.", filename).into());
    }

    // Open the Parquet file
    let file = File::open(filename)?;
    let reader = SerializedFileReader::new(file)?;

    // 1. File-level metadata
    let metadata = reader.metadata();
    let file_metadata = metadata.file_metadata();
    let schema = file_metadata.schema_descr();
    
    println!("--- File Metadata ---");
    println!("Number of columns: {}", schema.num_columns());
    
    // Calculate total rows across all row groups
    let total_rows: i64 = (0..metadata.num_row_groups())
        .map(|i| metadata.row_group(i).num_rows())
        .sum();
    println!("Number of rows: {}", total_rows);
    
    println!("Number of row groups: {}", metadata.num_row_groups());
    println!("Format version: {}", file_metadata.version());
    if let Some(created_by) = file_metadata.created_by() {
        println!("Created by: {}", created_by);
    }

    // 2. Schema information
    println!("\n--- Schema ---");
    printer::print_schema(&mut std::io::stdout(), schema.root_schema());

    // 3. Row group and column-level metadata
    for i in 0..metadata.num_row_groups() {
        let row_group_metadata = metadata.row_group(i);
        println!("\n--- Row Group {} Metadata ---", i);
        println!("Number of rows in row group: {}", row_group_metadata.num_rows());
        println!("Total byte size of row group: {}", row_group_metadata.total_byte_size());

        for j in 0..row_group_metadata.num_columns() {
            let column_chunk_metadata = row_group_metadata.column(j);
            let column_descriptor = schema.column(j);
            let column_name = column_descriptor.name();
            println!("  --- Column '{}' (Index {}) Metadata ---", column_name, j);
            println!("  Physical type: {:?}", column_chunk_metadata.column_type());
            println!("  Compression: {:?}", column_chunk_metadata.compression());
            println!("  Total compressed size: {}", column_chunk_metadata.compressed_size());
            println!("  Total uncompressed size: {}", column_chunk_metadata.uncompressed_size());
            println!("  Number of values: {}", column_chunk_metadata.num_values());
            
            // Print statistics if available (using non-deprecated methods)
            if let Some(statistics) = column_chunk_metadata.statistics() {
                let has_min_max = statistics.min_bytes_opt().is_some() && statistics.max_bytes_opt().is_some();
                println!("  Has min/max: {}", has_min_max);
                
                if let Some(null_count) = statistics.null_count_opt() {
                    println!("  Null count: {}", null_count);
                }
                
                if let Some(distinct_count) = statistics.distinct_count_opt() {
                    println!("  Distinct count: {}", distinct_count);
                }
            }
        }
    }

    Ok(())
}

fn main() {
    let args: Vec<String> = env::args().collect();
    
    // Require the Parquet file path as the first argument
    if args.len() < 2 {
        eprintln!("Usage: read_parquet <path/to/file.parquet>");
        std::process::exit(1);
    }
    let filename = &args[1];

    match print_parquet_metadata(filename) {
        Ok(()) => {},
        Err(e) => eprintln!("An error occurred: {}", e),
    }
}
Cargo.toml
[package]
name = "build_reference_db"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "build_reference_db"
path = "build_reference_db.rs"

[[bin]]
name = "read_parquet"
path = "read_parquet.rs"

[dependencies]
arrow = "55.0.0"
parquet = { version = "55.0.0", features = ["arrow"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }

[profile.release]
opt-level = 3
lto = true
codegen-units = 1
panic = "abort"
