changed macro into model and built first fct_ model to check for dag issues

This commit is contained in:
graciegoheen
2022-01-11 13:53:51 -05:00
parent f3aca96ba2
commit 2c5672a2fa
5 changed files with 143 additions and 93 deletions

View File

@@ -1,93 +0,0 @@
{% macro get_dependencies() %}
{%- set sql -%}
-- one record for each node in the DAG (models and sources) and its direct parent
with direct_relationships as (
{%- for model in graph.nodes.values() | selectattr("resource_type", "equalto", "model") -%}
{%- set outer_loop = loop -%}
{%- if model.depends_on.nodes|length == 0 -%}
select
'{{model.name}}' as model,
NULL as direct_parent,
NULL as direct_parent_type {# if this field still useful? if not could simply by looping through depends_on.nodes instead of refs & sources seperately #}
{%- else -%}
{%- for model_parent in model.refs -%}
select
'{{model.name}}' as model,
'{{model_parent.0}}' as direct_parent,
'model' as direct_parent_type
{% if not loop.last %}union all{% endif %}
{% endfor -%}
{%- for source_parent in model.sources -%}
{% if loop.first and model.refs|length > 0 %}union all{% endif %}
select
'{{model.name}}' as model,
'{{source_parent.0}}.{{source_parent.1}}' as direct_parent,
'source' as direct_parent_type
{% if not loop.last %}union all{% endif %}
{% endfor -%}
{%- endif -%}
{% if not outer_loop.last %}union all{% endif %}
{% endfor -%}
{%- for source in graph.sources.values() -%}
{% if loop.first and graph.nodes|length > 0 %}union all{% endif %}
select
'{{source.source_name}}.{{source.name}}' as model,
NULL as direct_parent,
NULL as direct_parent_type
{% if not loop.last %}union all{% endif %}
{% endfor -%}
),
-- recursive CTE
-- one record for every node and each of its downstream children (including itself)
all_relationships as (
-- anchor
select distinct
model as parent,
model as child,
0 as distance,
array_construct(child) as path {# snowflake-specific, but helpful for troubleshooting right now #}
from direct_relationships
-- where direct_parent is null {# optional lever to change filtering of anchor clause to only include root nodes #}
union all
-- recursive clause
select
all_relationships.parent as parent,
direct_relationships.model as child,
all_relationships.distance+1 as distance,
array_append(all_relationships.path, direct_relationships.model) as path
from direct_relationships
inner join all_relationships
on all_relationships.child = direct_relationships.direct_parent
)
select * from all_relationships
order by parent, distance
{%- endset -%}
{% do log(sql, info=true) %}
{% endmacro %}

9
models/audit/dag/dag.yml Normal file
View File

@@ -0,0 +1,9 @@
version: 2
models:
- name: stg_dag_relationships
description: "This table shows one record for every node and each of its downstream children (including itself)."
- name: fct_source_fanout
description: "This table shows each parent/child relationship where a source is the direct parent of multiple nodes in the DAG."
tests:
- is_empty

View File

@@ -0,0 +1,28 @@
-- this model finds cases where a source is used in multiple direct downstream models
with direct_source_relationships as (
select
*
from {{ ref('stg_dag_relationships') }}
where distance = 1
and parent_type = 'source'
),
source_fanout as (
select
parent,
count(*)
from direct_source_relationships
group by 1
having count(*) > 1
),
final as (
select
direct_source_relationships.*
from direct_source_relationships
inner join source_fanout
on direct_source_relationships.parent = source_fanout.parent
order by direct_source_relationships.parent
)
select * from final

View File

@@ -0,0 +1,95 @@
-- TO DO: only include ENABLED nodes
-- TO DO: exclude models that are part of the audit package
-- can use package_name attribute in final version
-- TO DO: fix whitespace
-- one record for each node in the DAG (models and sources) and its direct parent
with direct_relationships as (
{%- for model in graph.nodes.values() | selectattr("resource_type", "equalto", "model") -%}
{%- set outer_loop = loop -%}
{%- if model.depends_on.nodes|length == 0 -%}
select
'{{model.name}}' as node,
'{{model.unique_id}}' as node_id,
'model' as node_type,
NULL as direct_parent_id
{%- else -%}
{%- for model_parent in model.depends_on.nodes -%}
select
'{{model.name}}' as node,
'{{model.unique_id}}' as node_id,
'model' as node_type,
'{{model_parent}}' as direct_parent_id
{% if not loop.last %}union all{% endif %}
{% endfor -%}
{%- endif %}
{% if not outer_loop.last %}union all{% endif %}
{% endfor -%}
{%- for source in graph.sources.values() -%}
{% if loop.first and graph.nodes|length > 0 %}union all{% endif %}
select
'{{source.source_name}}.{{source.name}}' as node,
'{{source.unique_id}}' as node_id,
'source' as node_type,
NULL as direct_parent_id
{% if not loop.last %}union all{% endif %}
{% endfor -%}
),
-- recursive CTE
-- one record for every node and each of its downstream children (including itself)
all_relationships as (
-- anchor
select distinct
node as parent,
node_id as parent_id,
node_type as parent_type,
node as child,
node_id as child_id,
0 as distance,
array_construct(child) as path {# snowflake-specific, but helpful for troubleshooting right now #}
from direct_relationships
-- where direct_parent is null {# optional lever to change filtering of anchor clause to only include root nodes #}
union all
-- recursive clause
select
all_relationships.parent as parent,
all_relationships.parent_id as parent_id,
all_relationships.parent_type as parent_type,
direct_relationships.node as child,
direct_relationships.node_id as child_id,
all_relationships.distance+1 as distance,
array_append(all_relationships.path, direct_relationships.node) as path
from direct_relationships
inner join all_relationships
on all_relationships.child_id = direct_relationships.direct_parent_id
),
final as (
select
parent,
parent_type,
child,
distance,
path
from all_relationships
)
select * from final
order by parent, distance

View File

@@ -0,0 +1,11 @@
{% test is_empty(model) %}
{{ config (
severity = 'warn',
fail_calc = "n_records"
) }}
select count(*) as n_records
from {{ model }}
{% endtest %}