mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-19 18:51:28 +00:00
Compare commits
7 Commits
enable-pos
...
ct-2644-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ed53fe36a0 | ||
|
|
aa4bfb1826 | ||
|
|
ba7ac8e513 | ||
|
|
98cbdac2f0 | ||
|
|
36053d11b2 | ||
|
|
8cc2092246 | ||
|
|
2cb65912a5 |
6
.changes/unreleased/Fixes-20230621-185452.yaml
Normal file
6
.changes/unreleased/Fixes-20230621-185452.yaml
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
kind: Fixes
|
||||||
|
body: Replace use of sqlparse by hand construction of ephemeral CTEs
|
||||||
|
time: 2023-06-21T18:54:52.246578-04:00
|
||||||
|
custom:
|
||||||
|
Author: gshank
|
||||||
|
Issue: "7791"
|
||||||
@@ -181,7 +181,6 @@ class Linker:
|
|||||||
self.add_node(source.unique_id)
|
self.add_node(source.unique_id)
|
||||||
for semantic_model in manifest.semantic_models.values():
|
for semantic_model in manifest.semantic_models.values():
|
||||||
self.add_node(semantic_model.unique_id)
|
self.add_node(semantic_model.unique_id)
|
||||||
|
|
||||||
for node in manifest.nodes.values():
|
for node in manifest.nodes.values():
|
||||||
self.link_node(node, manifest)
|
self.link_node(node, manifest)
|
||||||
for exposure in manifest.exposures.values():
|
for exposure in manifest.exposures.values():
|
||||||
@@ -301,62 +300,6 @@ class Compiler:
|
|||||||
relation_cls = adapter.Relation
|
relation_cls = adapter.Relation
|
||||||
return relation_cls.add_ephemeral_prefix(name)
|
return relation_cls.add_ephemeral_prefix(name)
|
||||||
|
|
||||||
def _inject_ctes_into_sql(self, sql: str, ctes: List[InjectedCTE]) -> str:
|
|
||||||
"""
|
|
||||||
`ctes` is a list of InjectedCTEs like:
|
|
||||||
|
|
||||||
[
|
|
||||||
InjectedCTE(
|
|
||||||
id="cte_id_1",
|
|
||||||
sql="__dbt__cte__ephemeral as (select * from table)",
|
|
||||||
),
|
|
||||||
InjectedCTE(
|
|
||||||
id="cte_id_2",
|
|
||||||
sql="__dbt__cte__events as (select id, type from events)",
|
|
||||||
),
|
|
||||||
]
|
|
||||||
|
|
||||||
Given `sql` like:
|
|
||||||
|
|
||||||
"with internal_cte as (select * from sessions)
|
|
||||||
select * from internal_cte"
|
|
||||||
|
|
||||||
This will spit out:
|
|
||||||
|
|
||||||
"with __dbt__cte__ephemeral as (select * from table),
|
|
||||||
__dbt__cte__events as (select id, type from events),
|
|
||||||
with internal_cte as (select * from sessions)
|
|
||||||
select * from internal_cte"
|
|
||||||
|
|
||||||
(Whitespace enhanced for readability.)
|
|
||||||
"""
|
|
||||||
if len(ctes) == 0:
|
|
||||||
return sql
|
|
||||||
|
|
||||||
parsed_stmts = sqlparse.parse(sql)
|
|
||||||
parsed = parsed_stmts[0]
|
|
||||||
|
|
||||||
with_stmt = None
|
|
||||||
for token in parsed.tokens:
|
|
||||||
if token.is_keyword and token.normalized == "WITH":
|
|
||||||
with_stmt = token
|
|
||||||
break
|
|
||||||
|
|
||||||
if with_stmt is None:
|
|
||||||
# no with stmt, add one, and inject CTEs right at the beginning
|
|
||||||
first_token = parsed.token_first()
|
|
||||||
with_stmt = sqlparse.sql.Token(sqlparse.tokens.Keyword, "with")
|
|
||||||
parsed.insert_before(first_token, with_stmt)
|
|
||||||
else:
|
|
||||||
# stmt exists, add a comma (which will come after injected CTEs)
|
|
||||||
trailing_comma = sqlparse.sql.Token(sqlparse.tokens.Punctuation, ",")
|
|
||||||
parsed.insert_after(with_stmt, trailing_comma)
|
|
||||||
|
|
||||||
token = sqlparse.sql.Token(sqlparse.tokens.Keyword, ", ".join(c.sql for c in ctes))
|
|
||||||
parsed.insert_after(with_stmt, token)
|
|
||||||
|
|
||||||
return str(parsed)
|
|
||||||
|
|
||||||
def _recursively_prepend_ctes(
|
def _recursively_prepend_ctes(
|
||||||
self,
|
self,
|
||||||
model: ManifestSQLNode,
|
model: ManifestSQLNode,
|
||||||
@@ -431,7 +374,7 @@ class Compiler:
|
|||||||
|
|
||||||
_add_prepended_cte(prepended_ctes, InjectedCTE(id=cte.id, sql=sql))
|
_add_prepended_cte(prepended_ctes, InjectedCTE(id=cte.id, sql=sql))
|
||||||
|
|
||||||
injected_sql = self._inject_ctes_into_sql(
|
injected_sql = inject_ctes_into_sql(
|
||||||
model.compiled_code,
|
model.compiled_code,
|
||||||
prepended_ctes,
|
prepended_ctes,
|
||||||
)
|
)
|
||||||
@@ -582,3 +525,69 @@ class Compiler:
|
|||||||
if write:
|
if write:
|
||||||
self._write_node(node)
|
self._write_node(node)
|
||||||
return node
|
return node
|
||||||
|
|
||||||
|
|
||||||
|
def inject_ctes_into_sql(sql: str, ctes: List[InjectedCTE]) -> str:
|
||||||
|
"""
|
||||||
|
`ctes` is a list of InjectedCTEs like:
|
||||||
|
|
||||||
|
[
|
||||||
|
InjectedCTE(
|
||||||
|
id="cte_id_1",
|
||||||
|
sql="__dbt__cte__ephemeral as (select * from table)",
|
||||||
|
),
|
||||||
|
InjectedCTE(
|
||||||
|
id="cte_id_2",
|
||||||
|
sql="__dbt__cte__events as (select id, type from events)",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
Given `sql` like:
|
||||||
|
|
||||||
|
"with internal_cte as (select * from sessions)
|
||||||
|
select * from internal_cte"
|
||||||
|
|
||||||
|
This will spit out:
|
||||||
|
|
||||||
|
"with __dbt__cte__ephemeral as (select * from table),
|
||||||
|
__dbt__cte__events as (select id, type from events),
|
||||||
|
internal_cte as (select * from sessions)
|
||||||
|
select * from internal_cte"
|
||||||
|
|
||||||
|
(Whitespace enhanced for readability.)
|
||||||
|
"""
|
||||||
|
if len(ctes) == 0:
|
||||||
|
return sql
|
||||||
|
|
||||||
|
parsed_stmts = sqlparse.parse(sql)
|
||||||
|
parsed = parsed_stmts[0]
|
||||||
|
|
||||||
|
with_stmt = None
|
||||||
|
for token in parsed.tokens:
|
||||||
|
if token.is_keyword and token.normalized == "WITH":
|
||||||
|
with_stmt = token
|
||||||
|
break
|
||||||
|
|
||||||
|
if with_stmt is None:
|
||||||
|
# no with stmt, add one, and inject CTEs right at the beginning
|
||||||
|
# [original_sql]
|
||||||
|
first_token = parsed.token_first()
|
||||||
|
with_token = sqlparse.sql.Token(sqlparse.tokens.Keyword, "with")
|
||||||
|
parsed.insert_before(first_token, with_token)
|
||||||
|
# [with][original_sql]
|
||||||
|
joined_ctes = ", ".join(c.sql for c in ctes) + " "
|
||||||
|
token = sqlparse.sql.Token(sqlparse.tokens.Keyword, joined_ctes)
|
||||||
|
parsed.insert_after(with_token, token)
|
||||||
|
# [with][joined_ctes][original_sql]
|
||||||
|
else:
|
||||||
|
# stmt exists, add a comma (which will come after injected CTEs)
|
||||||
|
# [with][original_sql]
|
||||||
|
joined_ctes = ", ".join(c.sql for c in ctes)
|
||||||
|
joined_ctes_token = sqlparse.sql.Token(sqlparse.tokens.Keyword, joined_ctes)
|
||||||
|
parsed.insert_after(with_stmt, joined_ctes_token)
|
||||||
|
# [with][joined_ctes][original_sql]
|
||||||
|
comma_token = sqlparse.sql.Token(sqlparse.tokens.Punctuation, ", ")
|
||||||
|
parsed.insert_after(joined_ctes_token, comma_token)
|
||||||
|
# [with][joined_ctes][, ][original_sql]
|
||||||
|
|
||||||
|
return str(parsed)
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ setup(
|
|||||||
# ----
|
# ----
|
||||||
# There is a difficult-to-reproduce bug in sqlparse==0.4.4 for ephemeral model compilation
|
# There is a difficult-to-reproduce bug in sqlparse==0.4.4 for ephemeral model compilation
|
||||||
# For context: dbt-core#7396 + dbt-core#7515
|
# For context: dbt-core#7396 + dbt-core#7515
|
||||||
"sqlparse>=0.2.3,<0.4.4",
|
"sqlparse~=0.4.4",
|
||||||
# ----
|
# ----
|
||||||
# These are major-version-0 packages also maintained by dbt-labs. Accept patches.
|
# These are major-version-0 packages also maintained by dbt-labs. Accept patches.
|
||||||
"dbt-extractor~=0.4.1",
|
"dbt-extractor~=0.4.1",
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
import json
|
import json
|
||||||
import pathlib
|
import pathlib
|
||||||
import pytest
|
import pytest
|
||||||
|
import re
|
||||||
|
|
||||||
from dbt.cli.main import dbtRunner
|
from dbt.cli.main import dbtRunner
|
||||||
from dbt.exceptions import DbtRuntimeError, TargetNotFoundError
|
from dbt.exceptions import DbtRuntimeError, TargetNotFoundError
|
||||||
from dbt.tests.util import run_dbt, run_dbt_and_capture
|
from dbt.tests.util import run_dbt, run_dbt_and_capture, read_file
|
||||||
from tests.functional.compile.fixtures import (
|
from tests.functional.compile.fixtures import (
|
||||||
first_model_sql,
|
first_model_sql,
|
||||||
second_model_sql,
|
second_model_sql,
|
||||||
@@ -16,9 +17,13 @@ from tests.functional.compile.fixtures import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_lines(model_name):
|
def norm_whitespace(string):
|
||||||
from dbt.tests.util import read_file
|
_RE_COMBINE_WHITESPACE = re.compile(r"\s+")
|
||||||
|
string = _RE_COMBINE_WHITESPACE.sub(" ", string).strip()
|
||||||
|
return string
|
||||||
|
|
||||||
|
|
||||||
|
def get_lines(model_name):
|
||||||
f = read_file("target", "compiled", "test", "models", model_name + ".sql")
|
f = read_file("target", "compiled", "test", "models", model_name + ".sql")
|
||||||
return [line for line in f.splitlines() if line]
|
return [line for line in f.splitlines() if line]
|
||||||
|
|
||||||
@@ -88,21 +93,22 @@ class TestEphemeralModels:
|
|||||||
def test_no_selector(self, project):
|
def test_no_selector(self, project):
|
||||||
run_dbt(["compile"])
|
run_dbt(["compile"])
|
||||||
|
|
||||||
assert get_lines("first_ephemeral_model") == ["select 1 as fun"]
|
sql = read_file("target", "compiled", "test", "models", "first_ephemeral_model.sql")
|
||||||
assert get_lines("second_ephemeral_model") == [
|
assert norm_whitespace(sql) == norm_whitespace("select 1 as fun")
|
||||||
"with __dbt__cte__first_ephemeral_model as (",
|
sql = read_file("target", "compiled", "test", "models", "second_ephemeral_model.sql")
|
||||||
"select 1 as fun",
|
expected_sql = """with __dbt__cte__first_ephemeral_model as (
|
||||||
")select * from __dbt__cte__first_ephemeral_model",
|
select 1 as fun
|
||||||
]
|
) select * from __dbt__cte__first_ephemeral_model"""
|
||||||
assert get_lines("third_ephemeral_model") == [
|
assert norm_whitespace(sql) == norm_whitespace(expected_sql)
|
||||||
"with __dbt__cte__first_ephemeral_model as (",
|
sql = read_file("target", "compiled", "test", "models", "third_ephemeral_model.sql")
|
||||||
"select 1 as fun",
|
expected_sql = """with __dbt__cte__first_ephemeral_model as (
|
||||||
"), __dbt__cte__second_ephemeral_model as (",
|
select 1 as fun
|
||||||
"select * from __dbt__cte__first_ephemeral_model",
|
), __dbt__cte__second_ephemeral_model as (
|
||||||
")select * from __dbt__cte__second_ephemeral_model",
|
select * from __dbt__cte__first_ephemeral_model
|
||||||
"union all",
|
) select * from __dbt__cte__second_ephemeral_model
|
||||||
"select 2 as fun",
|
union all
|
||||||
]
|
select 2 as fun"""
|
||||||
|
assert norm_whitespace(sql) == norm_whitespace(expected_sql)
|
||||||
|
|
||||||
|
|
||||||
class TestCompile:
|
class TestCompile:
|
||||||
|
|||||||
147
tests/unit/test_inject_ctes.py
Normal file
147
tests/unit/test_inject_ctes.py
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
from dbt.compilation import inject_ctes_into_sql
|
||||||
|
from dbt.contracts.graph.nodes import InjectedCTE
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
def norm_whitespace(string):
|
||||||
|
_RE_COMBINE_WHITESPACE = re.compile(r"\s+")
|
||||||
|
string = _RE_COMBINE_WHITESPACE.sub(" ", string).strip()
|
||||||
|
return string
|
||||||
|
|
||||||
|
|
||||||
|
def test_inject_ctes_0():
|
||||||
|
starting_sql = "select * from __dbt__cte__base"
|
||||||
|
ctes = [
|
||||||
|
InjectedCTE(
|
||||||
|
id="model.test.base",
|
||||||
|
sql=" __dbt__cte__base as (\n\n\nselect * from test16873767336887004702_test_ephemeral.seed\n)",
|
||||||
|
)
|
||||||
|
]
|
||||||
|
expected_sql = """with __dbt__cte__base as (
|
||||||
|
select * from test16873767336887004702_test_ephemeral.seed
|
||||||
|
) select * from __dbt__cte__base"""
|
||||||
|
|
||||||
|
generated_sql = inject_ctes_into_sql(starting_sql, ctes)
|
||||||
|
assert norm_whitespace(generated_sql) == norm_whitespace(expected_sql)
|
||||||
|
|
||||||
|
|
||||||
|
def test_inject_ctes_1():
|
||||||
|
starting_sql = "select * from __dbt__cte__ephemeral_level_two"
|
||||||
|
ctes = [
|
||||||
|
InjectedCTE(
|
||||||
|
id="model.test.ephemeral_level_two",
|
||||||
|
sql=' __dbt__cte__ephemeral_level_two as (\n\nselect * from "dbt"."test16873757769710148165_test_ephemeral"."source_table"\n)',
|
||||||
|
)
|
||||||
|
]
|
||||||
|
expected_sql = """with __dbt__cte__ephemeral_level_two as (
|
||||||
|
select * from "dbt"."test16873757769710148165_test_ephemeral"."source_table"
|
||||||
|
) select * from __dbt__cte__ephemeral_level_two"""
|
||||||
|
|
||||||
|
generated_sql = inject_ctes_into_sql(starting_sql, ctes)
|
||||||
|
assert norm_whitespace(generated_sql) == norm_whitespace(expected_sql)
|
||||||
|
|
||||||
|
|
||||||
|
def test_inject_ctes_2():
|
||||||
|
|
||||||
|
starting_sql = "select * from __dbt__cte__ephemeral"
|
||||||
|
ctes = [
|
||||||
|
InjectedCTE(
|
||||||
|
id="model.test.ephemeral_level_two",
|
||||||
|
sql=' __dbt__cte__ephemeral_level_two as (\n\nselect * from "dbt"."test16873735573223965828_test_ephemeral"."source_table"\n)',
|
||||||
|
),
|
||||||
|
InjectedCTE(
|
||||||
|
id="model.test.ephemeral",
|
||||||
|
sql=" __dbt__cte__ephemeral as (\n\nselect * from __dbt__cte__ephemeral_level_two\n)",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
expected_sql = """with __dbt__cte__ephemeral_level_two as (
|
||||||
|
select * from "dbt"."test16873735573223965828_test_ephemeral"."source_table"
|
||||||
|
), __dbt__cte__ephemeral as (
|
||||||
|
select * from __dbt__cte__ephemeral_level_two
|
||||||
|
) select * from __dbt__cte__ephemeral"""
|
||||||
|
|
||||||
|
generated_sql = inject_ctes_into_sql(starting_sql, ctes)
|
||||||
|
assert norm_whitespace(generated_sql) == norm_whitespace(expected_sql)
|
||||||
|
|
||||||
|
|
||||||
|
def test_inject_ctes_3():
|
||||||
|
starting_sql = """select * from __dbt__cte__female_only
|
||||||
|
union all
|
||||||
|
select * from "dbt"."test16873757723266827902_test_ephemeral"."double_dependent" where gender = 'Male'"""
|
||||||
|
ctes = [
|
||||||
|
InjectedCTE(
|
||||||
|
id="model.test.base",
|
||||||
|
sql=" __dbt__cte__base as (\n\n\nselect * from test16873757723266827902_test_ephemeral.seed\n)",
|
||||||
|
),
|
||||||
|
InjectedCTE(
|
||||||
|
id="model.test.base_copy",
|
||||||
|
sql=" __dbt__cte__base_copy as (\n\n\nselect * from __dbt__cte__base\n)",
|
||||||
|
),
|
||||||
|
InjectedCTE(
|
||||||
|
id="model.test.female_only",
|
||||||
|
sql=" __dbt__cte__female_only as (\n\n\nselect * from __dbt__cte__base_copy where gender = 'Female'\n)",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
expected_sql = """with __dbt__cte__base as (
|
||||||
|
select * from test16873757723266827902_test_ephemeral.seed
|
||||||
|
), __dbt__cte__base_copy as (
|
||||||
|
select * from __dbt__cte__base
|
||||||
|
), __dbt__cte__female_only as (
|
||||||
|
select * from __dbt__cte__base_copy where gender = 'Female'
|
||||||
|
) select * from __dbt__cte__female_only
|
||||||
|
union all
|
||||||
|
select * from "dbt"."test16873757723266827902_test_ephemeral"."double_dependent" where gender = 'Male'"""
|
||||||
|
|
||||||
|
generated_sql = inject_ctes_into_sql(starting_sql, ctes)
|
||||||
|
assert norm_whitespace(generated_sql) == norm_whitespace(expected_sql)
|
||||||
|
|
||||||
|
|
||||||
|
def test_inject_ctes_4():
|
||||||
|
starting_sql = """
|
||||||
|
with internal_cte as (select * from sessions)
|
||||||
|
select * from internal_cte
|
||||||
|
"""
|
||||||
|
ctes = [
|
||||||
|
InjectedCTE(
|
||||||
|
id="cte_id_1",
|
||||||
|
sql="__dbt__cte__ephemeral as (select * from table)",
|
||||||
|
),
|
||||||
|
InjectedCTE(
|
||||||
|
id="cte_id_2",
|
||||||
|
sql="__dbt__cte__events as (select id, type from events)",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
expected_sql = """with __dbt__cte__ephemeral as (select * from table),
|
||||||
|
__dbt__cte__events as (select id, type from events),
|
||||||
|
internal_cte as (select * from sessions)
|
||||||
|
select * from internal_cte"""
|
||||||
|
|
||||||
|
generated_sql = inject_ctes_into_sql(starting_sql, ctes)
|
||||||
|
assert norm_whitespace(generated_sql) == norm_whitespace(expected_sql)
|
||||||
|
|
||||||
|
|
||||||
|
def test_inject_ctes_5():
|
||||||
|
starting_sql = """with my_other_cool_cte as (
|
||||||
|
select id, name from __dbt__cte__ephemeral
|
||||||
|
where id > 1000
|
||||||
|
)
|
||||||
|
select name, id from my_other_cool_cte"""
|
||||||
|
ctes = [
|
||||||
|
InjectedCTE(
|
||||||
|
id="model.singular_tests_ephemeral.ephemeral",
|
||||||
|
sql=' __dbt__cte__ephemeral as (\n\n\nwith my_cool_cte as (\n select name, id from "dbt"."test16873917221900185954_test_singular_tests_ephemeral"."base"\n)\nselect id, name from my_cool_cte where id is not null\n)',
|
||||||
|
)
|
||||||
|
]
|
||||||
|
expected_sql = """with __dbt__cte__ephemeral as (
|
||||||
|
with my_cool_cte as (
|
||||||
|
select name, id from "dbt"."test16873917221900185954_test_singular_tests_ephemeral"."base"
|
||||||
|
)
|
||||||
|
select id, name from my_cool_cte where id is not null
|
||||||
|
), my_other_cool_cte as (
|
||||||
|
select id, name from __dbt__cte__ephemeral
|
||||||
|
where id > 1000
|
||||||
|
)
|
||||||
|
select name, id from my_other_cool_cte"""
|
||||||
|
|
||||||
|
generated_sql = inject_ctes_into_sql(starting_sql, ctes)
|
||||||
|
assert norm_whitespace(generated_sql) == norm_whitespace(expected_sql)
|
||||||
Reference in New Issue
Block a user