dbtw
#!/bin/bash
# dbtw - thin wrapper around the dbt CLI.
#
# Usage:
#   dbtw <dbt arguments...>
#       Run dbt with the given arguments against target ${TARGET:-dev}.
#   dbtw --drop-dev-schemas [schema-prefix]
#       Drop the dev datasets whose name matches schema-prefix
#       (defaults to the prefix derived from the current git branch).
#
# Environment:
#   TARGET  dbt target to use (default: dev).
#
# Exported for dbt: DBT_IMPERSONATE_SA, DBT_PROFILES_DIR, DBT_TARGET and,
# on the dev target only, DBT_SCHEMA_PREFIX and DBT_MATERIALIZATION.

# Required Parameter -------------------------------------------------
export DBT_IMPERSONATE_SA='dbt-user@<gcp-project>.iam.gserviceaccount.com'

# Optional Parameter -------------------------------------------------
# export DBT_LOG_PATH='logs'

# -------------------------------------------------

# Print a message to stderr and abort with a non-zero status.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# Assign first, export afterwards: `export VAR=$(cmd)` would hide a failure
# of the command substitution.
DBT_PROFILES_DIR=$(cd "$(dirname -- "$0")" && pwd) \
  || die "Can't resolve the directory containing $0."
export DBT_PROFILES_DIR
export DBT_TARGET=${TARGET:-"dev"}

if [[ "${DBT_TARGET}" == 'dev' ]]; then
  # The branch must come from the project checkout, not from the caller's cwd.
  CURRENT_BRANCH=$(git -C "${DBT_PROFILES_DIR}" branch --show-current)
  if [[ -z "${CURRENT_BRANCH}" ]]; then
    printf '%s\n' "Warning: could not determine the current git branch; the schema prefix will not be branch specific." >&2
  fi
  # Every character other than an ASCII letter, digit or underscore becomes
  # '_' so the prefix stays a valid dataset name (covers '/', '-', '.', ...).
  DBT_SCHEMA_PREFIX=$(printf '%s\n' "__dbt__${DBT_TARGET}__${CURRENT_BRANCH}__" \
    | LC_ALL=C sed -e 's/[^A-Za-z0-9_]/_/g')
  export DBT_SCHEMA_PREFIX
  export DBT_MATERIALIZATION='view'
fi

# -------------------------------------------------

#######################################
# Drop all dev datasets whose name matches a prefix.
# Globals:   DBT_TARGET, DBT_SCHEMA_PREFIX, DBT_IMPERSONATE_SA (read)
# Arguments: $1 - dataset name prefix; empty means DBT_SCHEMA_PREFIX
# Outputs:   progress on stdout, errors on stderr
# Returns:   0 on success (including "nothing to drop"); exits 1 on error
#######################################
drop_dev_schemas() {
  local prefix listing datasets count args tool

  if [[ "${DBT_TARGET}" != 'dev' ]]; then
    die "Can't drop dataset on target=${DBT_TARGET}. It works target='dev' only."
  fi

  # Refuse anything that is not "__dbt__dev__" followed by at least one more
  # character, so a bare or foreign prefix can never select datasets.
  prefix=${1:-${DBT_SCHEMA_PREFIX}}
  if [[ "${prefix}" != __dbt__dev__?* ]]; then
    die "Not matched."
  fi

  for tool in gcloud jq; do
    command -v "${tool}" >/dev/null || die "Required command not found: ${tool}"
  done

  # Capture the listing on its own so a gcloud failure is reported instead of
  # being mistaken for "no dataset matched".
  listing=$(gcloud alpha bq datasets list --all --page-size=1024 --limit=1024 \
    --impersonate-service-account="${DBT_IMPERSONATE_SA}") \
    || die "Failed to list datasets with gcloud."

  # Turn each matching line into a JSON string of the form "`<field1>.<field2>`"
  # (fields split on ':' and spaces) and collect the strings into one array.
  datasets=$(printf '%s\n' "${listing}" \
    | grep -e "${prefix}" \
    | sed -E 's/[: ]+/,/g' \
    | awk -F, '{print "\"`" $1 "." $2 "`\""}' \
    | jq -s -c '.')

  count=$(jq 'length' <<<"${datasets}")
  if (( ${count:-0} <= 0 )); then
    echo "No dataset matched to '${prefix}'. "
    return 0
  fi

  echo "DROP schemas ..."
  jq -r '.[]' <<<"${datasets}"
  echo ""
  env | grep "^DBT_"
  echo ""

  args="{schemas: ${datasets}}"
  dbt run-operation drop_dev_schemas --args "${args}" --target "${DBT_TARGET}" \
    || die "dbt run-operation drop_dev_schemas failed."
  echo "Completed."
}

# "${1:-}" keeps the comparison valid when the wrapper is called without
# any argument.
if [[ "${1:-}" == '--drop-dev-schemas' ]]; then
  drop_dev_schemas "${2:-}"
  exit 0
fi

# run dbt command ----------------------------------
env | grep "^DBT_"
echo ""
echo dbt "$@" --target "${DBT_TARGET}"
echo ""
dbt "$@" --target "${DBT_TARGET}"
profiles.yml
# Connection profile for the `dbt_project` project (selected by `profile:` in
# dbt_project.yml). Both outputs authenticate via oauth and impersonate the
# service account named by the DBT_IMPERSONATE_SA environment variable.
dbt_project:
  # Target used when dbt is invoked without an explicit --target.
  target: prod
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: "<gcp-project>"
      # BigQuery dataset IDs may contain only letters, digits and underscores,
      # so the default dataset must not contain a hyphen.
      dataset: "default_dataset"
      impersonate_service_account: "{{ env_var('DBT_IMPERSONATE_SA') }}"
      priority: interactive
      fixed_retries: 1
      location: US
      threads: 10
      timeout_seconds: 300
    prod:
      type: bigquery
      method: oauth
      project: "<gcp-project>"
      # Same naming constraint as the dev output.
      dataset: "default_dataset"
      impersonate_service_account: "{{ env_var('DBT_IMPERSONATE_SA') }}"
      priority: interactive
      fixed_retries: 1
      location: US
      threads: 10
      timeout_seconds: 300
config:
  send_anonymous_usage_stats: False
dbt_project.yml
# Project identity; `profile` names the entry to use from profiles.yml.
name: "dbt_project"
version: "1.2.0"
config-version: 2
profile: "dbt_project"

# Where each resource type lives, relative to this file.
model-paths:
  - "models"
analysis-paths:
  - "analyses"
test-paths:
  - "tests"
seed-paths:
  - "seeds"
macro-paths:
  - "macros"
snapshot-paths:
  - "snapshots"

# Build output directory and the directories listed for cleaning.
target-path: "target"
clean-targets: ["target", "dbt_packages"]

# Log directory: taken from DBT_LOG_PATH when set, otherwise "logs".
log-path: "{{ env_var('DBT_LOG_PATH', 'logs') }}"

# Configuring models
models:
  dbt_project:
    staging:
      +schema: sample_stg
      +materialized: ephemeral
    # NOTE(review): `base` is assumed to be a sibling of `staging` rather than
    # nested under it - confirm against the models/ directory layout.
    base:
      +materialized: ephemeral
    marts:
      +schema: sample_marts
      # Taken from DBT_MATERIALIZATION when set, otherwise "table".
      +materialized: "{{ env_var('DBT_MATERIALIZATION', 'table') }}"
get_custom_schema.sql
{#
  Build the schema name for a node.

  The result is the value of the DBT_SCHEMA_PREFIX environment variable
  (empty string when unset) followed by the trimmed schema name, where the
  schema name is `custom_schema_name` when one is given and `target.schema`
  otherwise. Only the schema name is trimmed, not the prefix.

  Args:
    custom_schema_name: schema configured on the node, or none.
    node: the node being processed (not used here).
#}
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set schema_prefix = env_var('DBT_SCHEMA_PREFIX', '') -%}
    {%- set base_schema = target.schema if custom_schema_name is none else custom_schema_name -%}
    {{ schema_prefix ~ (base_schema | trim) }}
{%- endmacro %}
drop_dev_schemas.sql
{#
  Drop every schema listed in `schemas` (DROP SCHEMA IF EXISTS ... CASCADE).

  All DROP statements are rendered into one `statement` call, and each one is
  also written to the log at info level. Every element of `schemas` is
  interpolated into the SQL verbatim, so it must already be a complete,
  correctly quoted schema reference.

  Args:
    schemas: list of schema references to drop. When it is empty or none the
      macro only logs a message, since rendering no DROP statements would
      submit an empty statement.
#}
{% macro drop_dev_schemas(schemas) %}
    {%- if not schemas -%}
        {{ log("No schemas to drop.", info=True) }}
    {%- else -%}
        {%- call statement('states', fetch_result=True) -%}
            {% for schema in schemas %}
                {{ log("DROP SCHEMA IF EXISTS " ~ schema ~ " CASCADE;", info=True) }}
                DROP SCHEMA IF EXISTS {{ schema }} CASCADE;
            {% endfor %}
        {%- endcall -%}
    {%- endif -%}
{% endmacro %}