Compare commits

...

52 Commits

Author SHA1 Message Date
Jeremy Cohen
5cce911842 Ongoing experiment 2023-01-29 21:32:19 +01:00
lostmygithubaccount
158aa81b0c update per suggestions 2022-11-23 09:06:33 -08:00
lostmygithubaccount
5ddb088049 Merge remote-tracking branch 'origin/main' into cody/ibis 2022-11-22 21:54:05 -08:00
Gerda Shank
7d7066466d CT 1537 fix event test and rename a couple of fields (#6293)
* Rename MacroEvent to JinjaLog

* Rename ConnectionClosed/2

* Fix LogSeedResult

* Rename ConnectionLeftOpen events, fix test_events.py

* Update events README.md, add "category" to EventInfo

* Rename GeneralMacroWarning to JinjaLogWarning
2022-11-22 14:54:20 -05:00
Emily Rockman
517576c088 add back in conditional node length check (#6298) 2022-11-21 21:20:55 -08:00
lostmygithubaccount
3edc9e53ad initial implementation based on prql pr 2022-11-20 17:55:34 -08:00
leahwicz
987764858b Revert "Bump python from 3.10.7-slim-bullseye to 3.11.0-slim-bullseye in /docker (#6180)" (#6281)
This reverts commit 8e28f5906e.
2022-11-17 09:14:22 -05:00
FishtownBuildBot
a235abd176 Add new index.html and changelog yaml files from dbt-docs (#6265) 2022-11-16 17:00:33 +01:00
dependabot[bot]
9297e4d55c Update pathspec requirement from ~=0.9.0 to >=0.9,<0.11 in /core (#5917)
* Update pathspec requirement from ~=0.9.0 to >=0.9,<0.11 in /core

Updates the requirements on [pathspec](https://github.com/cpburnz/python-pathspec) to permit the latest version.
- [Release notes](https://github.com/cpburnz/python-pathspec/releases)
- [Changelog](https://github.com/cpburnz/python-pathspec/blob/master/CHANGES.rst)
- [Commits](https://github.com/cpburnz/python-pathspec/compare/v0.9.0...v0.10.1)

---
updated-dependencies:
- dependency-name: pathspec
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* Add automated changelog yaml from template for bot PR

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Github Build Bot <buildbot@fishtownanalytics.com>
2022-11-15 22:02:37 -05:00
Michelle Ark
eae98677b9 s/gitlab/github for flake8 precommit repo (#6252) 2022-11-15 10:30:00 -05:00
Matthew McKnight
66ac107409 [CT-1262] Convert dbt_debug (#6125)
* init pr for dbt_debug test conversion

* removal of old test

* minor test format change

* add new Base class and Test classes

* reformatting test, new method for capsys and error message to check, todo fix badproject

* reformatting tests, ready for review

* checking yaml file, and small reformat

* modifying since update wasn't working in ci/cd
2022-11-14 14:22:48 -06:00
Michelle Ark
39c5c42215 converting 044_test_run_operations (#6122)
* converting 044_test_run_operations
2022-11-14 10:39:57 -05:00
dependabot[bot]
9f280a8469 Update colorama requirement from <0.4.6,>=0.3.9 to >=0.3.9,<0.4.7 in /core (#6144)
* Update colorama requirement in /core

Updates the requirements on [colorama](https://github.com/tartley/colorama) to permit the latest version.
- [Release notes](https://github.com/tartley/colorama/releases)
- [Changelog](https://github.com/tartley/colorama/blob/master/CHANGELOG.rst)
- [Commits](https://github.com/tartley/colorama/compare/0.3.9...0.4.6)

---
updated-dependencies:
- dependency-name: colorama
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* Add automated changelog yaml from template for bot PR

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Github Build Bot <buildbot@fishtownanalytics.com>
2022-11-13 09:57:33 -05:00
Joe Berni
73116fb816 feature/favor-state-node (#5859) 2022-11-09 10:58:01 -06:00
Stu Kilgore
f02243506d Convert postgres index tests (#6228) 2022-11-08 15:30:29 -06:00
Stu Kilgore
d5e9ce1797 Convert color tests to pytest (#6230) 2022-11-08 15:25:57 -06:00
Stu Kilgore
4e786184d2 Convert threading tests to pytest (#6226) 2022-11-08 08:56:10 -06:00
Chenyu Li
930bd3541e properly track hook running (#6059) 2022-11-07 10:44:29 -06:00
Gerda Shank
6c76137da4 CT 1443 remove root path (#6172)
* Remove root_path

* Bump manifest schema to 8

* Update tests and compatibility utility for v8, root_path removal
2022-11-04 16:38:26 -04:00
Gerda Shank
68d06d8a9c Combine various print result log events with different levels (#6174)
* Combine various print result log events with different levels

* Changie

* more merge cleanup

* Specify DynamicLevel for event classes that must specify level
2022-11-04 14:26:37 -04:00
Rachel
d0543c9242 Updates lib to use new profile name functionality (#6202)
* Updates lib to use new profile name functionality

* Adds changie entry

* Fixes formatting
2022-11-04 10:05:24 -07:00
Michelle Ark
cfad27f963 add typing to DepsTask.run (#6192) 2022-11-03 17:35:16 -04:00
Emily Rockman
c3ccbe3357 add python version and upgrade action (#6204) 2022-11-03 09:13:00 -05:00
dependabot[bot]
8e28f5906e Bump python from 3.10.7-slim-bullseye to 3.11.0-slim-bullseye in /docker (#6180)
* Bump python from 3.10.7-slim-bullseye to 3.11.0-slim-bullseye in /docker

Bumps python from 3.10.7-slim-bullseye to 3.11.0-slim-bullseye.

---
updated-dependencies:
- dependency-name: python
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Add automated changelog yaml from template for bot PR

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Github Build Bot <buildbot@fishtownanalytics.com>
2022-11-02 08:40:51 -07:00
FishtownBuildBot
d23285b4ba Add new index.html and changelog yaml files from dbt-docs (#6112) 2022-11-02 08:36:56 -07:00
Michelle Ark
a42748433d converting 023_exit_codes_tests (#6105)
* converting 023_exit_codes_tests

* use packages fixture, clean up test names
2022-11-01 16:26:12 -04:00
Emily Rockman
be4a91a0fe Convert messages to struct logs (#6064)
* Initial structured logging changes

* remove "this" from core/dbt/events/functions.py

* CT-1047: Fix execution_time definitions to use float

* CT-1047: Revert unintended checking of changes to functions.py

* WIP

* first pass to resolve circular deps

* more circular dep resolution

* remove a bunch of duplication

* move message into log line

* update comments

* fix field that went missing during rebase

* remove double import

* remove some comments and extra code

* fix pre-commit

* rework deprecations

* WIP converting messages

* WIP converting messages

* remove stray comment

* WIP more message conversion

* WIP more message conversion

* tweak the messages

* convert last message

* rename

* remove warn_or_raise as never used

* add fake calls to all new events

* fix some tests

* put back deprecation

* restore deprecation fully

* fix unit test

* fix log levels

* remove some skipped ids

* fix macro log function

* fix how messages are built to match expected outcome

* fix expected test message

* small fixes from reviews

* fix conflict resolution in UI

Co-authored-by: Gerda Shank <gerda@dbtlabs.com>
Co-authored-by: Peter Allen Webb <peter.webb@dbtlabs.com>
2022-10-31 12:04:56 -05:00
Emily Rockman
8145eed603 revert to community action (#6163) 2022-10-27 16:10:58 -05:00
Emily Rockman
fc00239f36 point to correct workflow (#6161)
* point to correct workflow

* add inputs
2022-10-27 14:05:09 -05:00
Ian Knox
77dfec7214 more ergonomic profile name handling (#6157) 2022-10-27 10:49:27 -05:00
Emily Rockman
7b73264ec8 switch out to use internal action for triage labels (#6120)
* switch out to use our action

* point to main
2022-10-27 08:33:15 -05:00
Mila Page
1916784287 Ct 1167/030 statement tests conversion (#6109)
* Convert test to functional set.

* Remove old statement tests from integration test set.

* Nix whitespace

Co-authored-by: Mila Page <versusfacit@users.noreply.github.com>
2022-10-26 03:37:44 -07:00
Ian Knox
c2856017a1 [BUGFIX] Force tox to update pip (fixes psycopg2-binary @ 2.9.5) (#6134) 2022-10-25 13:01:38 -05:00
Michelle Ark
17b82661d2 convert 027 cycle test (#6094)
* convert 027 cycle test

* remove no-op expect_pass=False

* remove postgres from test names
2022-10-21 11:41:51 -04:00
Michelle Ark
6c8609499a Add 'michelleark' to changie's core_team list (#6084) 2022-10-20 14:41:41 -04:00
Peter Webb
53ae325576 CT-1099: Migrate test 071_commented_yaml_regression_3568_tests (#6106) 2022-10-20 12:43:30 -04:00
Mila Page
a7670a3ab9 Add unit tests for recent stringifier functors added to events library. (#6095)
Co-authored-by: Mila Page <versusfacit@users.noreply.github.com>
2022-10-19 22:52:32 -07:00
Mila Page
ff2f1f42c3 Working solution for serialization bug. (#5874)
* Create functors to initialize event types with str-type member attributes. Before this change, the spec of various classes expected base_msg and msg params to be strs. This assumption did not always hold true; __post_init__ hooks ensure the spec is obeyed.
* Add new changelog.
* Add msg type change functor to a few other events that could use it.

Co-authored-by: Mila Page <versusfacit@users.noreply.github.com>
2022-10-18 12:20:30 -07:00
Luke Bassett
35f7975d8f Updated string formatting on non-f-strings. (#6086)
* Updated string formatting on non-f-strings.

Found all cases of strings separated by white space on a single line and
removed white space separation. EX: "hello " "world" -> "hello world".

* add changelog entry
2022-10-17 15:58:31 -05:00
Eve Johns
a9c8bc0e0a f-string cleanup #6068 (#6082)
* fix f string issue

* removed one space

* Add changelog

* fixed return format

Co-authored-by: Leah Antkiewicz <leah.antkiewicz@fishtownanalytics.com>
2022-10-17 16:58:04 -04:00
Maximilian Roos
e0c32f425d Merge branch 'main' into prql 2022-10-11 11:18:13 -07:00
Maximilian Roos
90223ed279 Merge branch 'main' into prql 2022-10-06 13:00:37 -07:00
Maximilian Roos
472940423c Remove unused PrqlNode & friends 2022-10-05 18:35:06 -07:00
Maximilian Roos
dddb0bff5a Merge branch 'main' into prql 2022-10-05 18:02:20 -07:00
Maximilian Roos
bc8b65095e Add language on error nodes 2022-10-05 11:34:15 -07:00
Maximilian Roos
86eb68f40d Add test to test_graph.py 2022-10-05 11:34:15 -07:00
Maximilian Roos
8eece383ea flake 2022-10-05 11:34:15 -07:00
Maximilian Roos
c9572c3106 Always use the mock method to align the snapshot tests 2022-10-05 11:34:15 -07:00
Maximilian Roos
ebff2ceb72 Revert to importing builtins from typing 2022-10-05 11:34:15 -07:00
Maximilian Roos
5a8fd1e90d Ignore types in the import hacks
(tests still fail b/c typing_extensions is not installed)
2022-10-05 11:34:15 -07:00
Maximilian Roos
fa3f17200f Add a mock return from prql_python 2022-10-05 11:34:15 -07:00
Maximilian Roos
506f2c939a A very-WIP implementation of the PRQL parser 2022-10-05 11:34:08 -07:00
190 changed files with 10379 additions and 3376 deletions

View File

@@ -0,0 +1,7 @@
kind: "Dependency"
body: "Update pathspec requirement from ~=0.9.0 to >=0.9,<0.11 in /core"
time: 2022-09-23T00:06:46.00000Z
custom:
Author: dependabot[bot]
Issue: 4904
PR: 5917

View File

@@ -0,0 +1,7 @@
kind: "Dependency"
body: "Update colorama requirement from <0.4.6,>=0.3.9 to >=0.3.9,<0.4.7 in /core"
time: 2022-10-26T00:09:10.00000Z
custom:
Author: dependabot[bot]
Issue: 4904
PR: 6144

View File

@@ -0,0 +1,6 @@
kind: Docs
time: 2022-10-17T17:14:11.715348-05:00
custom:
Author: paulbenschmidt
Issue: "5880"
PR: "324"

View File

@@ -0,0 +1,7 @@
kind: Docs
body: Fix rendering of sample code for metrics
time: 2022-11-16T15:57:43.204201+01:00
custom:
Author: jtcohen6
Issue: "323"
PR: "346"

View File

@@ -0,0 +1,8 @@
kind: Features
body: Added favor-state flag to optionally favor state nodes even if unselected node
exists
time: 2022-04-08T16:54:59.696564+01:00
custom:
Author: daniel-murray josephberni
Issue: "2968"
PR: "5859"

View File

@@ -0,0 +1,8 @@
kind: Features
body: This pulls the profile name from args when constructing a RuntimeConfig in lib.py,
enabling the dbt-server to override the value that's in the dbt_project.yml
time: 2022-11-02T15:00:03.000805-05:00
custom:
Author: racheldaniel
Issue: "6201"
PR: "6202"

View File

@@ -0,0 +1,8 @@
kind: Fixes
body: Add functors to ensure event types with str-type attributes are initialized
to spec, even when provided non-str type params.
time: 2022-10-16T17:37:42.846683-07:00
custom:
Author: versusfacit
Issue: "5436"
PR: "5874"

View File

@@ -0,0 +1,7 @@
kind: Fixes
body: Allow hooks to fail without halting execution flow
time: 2022-11-07T09:53:14.340257-06:00
custom:
Author: ChenyuLInx
Issue: "5625"
PR: "6059"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Fixed extra whitespace in strings introduced by black.
time: 2022-10-17T15:15:11.499246-05:00
custom:
Author: luke-bassett
Issue: "1350"
PR: "6086"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Clean up string formatting
time: 2022-10-17T15:58:44.676549-04:00
custom:
Author: eve-johns
Issue: "6068"
PR: "6082"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Remove the 'root_path' field from most nodes
time: 2022-10-28T10:48:37.687886-04:00
custom:
Author: gshank
Issue: "6171"
PR: "6172"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Combine certain logging events with different levels
time: 2022-10-28T11:03:44.887836-04:00
custom:
Author: gshank
Issue: "6173"
PR: "6174"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Convert threading tests to pytest
time: 2022-11-08T07:45:50.589147-06:00
custom:
Author: stu-k
Issue: "5942"
PR: "6226"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Convert postgres index tests to pytest
time: 2022-11-08T11:56:33.743042-06:00
custom:
Author: stu-k
Issue: "5770"
PR: "6228"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Convert use color tests to pytest
time: 2022-11-08T13:31:04.788547-06:00
custom:
Author: stu-k
Issue: "5771"
PR: "6230"

View File

@@ -44,7 +44,7 @@ custom:
footerFormat: |
{{- $contributorDict := dict }}
{{- /* any names added to this list should be all lowercase for later matching purposes */}}
{{- $core_team := list "peterallenwebb" "emmyoop" "nathaniel-may" "gshank" "leahwicz" "chenyulinx" "stu-k" "iknox-fa" "versusfacit" "mcknight-42" "jtcohen6" "dependabot[bot]" "snyk-bot" "colin-rogers-dbt" }}
{{- $core_team := list "michelleark" "peterallenwebb" "emmyoop" "nathaniel-may" "gshank" "leahwicz" "chenyulinx" "stu-k" "iknox-fa" "versusfacit" "mcknight-42" "jtcohen6" "dependabot[bot]" "snyk-bot" "colin-rogers-dbt" }}
{{- range $change := .Changes }}
{{- $authorList := splitList " " $change.Custom.Author }}
{{- /* loop through all authors for a PR */}}

View File

@@ -45,7 +45,9 @@ jobs:
uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4.3.0
with:
python-version: '3.8'
- name: Install python dependencies
run: |
@@ -82,7 +84,7 @@ jobs:
uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4.3.0
with:
python-version: ${{ matrix.python-version }}
@@ -137,7 +139,7 @@ jobs:
uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4.3.0
with:
python-version: ${{ matrix.python-version }}
@@ -190,9 +192,9 @@ jobs:
uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4.3.0
with:
python-version: 3.8
python-version: '3.8'
- name: Install python dependencies
run: |

View File

@@ -30,7 +30,7 @@ repos:
args:
- "--check"
- "--diff"
- repo: https://gitlab.com/pycqa/flake8
- repo: https://github.com/pycqa/flake8
rev: 4.0.1
hooks:
- id: flake8

View File

@@ -41,10 +41,10 @@ from dbt.events.functions import fire_event
from dbt.events.types import (
NewConnection,
ConnectionReused,
ConnectionLeftOpenInCleanup,
ConnectionLeftOpen,
ConnectionLeftOpen2,
ConnectionClosedInCleanup,
ConnectionClosed,
ConnectionClosed2,
Rollback,
RollbackFailed,
)
@@ -306,9 +306,9 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
with self.lock:
for connection in self.thread_connections.values():
if connection.state not in {"closed", "init"}:
fire_event(ConnectionLeftOpen(conn_name=cast_to_str(connection.name)))
fire_event(ConnectionLeftOpenInCleanup(conn_name=cast_to_str(connection.name)))
else:
fire_event(ConnectionClosed(conn_name=cast_to_str(connection.name)))
fire_event(ConnectionClosedInCleanup(conn_name=cast_to_str(connection.name)))
self.close(connection)
# garbage collect these connections
@@ -345,10 +345,10 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, "close"):
fire_event(ConnectionClosed2(conn_name=cast_to_str(connection.name)))
fire_event(ConnectionClosed(conn_name=cast_to_str(connection.name)))
connection.handle.close()
else:
fire_event(ConnectionLeftOpen2(conn_name=cast_to_str(connection.name)))
fire_event(ConnectionLeftOpen(conn_name=cast_to_str(connection.name)))
@classmethod
def _rollback(cls, connection: Connection) -> None:

View File

@@ -41,13 +41,13 @@ from dbt.clients.jinja import MacroGenerator
from dbt.contracts.graph.compiled import CompileResultNode, CompiledSeedNode
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.parsed import ParsedSeedNode
from dbt.exceptions import warn_or_error
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, warn_or_error
from dbt.events.types import (
CacheMiss,
ListRelations,
CodeExecution,
CodeExecutionStatus,
CatalogGenerationError,
)
from dbt.utils import filter_null_values, executor, cast_to_str
@@ -581,7 +581,7 @@ class BaseAdapter(metaclass=AdapterMeta):
:rtype: List[self.Relation]
"""
raise NotImplementedException(
"`list_relations_without_caching` is not implemented for this " "adapter!"
"`list_relations_without_caching` is not implemented for this adapter!"
)
###
@@ -1327,7 +1327,7 @@ def catch_as_completed(
elif isinstance(exc, KeyboardInterrupt) or not isinstance(exc, Exception):
raise exc
else:
warn_or_error(f"Encountered an error while generating catalog: {str(exc)}")
warn_or_error(CatalogGenerationError(exc=str(exc)))
# exc is not None, derives from Exception, and isn't ctrl+c
exceptions.append(exc)
return merge_tables(tables), exceptions

View File

@@ -367,9 +367,9 @@ class BlockIterator:
if self.current:
linecount = self.data[: self.current.end].count("\n") + 1
dbt.exceptions.raise_compiler_error(
(
"Reached EOF without finding a close tag for " "{} (searched from line {})"
).format(self.current.block_type_name, linecount)
("Reached EOF without finding a close tag for {} (searched from line {})").format(
self.current.block_type_name, linecount
)
)
if collect_raw_data:

View File

@@ -29,10 +29,12 @@ from dbt.exceptions import (
from dbt.graph import Graph
from dbt.events.functions import fire_event
from dbt.events.types import FoundStats, CompilingNode, WritingInjectedSQLForNode
from dbt.node_types import NodeType, ModelLanguage
from dbt.node_types import NodeType
from dbt.events.format import pluralize
import dbt.tracking
from dbt.parser.languages import get_language_provider_by_name
graph_file_name = "graph.gpickle"
@@ -363,42 +365,19 @@ class Compiler:
{
"compiled": False,
"compiled_code": None,
"compiled_language": None,
"extra_ctes_injected": False,
"extra_ctes": [],
}
)
compiled_node = _compiled_type_for(node).from_dict(data)
if compiled_node.language == ModelLanguage.python:
# TODO could we also 'minify' this code at all? just aesthetic, not functional
# quoting seems like something very specific to sql so far
# for all python implementations we are seeing there's no quoting.
# TODO try to find better way to do this, given that
original_quoting = self.config.quoting
self.config.quoting = {key: False for key in original_quoting.keys()}
context = self._create_node_context(compiled_node, manifest, extra_context)
postfix = jinja.get_rendered(
"{{ py_script_postfix(model) }}",
context,
node,
)
# we should NOT jinja render the python model's 'raw code'
compiled_node.compiled_code = f"{node.raw_code}\n\n{postfix}"
# restore quoting settings in the end since context is lazy evaluated
self.config.quoting = original_quoting
else:
context = self._create_node_context(compiled_node, manifest, extra_context)
compiled_node.compiled_code = jinja.get_rendered(
node.raw_code,
context,
node,
)
context = self._create_node_context(compiled_node, manifest, extra_context)
provider = get_language_provider_by_name(node.language)
compiled_node.compiled_code = provider.get_compiled_code(node, context)
compiled_node.relation_name = self._get_relation_name(node)
compiled_node.compiled_language = provider.compiled_language()
compiled_node.compiled = True
return compiled_node
@@ -514,6 +493,8 @@ class Compiler:
fire_event(WritingInjectedSQLForNode(unique_id=node.unique_id))
if node.compiled_code:
# TODO: should compiled_path depend on the compiled_language?
# e.g. "model.prql" (source) -> "model.sql" (compiled)
node.compiled_path = node.write_node(
self.config.target_path, "compiled", node.compiled_code
)

View File

@@ -248,7 +248,7 @@ class PartialProject(RenderComponents):
project_name: Optional[str] = field(
metadata=dict(
description=(
"The name of the project. This should always be set and will not " "be rendered"
"The name of the project. This should always be set and will not be rendered"
)
)
)
@@ -668,7 +668,7 @@ class Project:
def get_selector(self, name: str) -> Union[SelectionSpec, bool]:
if name not in self.selectors:
raise RuntimeException(
f"Could not find selector named {name}, expected one of " f"{list(self.selectors)}"
f"Could not find selector named {name}, expected one of {list(self.selectors)}"
)
return self.selectors[name]["definition"]

View File

@@ -3,31 +3,41 @@ import os
from copy import deepcopy
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Any, Optional, Mapping, Iterator, Iterable, Tuple, List, MutableSet, Type
from typing import (
Any,
Dict,
Iterable,
Iterator,
Mapping,
MutableSet,
Optional,
Tuple,
Type,
Union,
)
from .profile import Profile
from .project import Project
from .renderer import DbtProjectYamlRenderer, ProfileRenderer
from .utils import parse_cli_vars
from dbt import flags
from dbt.adapters.factory import get_relation_class_by_name, get_include_paths
from dbt.helper_types import FQNPath, PathSet, DictDefaultEmptyStr
from dbt.adapters.factory import get_include_paths, get_relation_class_by_name
from dbt.config.profile import read_user_config
from dbt.contracts.connection import AdapterRequiredConfig, Credentials
from dbt.contracts.graph.manifest import ManifestMetadata
from dbt.contracts.relation import ComponentName
from dbt.ui import warning_tag
from dbt.contracts.project import Configuration, UserConfig
from dbt.exceptions import (
RuntimeException,
DbtProjectError,
validator_error_message,
warn_or_error,
raise_compiler_error,
)
from dbt.contracts.relation import ComponentName
from dbt.dataclass_schema import ValidationError
from dbt.exceptions import (
DbtProjectError,
RuntimeException,
raise_compiler_error,
validator_error_message,
)
from dbt.events.functions import warn_or_error
from dbt.events.types import UnusedResourceConfigPath
from dbt.helper_types import DictDefaultEmptyStr, FQNPath, PathSet
from .profile import Profile
from .project import Project, PartialProject
from .renderer import DbtProjectYamlRenderer, ProfileRenderer
from .utils import parse_cli_vars
def _project_quoting_dict(proj: Project, profile: Profile) -> Dict[ComponentName, bool]:
@@ -190,28 +200,52 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
@classmethod
def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profile]:
# profile_name from the project
project_root = args.project_dir if args.project_dir else os.getcwd()
version_check = bool(flags.VERSION_CHECK)
partial = Project.partial_load(project_root, verify_version=version_check)
# build the profile using the base renderer and the one fact we know
# Note: only the named profile section is rendered. The rest of the
# profile is ignored.
cli_vars: Dict[str, Any] = parse_cli_vars(getattr(args, "vars", "{}"))
profile = cls.collect_profile(args=args)
project_renderer = DbtProjectYamlRenderer(profile, cli_vars)
project = cls.collect_project(args=args, project_renderer=project_renderer)
assert type(project) is Project
return (project, profile)
@classmethod
def collect_profile(
cls: Type["RuntimeConfig"], args: Any, profile_name: Optional[str] = None
) -> Profile:
cli_vars: Dict[str, Any] = parse_cli_vars(getattr(args, "vars", "{}"))
profile_renderer = ProfileRenderer(cli_vars)
profile_name = partial.render_profile_name(profile_renderer)
# build the profile using the base renderer and the one fact we know
if profile_name is None:
# Note: only the named profile section is rendered here. The rest of the
# profile is ignored.
partial = cls.collect_project(args)
assert type(partial) is PartialProject
profile_name = partial.render_profile_name(profile_renderer)
profile = cls._get_rendered_profile(args, profile_renderer, profile_name)
# Save env_vars encountered in rendering for partial parsing
profile.profile_env_vars = profile_renderer.ctx_obj.env_vars
return profile
# get a new renderer using our target information and render the
# project
project_renderer = DbtProjectYamlRenderer(profile, cli_vars)
project = partial.render(project_renderer)
# Save env_vars encountered in rendering for partial parsing
project.project_env_vars = project_renderer.ctx_obj.env_vars
return (project, profile)
@classmethod
def collect_project(
cls: Type["RuntimeConfig"],
args: Any,
project_renderer: Optional[DbtProjectYamlRenderer] = None,
) -> Union[Project, PartialProject]:
project_root = args.project_dir if args.project_dir else os.getcwd()
version_check = bool(flags.VERSION_CHECK)
partial = Project.partial_load(project_root, verify_version=version_check)
if project_renderer is None:
return partial
else:
project = partial.render(project_renderer)
project.project_env_vars = project_renderer.ctx_obj.env_vars
return project
# Called in main.py, lib.py, task/base.py
@classmethod
@@ -280,11 +314,11 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
"exposures": self._get_config_paths(self.exposures),
}
def get_unused_resource_config_paths(
def warn_for_unused_resource_config_paths(
self,
resource_fqns: Mapping[str, PathSet],
disabled: PathSet,
) -> List[FQNPath]:
) -> None:
"""Return a list of lists of strings, where each inner list of strings
represents a type + FQN path of a resource configuration that is not
used.
@@ -298,23 +332,13 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
for config_path in config_paths:
if not _is_config_used(config_path, fqns):
unused_resource_config_paths.append((resource_type,) + config_path)
return unused_resource_config_paths
resource_path = ".".join(i for i in ((resource_type,) + config_path))
unused_resource_config_paths.append(resource_path)
def warn_for_unused_resource_config_paths(
self,
resource_fqns: Mapping[str, PathSet],
disabled: PathSet,
) -> None:
unused = self.get_unused_resource_config_paths(resource_fqns, disabled)
if len(unused) == 0:
if len(unused_resource_config_paths) == 0:
return
msg = UNUSED_RESOURCE_CONFIGURATION_PATH_MESSAGE.format(
len(unused), "\n".join("- {}".format(".".join(u)) for u in unused)
)
warn_or_error(msg, log_fmt=warning_tag("{}"))
warn_or_error(UnusedResourceConfigPath(unused_config_paths=unused_resource_config_paths))
def load_dependencies(self, base_only=False) -> Mapping[str, "RuntimeConfig"]:
if self.dependencies is None:
@@ -591,14 +615,6 @@ class UnsetProfileConfig(RuntimeConfig):
return cls.from_parts(project=project, profile=profile, args=args)
UNUSED_RESOURCE_CONFIGURATION_PATH_MESSAGE = """\
Configuration paths exist in your dbt_project.yml file which do not \
apply to any resources.
There are {} unused configuration paths:
{}
"""
def _is_config_used(path, fqns):
if fqns:
for fqn in fqns:

View File

@@ -1,3 +1,10 @@
SECRET_ENV_PREFIX = "DBT_ENV_SECRET_"
DEFAULT_ENV_PLACEHOLDER = "DBT_DEFAULT_PLACEHOLDER"
METADATA_ENV_PREFIX = "DBT_ENV_CUSTOM_ENV_"
MAXIMUM_SEED_SIZE = 1 * 1024 * 1024
MAXIMUM_SEED_SIZE_NAME = "1MB"
PIN_PACKAGE_URL = (
"https://docs.getdbt.com/docs/package-management#section-specifying-package-versions"
)

View File

@@ -16,7 +16,7 @@ from dbt.exceptions import (
disallow_secret_env_var,
)
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.types import MacroEventInfo, MacroEventDebug
from dbt.events.types import JinjaLogInfo, JinjaLogDebug
from dbt.version import __version__ as dbt_version
# These modules are added to the context. Consider alternative
@@ -126,7 +126,7 @@ class ContextMeta(type):
class Var:
UndefinedVarError = "Required var '{}' not found in config:\nVars " "supplied to {} = {}"
UndefinedVarError = "Required var '{}' not found in config:\nVars supplied to {} = {}"
_VAR_NOTSET = object()
def __init__(
@@ -557,9 +557,9 @@ class BaseContext(metaclass=ContextMeta):
{% endmacro %}"
"""
if info:
fire_event(MacroEventInfo(msg=msg))
fire_event(JinjaLogInfo(msg=msg))
else:
fire_event(MacroEventDebug(msg=msg))
fire_event(JinjaLogDebug(msg=msg))
return ""
@contextproperty

View File

@@ -53,7 +53,6 @@ from dbt.exceptions import (
raise_compiler_error,
ref_invalid_args,
metric_invalid_args,
ref_target_not_found,
target_not_found,
ref_bad_context,
wrapped_exports,
@@ -182,7 +181,7 @@ class BaseDatabaseWrapper:
return macro
searched = ", ".join(repr(a) for a in attempts)
msg = f"In dispatch: No macro named '{macro_name}' found\n" f" Searched for: {searched}"
msg = f"In dispatch: No macro named '{macro_name}' found\n Searched for: {searched}"
raise CompilationException(msg)
@@ -220,12 +219,12 @@ class BaseRefResolver(BaseResolver):
def validate_args(self, name: str, package: Optional[str]):
if not isinstance(name, str):
raise CompilationException(
f"The name argument to ref() must be a string, got " f"{type(name)}"
f"The name argument to ref() must be a string, got {type(name)}"
)
if package is not None and not isinstance(package, str):
raise CompilationException(
f"The package argument to ref() must be a string or None, got " f"{type(package)}"
f"The package argument to ref() must be a string or None, got {type(package)}"
)
def __call__(self, *args: str) -> RelationProxy:
@@ -476,10 +475,11 @@ class RuntimeRefResolver(BaseRefResolver):
)
if target_model is None or isinstance(target_model, Disabled):
ref_target_not_found(
self.model,
target_name,
target_package,
target_not_found(
node=self.model,
target_name=target_name,
target_kind="node",
target_package=target_package,
disabled=isinstance(target_model, Disabled),
)
self.validate(target_model, target_name, target_package)
@@ -803,6 +803,7 @@ class ProviderContext(ManifestContext):
raise_compiler_error(
"can only load_agate_table for seeds (got a {})".format(self.model.resource_type)
)
assert self.model.root_path
path = os.path.join(self.model.root_path, self.model.original_file_path)
column_types = self.model.config.column_types
try:
@@ -1315,7 +1316,7 @@ class ModelContext(ProviderContext):
# only doing this in sql model for backward compatible
if (
getattr(self.model, "extra_ctes_injected", None)
and self.model.language == ModelLanguage.sql # type: ignore[union-attr]
and self.model.compiled_language == ModelLanguage.sql # type: ignore[union-attr]
):
# TODO CT-211
return self.model.compiled_code # type: ignore[union-attr]

View File

@@ -94,7 +94,7 @@ class Connection(ExtensibleDbtClassMixin, Replaceable):
self._handle.resolve(self)
except RecursionError as exc:
raise InternalException(
"A connection's open() method attempted to read the " "handle value"
"A connection's open() method attempted to read the handle value"
) from exc
return self._handle

View File

@@ -1,18 +1,16 @@
import hashlib
import os
from dataclasses import dataclass, field
from mashumaro.types import SerializableType
from typing import List, Optional, Union, Dict, Any
from dbt.constants import MAXIMUM_SEED_SIZE
from dbt.dataclass_schema import dbtClassMixin, StrEnum
from .util import SourceKey
MAXIMUM_SEED_SIZE = 1 * 1024 * 1024
MAXIMUM_SEED_SIZE_NAME = "1MB"
class ParseFileType(StrEnum):
Macro = "macro"
Model = "model"
@@ -24,6 +22,7 @@ class ParseFileType(StrEnum):
Documentation = "docs"
Schema = "schema"
Hook = "hook" # not a real filetype, from dbt_project.yml
language: str = "sql"
parse_file_type_to_parser = {
@@ -196,6 +195,7 @@ class SourceFile(BaseSourceFile):
docs: List[str] = field(default_factory=list)
macros: List[str] = field(default_factory=list)
env_vars: List[str] = field(default_factory=list)
language: str = "sql"
@classmethod
def big_seed(cls, path: FilePath) -> "SourceFile":

View File

@@ -42,6 +42,7 @@ class CompiledNodeMixin(dbtClassMixin):
@dataclass
class CompiledNode(ParsedNode, CompiledNodeMixin):
compiled_code: Optional[str] = None
compiled_language: Optional[str] = None # TODO: ModelLanguage
extra_ctes_injected: bool = False
extra_ctes: List[InjectedCTE] = field(default_factory=list)
relation_name: Optional[str] = None
@@ -97,6 +98,7 @@ class CompiledSeedNode(CompiledNode):
# keep this in sync with ParsedSeedNode!
resource_type: NodeType = field(metadata={"restrict": [NodeType.Seed]})
config: SeedConfig = field(default_factory=SeedConfig)
root_path: Optional[str] = None
@property
def empty(self):

View File

@@ -499,7 +499,7 @@ def _update_into(dest: MutableMapping[str, T], new_item: T):
existing = dest[unique_id]
if new_item.original_file_path != existing.original_file_path:
raise dbt.exceptions.RuntimeException(
f"cannot update a {new_item.resource_type} to have a new file " f"path!"
f"cannot update a {new_item.resource_type} to have a new file path!"
)
dest[unique_id] = new_item
@@ -1011,6 +1011,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
adapter,
other: "WritableManifest",
selected: AbstractSet[UniqueID],
favor_state: bool = False,
) -> None:
"""Given the selected unique IDs and a writable manifest, update this
manifest by replacing any unselected nodes with their counterpart.
@@ -1025,7 +1026,10 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
node.resource_type in refables
and not node.is_ephemeral
and unique_id not in selected
and not adapter.get_relation(current.database, current.schema, current.identifier)
and (
not adapter.get_relation(current.database, current.schema, current.identifier)
or favor_state
)
):
merged.add(unique_id)
self.nodes[unique_id] = node.replace(deferred=True)
@@ -1183,7 +1187,7 @@ AnyManifest = Union[Manifest, MacroManifest]
@dataclass
@schema_version("manifest", 7)
@schema_version("manifest", 8)
class WritableManifest(ArtifactMixin):
nodes: Mapping[UniqueID, ManifestNode] = field(
metadata=dict(description=("The nodes defined in the dbt project and its dependencies"))
@@ -1229,7 +1233,7 @@ class WritableManifest(ArtifactMixin):
@classmethod
def compatible_previous_versions(self):
return [("manifest", 4), ("manifest", 5), ("manifest", 6)]
return [("manifest", 4), ("manifest", 5), ("manifest", 6), ("manifest", 7)]
def __post_serialize__(self, dct):
for unique_id, node in dct["nodes"].items():

View File

@@ -18,7 +18,7 @@ from typing import (
from dbt.dataclass_schema import dbtClassMixin, ExtensibleDbtClassMixin
from dbt.clients.system import write_file
from dbt.contracts.files import FileHash, MAXIMUM_SEED_SIZE_NAME
from dbt.contracts.files import FileHash
from dbt.contracts.graph.unparsed import (
UnparsedNode,
UnparsedDocumentation,
@@ -41,7 +41,13 @@ from dbt.contracts.graph.unparsed import (
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.events.proto_types import NodeInfo
from dbt.exceptions import warn_or_error
from dbt.events.functions import warn_or_error
from dbt.events.types import (
SeedIncreased,
SeedExceedsLimitSamePath,
SeedExceedsLimitAndPathChanged,
SeedExceedsLimitChecksumChanged,
)
from dbt import flags
from dbt.node_types import ModelLanguage, NodeType
@@ -375,30 +381,28 @@ def same_seeds(first: ParsedNode, second: ParsedNode) -> bool:
if first.checksum.name == "path":
msg: str
if second.checksum.name != "path":
msg = (
f"Found a seed ({first.package_name}.{first.name}) "
f">{MAXIMUM_SEED_SIZE_NAME} in size. The previous file was "
f"<={MAXIMUM_SEED_SIZE_NAME}, so it has changed"
warn_or_error(
SeedIncreased(package_name=first.package_name, name=first.name), node=first
)
elif result:
msg = (
f"Found a seed ({first.package_name}.{first.name}) "
f">{MAXIMUM_SEED_SIZE_NAME} in size at the same path, dbt "
f"cannot tell if it has changed: assuming they are the same"
warn_or_error(
SeedExceedsLimitSamePath(package_name=first.package_name, name=first.name),
node=first,
)
elif not result:
msg = (
f"Found a seed ({first.package_name}.{first.name}) "
f">{MAXIMUM_SEED_SIZE_NAME} in size. The previous file was in "
f"a different location, assuming it has changed"
warn_or_error(
SeedExceedsLimitAndPathChanged(package_name=first.package_name, name=first.name),
node=first,
)
else:
msg = (
f"Found a seed ({first.package_name}.{first.name}) "
f">{MAXIMUM_SEED_SIZE_NAME} in size. The previous file had a "
f"checksum type of {second.checksum.name}, so it has changed"
warn_or_error(
SeedExceedsLimitChecksumChanged(
package_name=first.package_name,
name=first.name,
checksum_name=second.checksum.name,
),
node=first,
)
warn_or_error(msg, node=first)
return result
@@ -408,6 +412,9 @@ class ParsedSeedNode(ParsedNode):
# keep this in sync with CompiledSeedNode!
resource_type: NodeType = field(metadata={"restrict": [NodeType.Seed]})
config: SeedConfig = field(default_factory=SeedConfig)
# seeds need the root_path because the contents are not loaded initially
# and we need the root_path to load the seed later
root_path: Optional[str] = None
@property
def empty(self):

View File

@@ -24,7 +24,6 @@ from typing import Optional, List, Union, Dict, Any, Sequence
@dataclass
class UnparsedBaseNode(dbtClassMixin, Replaceable):
package_name: str
root_path: str
path: str
original_file_path: str
@@ -364,7 +363,6 @@ class SourcePatch(dbtClassMixin, Replaceable):
@dataclass
class UnparsedDocumentation(dbtClassMixin, Replaceable):
package_name: str
root_path: str
path: str
original_file_path: str

View File

@@ -12,9 +12,7 @@ from dataclasses import dataclass, field
from typing import Optional, List, Dict, Union, Any
from mashumaro.types import SerializableType
PIN_PACKAGE_URL = (
"https://docs.getdbt.com/docs/package-management#section-specifying-package-versions" # noqa
)
DEFAULT_SEND_ANONYMOUS_USAGE_STATS = True

View File

@@ -220,7 +220,9 @@ class RunResultsArtifact(ExecutionResult, ArtifactMixin):
generated_at: datetime,
args: Dict,
):
processed_results = [process_run_result(result) for result in results]
processed_results = [
process_run_result(result) for result in results if isinstance(result, RunResult)
]
meta = RunResultsMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
@@ -339,7 +341,7 @@ def process_freshness_result(result: FreshnessNodeResult) -> FreshnessNodeOutput
criteria = result.node.freshness
if criteria is None:
raise InternalException(
"Somehow evaluated a freshness result for a source " "that has no freshness criteria!"
"Somehow evaluated a freshness result for a source that has no freshness criteria!"
)
return SourceFreshnessOutput(
unique_id=unique_id,

View File

@@ -240,13 +240,32 @@ def rename_sql_attr(node_content: dict) -> dict:
def upgrade_manifest_json(manifest: dict) -> dict:
for node_content in manifest.get("nodes", {}).values():
node_content = rename_sql_attr(node_content)
if node_content["resource_type"] != "seed" and "root_path" in node_content:
del node_content["root_path"]
for disabled in manifest.get("disabled", {}).values():
# There can be multiple disabled nodes for the same unique_id
# so make sure all the nodes get the attr renamed
disabled = [rename_sql_attr(n) for n in disabled]
for node_content in disabled:
rename_sql_attr(node_content)
if node_content["resource_type"] != "seed" and "root_path" in node_content:
del node_content["root_path"]
for metric_content in manifest.get("metrics", {}).values():
# handle attr renames + value translation ("expression" -> "derived")
metric_content = rename_metric_attr(metric_content)
if "root_path" in metric_content:
del metric_content["root_path"]
for exposure_content in manifest.get("exposures", {}).values():
if "root_path" in exposure_content:
del exposure_content["root_path"]
for source_content in manifest.get("sources", {}).values():
if "root_path" in exposure_content:
del source_content["root_path"]
for macro_content in manifest.get("macros", {}).values():
if "root_path" in macro_content:
del macro_content["root_path"]
for doc_content in manifest.get("docs", {}).values():
if "root_path" in doc_content:
del doc_content["root_path"]
return manifest
@@ -291,7 +310,7 @@ class VersionedSchema(dbtClassMixin):
expected=str(cls.dbt_schema_version),
found=previous_schema_version,
)
if get_manifest_schema_version(data) <= 6:
if get_manifest_schema_version(data) <= 7:
data = upgrade_manifest_json(data)
return cls.from_dict(data) # type: ignore
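
To make the upgrade concrete, here is a hedged sketch of the root_path cleanup on a v7 manifest fragment (node contents invented). Seeds keep the field because, as the parsed-node diff above notes, their contents are loaded later and need root_path to find the file.

```python
# Minimal sketch of upgrade_manifest_json's root_path handling (values invented).
manifest = {
    "nodes": {
        "model.proj.my_model": {"resource_type": "model", "root_path": "/proj"},
        "seed.proj.my_seed": {"resource_type": "seed", "root_path": "/proj"},
    }
}

for node_content in manifest["nodes"].values():
    # mirror the branch above: every non-seed node drops root_path
    if node_content["resource_type"] != "seed" and "root_path" in node_content:
        del node_content["root_path"]

assert "root_path" not in manifest["nodes"]["model.proj.my_model"]
assert manifest["nodes"]["seed.proj.my_seed"]["root_path"] == "/proj"
```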

View File

@@ -1,14 +1,14 @@
import abc
from typing import Optional, Set, List, Dict, ClassVar
import dbt.exceptions
from dbt import ui
import dbt.tracking
class DBTDeprecation:
_name: ClassVar[Optional[str]] = None
_description: ClassVar[Optional[str]] = None
_event: ClassVar[Optional[str]] = None
@property
def name(self) -> str:
@@ -21,66 +21,50 @@ class DBTDeprecation:
dbt.tracking.track_deprecation_warn({"deprecation_name": self.name})
@property
def description(self) -> str:
if self._description is not None:
return self._description
raise NotImplementedError("description not implemented for {}".format(self))
def event(self) -> abc.ABCMeta:
if self._event is not None:
module_path = dbt.events.types
class_name = self._event
try:
return getattr(module_path, class_name)
except AttributeError:
msg = f"Event Class `{class_name}` is not defined in `{module_path}`"
raise NameError(msg)
raise NotImplementedError("event not implemented for {}".format(self._event))
def show(self, *args, **kwargs) -> None:
if self.name not in active_deprecations:
desc = self.description.format(**kwargs)
msg = ui.line_wrap_message(desc, prefix="Deprecated functionality\n\n")
dbt.exceptions.warn_or_error(msg, log_fmt=ui.warning_tag("{}"))
event = self.event(**kwargs)
dbt.events.functions.warn_or_error(event)
self.track_deprecation_warn()
active_deprecations.add(self.name)
class PackageRedirectDeprecation(DBTDeprecation):
_name = "package-redirect"
_description = """\
The `{old_name}` package is deprecated in favor of `{new_name}`. Please update
your `packages.yml` configuration to use `{new_name}` instead.
"""
_event = "PackageRedirectDeprecation"
class PackageInstallPathDeprecation(DBTDeprecation):
_name = "install-packages-path"
_description = """\
The default package install path has changed from `dbt_modules` to `dbt_packages`.
Please update `clean-targets` in `dbt_project.yml` and check `.gitignore` as well.
Or, set `packages-install-path: dbt_modules` if you'd like to keep the current value.
"""
_event = "PackageInstallPathDeprecation"
class ConfigPathDeprecation(DBTDeprecation):
_description = """\
The `{deprecated_path}` config has been renamed to `{exp_path}`.
Please update your `dbt_project.yml` configuration to reflect this change.
"""
class ConfigSourcePathDeprecation(ConfigPathDeprecation):
class ConfigSourcePathDeprecation(DBTDeprecation):
_name = "project-config-source-paths"
_event = "ConfigSourcePathDeprecation"
class ConfigDataPathDeprecation(ConfigPathDeprecation):
class ConfigDataPathDeprecation(DBTDeprecation):
_name = "project-config-data-paths"
_adapter_renamed_description = """\
The adapter function `adapter.{old_name}` is deprecated and will be removed in
a future release of dbt. Please use `adapter.{new_name}` instead.
Documentation for {new_name} can be found here:
https://docs.getdbt.com/docs/adapter
"""
_event = "ConfigDataPathDeprecation"
def renamed_method(old_name: str, new_name: str):
class AdapterDeprecationWarning(DBTDeprecation):
_name = "adapter:{}".format(old_name)
_description = _adapter_renamed_description.format(old_name=old_name, new_name=new_name)
_event = "AdapterDeprecationWarning"
dep = AdapterDeprecationWarning()
deprecations_list.append(dep)
@@ -89,26 +73,12 @@ def renamed_method(old_name: str, new_name: str):
class MetricAttributesRenamed(DBTDeprecation):
_name = "metric-attr-renamed"
_description = """\
dbt-core v1.3 renamed attributes for metrics:
\n 'sql' -> 'expression'
\n 'type' -> 'calculation_method'
\n 'type: expression' -> 'calculation_method: derived'
\nThe old metric parameter names will be fully deprecated in v1.4.
\nPlease remove them from the metric definition of metric '{metric_name}'
\nRelevant issue here: https://github.com/dbt-labs/dbt-core/issues/5849
"""
_event = "MetricAttributesRenamed"
class ExposureNameDeprecation(DBTDeprecation):
_name = "exposure-name"
_description = """\
Starting in v1.3, the 'name' of an exposure should contain only letters, numbers, and underscores.
Exposures support a new property, 'label', which may contain spaces, capital letters, and special characters.
{exposure} does not follow this pattern.
Please update the 'name', and use the 'label' property for a human-friendly title.
This will raise an error in a future version of dbt-core.
"""
_event = "ExposureNameDeprecation"
def warn(name, *args, **kwargs):
@@ -125,12 +95,12 @@ def warn(name, *args, **kwargs):
active_deprecations: Set[str] = set()
deprecations_list: List[DBTDeprecation] = [
ExposureNameDeprecation(),
PackageRedirectDeprecation(),
PackageInstallPathDeprecation(),
ConfigSourcePathDeprecation(),
ConfigDataPathDeprecation(),
PackageInstallPathDeprecation(),
PackageRedirectDeprecation(),
MetricAttributesRenamed(),
ExposureNameDeprecation(),
]
deprecations: Dict[str, DBTDeprecation] = {d.name: d for d in deprecations_list}
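
For orientation, the `event()` lookup above resolves `_event` against `dbt.events.types` by name at call time. A hedged sketch of that resolution, using a deprecation and field names that appear in this diff:

```python
import dbt.events.types

class_name = "PackageRedirectDeprecation"  # the _event value set on the deprecation above
event_cls = getattr(dbt.events.types, class_name)  # AttributeError here is re-raised as NameError

# old_name/new_name match the proto definition shown later in this diff;
# the package names themselves are invented for illustration.
event = event_cls(old_name="fishtown-analytics/dbt-utils", new_name="dbt-labs/dbt-utils")
```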

View File

@@ -74,7 +74,7 @@ class PinnedPackage(BasePackage):
raise NotImplementedError
@abc.abstractmethod
def install(self, project):
def install(self, project, renderer):
raise NotImplementedError
@abc.abstractmethod

View File

@@ -9,14 +9,9 @@ from dbt.contracts.project import (
GitPackage,
)
from dbt.deps.base import PinnedPackage, UnpinnedPackage, get_downloads_path
from dbt.exceptions import ExecutableError, warn_or_error, raise_dependency_error
from dbt.events.functions import fire_event
from dbt.events.types import EnsureGitInstalled
from dbt import ui
PIN_PACKAGE_URL = (
"https://docs.getdbt.com/docs/package-management#section-specifying-package-versions" # noqa
)
from dbt.exceptions import ExecutableError, raise_dependency_error
from dbt.events.functions import fire_event, warn_or_error
from dbt.events.types import EnsureGitInstalled, DepsUnpinned
def md5sum(s: str):
@@ -62,14 +57,6 @@ class GitPinnedPackage(GitPackageMixin, PinnedPackage):
else:
return "revision {}".format(self.revision)
def unpinned_msg(self):
if self.revision == "HEAD":
return "not pinned, using HEAD (default branch)"
elif self.revision in ("main", "master"):
return f'pinned to the "{self.revision}" branch'
else:
return None
def _checkout(self):
"""Performs a shallow clone of the repository into the downloads
directory. This function can be called repeatedly. If the project has
@@ -92,14 +79,8 @@ class GitPinnedPackage(GitPackageMixin, PinnedPackage):
def _fetch_metadata(self, project, renderer) -> ProjectPackageMetadata:
path = self._checkout()
if self.unpinned_msg() and self.warn_unpinned:
warn_or_error(
'The git package "{}" \n\tis {}.\n\tThis can introduce '
"breaking changes into your project without warning!\n\nSee {}".format(
self.git, self.unpinned_msg(), PIN_PACKAGE_URL
),
log_fmt=ui.yellow("WARNING: {}"),
)
if (self.revision == "HEAD" or self.revision in ("main", "master")) and self.warn_unpinned:
warn_or_error(DepsUnpinned(git=self.git))
loaded = Project.from_project_root(path, renderer)
return ProjectPackageMetadata.from_project(loaded)

View File

@@ -8,9 +8,10 @@ The event module provides types that represent what is happening in dbt in `even
When events are processed via `fire_event`, nearly everything is logged. Whether or not the user has enabled the debug flag, all debug messages are still logged to the file. However, some events are particularly time-consuming to construct because they return a huge amount of data. Today, the only messages in this category are cache events, and they are only logged if the `--log-cache-events` flag is on. This is important because these messages should not be created unless they are going to be logged, since they cause a noticeable performance degradation. These events use the "fire_event_if" function.
# Adding a New Event
New events need to have a proto message definition created in core/dbt/events/types.proto. Every message must include EventInfo as the first field, named "info" and numbered 1. To update the proto_types.py file, run the following in the core/dbt/events directory: ```protoc --python_betterproto_out . types.proto```
A matching class needs to be created in the core/dbt/events/types.py file, which will have two superclasses: the "Level" mixin and the generated class from proto_types.py. These classes will also generally have two methods: a "code" method that returns the event code, and a "message" method that is used to construct the "msg" from the event fields. In addition, the "Level" mixin will provide a "level_tag" method to set the level (which can also be overridden using the "info" convenience function from functions.py).
* Add a new message in types.proto with an EventInfo field first
* run the protoc compiler to update proto_types.py: ```protoc --python_betterproto_out . types.proto```
* Add a wrapping class in core/dbt/events/types.py with a Level superclass and the superclass from proto_types.py, plus code and message methods
* Add the class to tests/unit/test_events.py
Note that no attributes can exist in these event classes except for fields defined in the protobuf definitions, because the betterproto metaclass will throw an error. Betterproto provides a to_dict() method to convert the generated classes to a dictionary and from that to JSON. However, some attributes will successfully convert to dictionaries but not to serialized protobufs, so we need to test both output formats.
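
As a concrete companion to the steps above, here is a minimal sketch of such a wrapping class. The event name, code, and field are hypothetical; the Level-mixin-plus-proto-superclass shape and the `code`/`message` methods follow the description in this README.

```python
from dataclasses import dataclass

from dbt.events.base_types import InfoLevel  # the "Level" mixin
from dbt.events.proto_types import ExampleEvent as ExampleEventProto  # hypothetical generated class


@dataclass
class ExampleEvent(InfoLevel, ExampleEventProto):
    def code(self) -> str:
        return "Z999"  # made-up code; real codes are unique across types.py

    def message(self) -> str:
        # builds info.msg from the event's protobuf-defined fields
        return f"example happened for {self.thing_name}"  # thing_name: a proto field, assumed
```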

View File

@@ -9,6 +9,7 @@ from dbt.events.types import (
)
# N.B. No guarantees for what type param msg is.
@dataclass
class AdapterLogger:
name: str

View File

@@ -49,7 +49,9 @@ class BaseEvent:
def __post_init__(self):
super().__post_init__()
self.info.level = self.level_tag()
if not self.info.level:
self.info.level = self.level_tag()
assert self.info.level in ["info", "warn", "error", "debug", "test"]
if not hasattr(self.info, "msg") or not self.info.msg:
self.info.msg = self.message()
self.info.invocation_id = get_invocation_id()
@@ -60,13 +62,25 @@ class BaseEvent:
self.info.code = self.code()
self.info.name = type(self).__name__
def level_tag(self):
raise Exception("level_tag() not implemented for event")
def level_tag(self) -> str:
return "debug"
# This is here because although we know that info should always
# exist, mypy doesn't.
def log_level(self) -> str:
return self.info.level # type: ignore
def message(self):
raise Exception("message() not implemented for event")
# DynamicLevel requires that the level be supplied on the
# event construction call using the "info" function from functions.py
@dataclass # type: ignore[misc]
class DynamicLevel(BaseEvent):
pass
@dataclass
class TestLevel(BaseEvent):
__test__ = False
@@ -99,6 +113,23 @@ class ErrorLevel(BaseEvent):
return "error"
# Included to ensure classes with str-type message members are initialized correctly.
@dataclass # type: ignore[misc]
class AdapterEventStringFunctor:
def __post_init__(self):
super().__post_init__()
if not isinstance(self.base_msg, str):
self.base_msg = str(self.base_msg)
@dataclass # type: ignore[misc]
class EventStringFunctor:
def __post_init__(self):
super().__post_init__()
if not isinstance(self.msg, str):
self.msg = str(self.msg)
# prevents an event from going to the file
# This should rarely be used in core code. It is currently
# only used in integration tests and for the 'clean' command.
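
A hedged illustration of what these functor mixins buy: an event constructed with a non-str `msg` (an exception, say) is coerced back to spec in `__post_init__`. That `JinjaLogWarning` mixes in `EventStringFunctor` is an assumption here; the coercion itself is exactly the logic above.

```python
from dbt.events.types import JinjaLogWarning  # assumed to mix in EventStringFunctor

event = JinjaLogWarning(msg=ValueError("boom"))  # deliberately not a str
assert isinstance(event.msg, str)  # __post_init__ coerced it to "boom"
```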

View File

@@ -1,9 +1,13 @@
import betterproto
from colorama import Style
from dbt.events.base_types import NoStdOut, BaseEvent, NoFile, Cache
from dbt.events.types import EventBufferFull, MainReportVersion, EmptyLine
from dbt.events.proto_types import EventInfo
from dbt.events.helpers import env_secrets, scrub_secrets
import dbt.flags as flags
from dbt.constants import SECRET_ENV_PREFIX, METADATA_ENV_PREFIX
from dbt.constants import METADATA_ENV_PREFIX
from dbt.logger import make_log_dir_if_missing, GLOBAL_LOGGER
from datetime import datetime
@@ -18,7 +22,8 @@ from logging.handlers import RotatingFileHandler
import os
import uuid
import threading
from typing import List, Optional, Union, Callable, Dict
from typing import Optional, Union, Callable, Dict
from collections import deque
LOG_VERSION = 3
@@ -108,19 +113,6 @@ def stop_capture_stdout_logs() -> None:
]
def env_secrets() -> List[str]:
return [v for k, v in os.environ.items() if k.startswith(SECRET_ENV_PREFIX) and v.strip()]
def scrub_secrets(msg: str, secrets: List[str]) -> str:
scrubbed = msg
for secret in secrets:
scrubbed = scrubbed.replace(secret, "*****")
return scrubbed
# returns a dictionary representation of the event fields.
# the message may contain secrets which must be scrubbed at the usage site.
def event_to_json(
@@ -168,7 +160,7 @@ def create_debug_text_log_line(e: BaseEvent) -> str:
ts: str = get_ts().strftime("%H:%M:%S.%f")
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
# Make the levels all 5 characters so they line up
level: str = f"{e.level_tag():<5}"
level: str = f"{e.log_level():<5}"
thread = ""
if threading.current_thread().name:
thread_name = threading.current_thread().name
@@ -200,26 +192,35 @@ def create_log_line(e: BaseEvent, file_output=False) -> Optional[str]:
# allows for reuse of this obnoxious if else tree.
# do not use for exceptions, it doesn't pass along exc_info, stack_info, or extra
def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: str):
def send_to_logger(l: Union[Logger, logbook.Logger], level: str, log_line: str):
if not log_line:
return
if level_tag == "test":
if level == "test":
# TODO after implementing #3977 send to new test level
l.debug(log_line)
elif level_tag == "debug":
elif level == "debug":
l.debug(log_line)
elif level_tag == "info":
elif level == "info":
l.info(log_line)
elif level_tag == "warn":
elif level == "warn":
l.warning(log_line)
elif level_tag == "error":
elif level == "error":
l.error(log_line)
else:
raise AssertionError(
f"While attempting to log {log_line}, encountered the unhandled level: {level_tag}"
f"While attempting to log {log_line}, encountered the unhandled level: {level}"
)
def warn_or_error(event, node=None):
if flags.WARN_ERROR:
from dbt.exceptions import raise_compiler_error
raise_compiler_error(scrub_secrets(event.info.msg, env_secrets()), node)
else:
fire_event(event)
# an alternative to fire_event which only creates and logs the event value
# if the condition is met. Does nothing otherwise.
def fire_event_if(conditional: bool, lazy_e: Callable[[], BaseEvent]) -> None:
@@ -244,7 +245,7 @@ def fire_event(e: BaseEvent) -> None:
# destination
log_line = create_log_line(e)
if log_line:
send_to_logger(GLOBAL_LOGGER, e.level_tag(), log_line)
send_to_logger(GLOBAL_LOGGER, level=e.log_level(), log_line=log_line)
return # exit the function to avoid using the current logger as well
# always logs debug level regardless of user input
@@ -252,19 +253,19 @@ def fire_event(e: BaseEvent) -> None:
log_line = create_log_line(e, file_output=True)
# doesn't send exceptions to exception logger
if log_line:
send_to_logger(FILE_LOG, level_tag=e.level_tag(), log_line=log_line)
send_to_logger(FILE_LOG, level=e.log_level(), log_line=log_line)
if not isinstance(e, NoStdOut):
# explicitly checking the debug flag here so that potentially expensive-to-construct
# log messages are not constructed if debug messages are never shown.
if e.level_tag() == "debug" and not flags.DEBUG:
if e.log_level() == "debug" and not flags.DEBUG:
return # eat the message in case it was one of the expensive ones
if e.level_tag() != "error" and flags.QUIET:
if e.log_level() != "error" and flags.QUIET:
return # eat all non-exception messages in quiet mode
log_line = create_log_line(e)
if log_line:
send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)
send_to_logger(STDOUT_LOG, level=e.log_level(), log_line=log_line)
def get_metadata_vars() -> Dict[str, str]:
@@ -325,3 +326,11 @@ def add_to_event_history(event):
def reset_event_history():
global EVENT_HISTORY
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)
# Currently used to set the level in EventInfo, so logging events can
# provide more than one "level". Might be used in the future to set
# more fields in EventInfo, once some of that information is no longer global
def info(level="info"):
info = EventInfo(level=level)
return info
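For example, a sketch of emitting an event at a non-default level via this helper (event type from this diff; the usage shape is an assumption):
fire_event(JinjaLogInfo(info=info(level="warn"), msg="heads up"))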

View File

@@ -0,0 +1,16 @@
import os
from typing import List
from dbt.constants import SECRET_ENV_PREFIX
def env_secrets() -> List[str]:
return [v for k, v in os.environ.items() if k.startswith(SECRET_ENV_PREFIX) and v.strip()]
def scrub_secrets(msg: str, secrets: List[str]) -> str:
scrubbed = msg
for secret in secrets:
scrubbed = scrubbed.replace(secret, "*****")
return scrubbed
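A quick sketch of how these compose, assuming SECRET_ENV_PREFIX is "DBT_ENV_SECRET_" (hypothetical variable and value):
import os
os.environ["DBT_ENV_SECRET_API_KEY"] = "s3cr3t"   # hypothetical secret
scrub_secrets("key is s3cr3t", env_secrets())     # -> "key is *****"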

View File

@@ -23,6 +23,7 @@ class EventInfo(betterproto.Message):
extra: Dict[str, str] = betterproto.map_field(
9, betterproto.TYPE_STRING, betterproto.TYPE_STRING
)
category: str = betterproto.string_field(10)
@dataclass
@@ -52,7 +53,6 @@ class NodeInfo(betterproto.Message):
class RunResultMsg(betterproto.Message):
"""RunResult"""
# status: Union[RunStatus, TestStatus, FreshnessStatus]
status: str = betterproto.string_field(1)
message: str = betterproto.string_field(2)
timing_info: List["TimingInfoMsg"] = betterproto.message_field(3)
@@ -281,6 +281,65 @@ class ProjectCreated(betterproto.Message):
slack_url: str = betterproto.string_field(4)
@dataclass
class PackageRedirectDeprecation(betterproto.Message):
"""D001"""
info: "EventInfo" = betterproto.message_field(1)
old_name: str = betterproto.string_field(2)
new_name: str = betterproto.string_field(3)
@dataclass
class PackageInstallPathDeprecation(betterproto.Message):
"""D002"""
info: "EventInfo" = betterproto.message_field(1)
@dataclass
class ConfigSourcePathDeprecation(betterproto.Message):
"""D003"""
info: "EventInfo" = betterproto.message_field(1)
deprecated_path: str = betterproto.string_field(2)
exp_path: str = betterproto.string_field(3)
@dataclass
class ConfigDataPathDeprecation(betterproto.Message):
"""D004"""
info: "EventInfo" = betterproto.message_field(1)
deprecated_path: str = betterproto.string_field(2)
exp_path: str = betterproto.string_field(3)
@dataclass
class AdapterDeprecationWarning(betterproto.Message):
"""D005"""
info: "EventInfo" = betterproto.message_field(1)
old_name: str = betterproto.string_field(2)
new_name: str = betterproto.string_field(3)
@dataclass
class MetricAttributesRenamed(betterproto.Message):
"""D006"""
info: "EventInfo" = betterproto.message_field(1)
metric_name: str = betterproto.string_field(2)
@dataclass
class ExposureNameDeprecation(betterproto.Message):
"""D007"""
info: "EventInfo" = betterproto.message_field(1)
exposure: str = betterproto.string_field(2)
@dataclass
class AdapterEventDebug(betterproto.Message):
"""E001"""
@@ -340,7 +399,7 @@ class ConnectionReused(betterproto.Message):
@dataclass
class ConnectionLeftOpen(betterproto.Message):
class ConnectionLeftOpenInCleanup(betterproto.Message):
"""E007"""
info: "EventInfo" = betterproto.message_field(1)
@@ -348,7 +407,7 @@ class ConnectionLeftOpen(betterproto.Message):
@dataclass
class ConnectionClosed(betterproto.Message):
class ConnectionClosedInCleanup(betterproto.Message):
"""E008"""
info: "EventInfo" = betterproto.message_field(1)
@@ -365,7 +424,7 @@ class RollbackFailed(betterproto.Message):
@dataclass
class ConnectionClosed2(betterproto.Message):
class ConnectionClosed(betterproto.Message):
"""E010"""
info: "EventInfo" = betterproto.message_field(1)
@@ -373,7 +432,7 @@ class ConnectionClosed2(betterproto.Message):
@dataclass
class ConnectionLeftOpen2(betterproto.Message):
class ConnectionLeftOpen(betterproto.Message):
"""E011"""
info: "EventInfo" = betterproto.message_field(1)
@@ -629,6 +688,14 @@ class CodeExecutionStatus(betterproto.Message):
elapsed: float = betterproto.float_field(3)
@dataclass
class CatalogGenerationError(betterproto.Message):
"""E040"""
info: "EventInfo" = betterproto.message_field(1)
exc: str = betterproto.string_field(2)
@dataclass
class WriteCatalogFailure(betterproto.Message):
"""E041"""
@@ -1066,17 +1133,119 @@ class PartialParsingDeletedExposure(betterproto.Message):
@dataclass
class InvalidDisabledSourceInTestNode(betterproto.Message):
class InvalidDisabledTargetInTestNode(betterproto.Message):
"""I050"""
info: "EventInfo" = betterproto.message_field(1)
msg: str = betterproto.string_field(2)
resource_type_title: str = betterproto.string_field(2)
unique_id: str = betterproto.string_field(3)
original_file_path: str = betterproto.string_field(4)
target_kind: str = betterproto.string_field(5)
target_name: str = betterproto.string_field(6)
target_package: str = betterproto.string_field(7)
@dataclass
class InvalidRefInTestNode(betterproto.Message):
class UnusedResourceConfigPath(betterproto.Message):
"""I051"""
info: "EventInfo" = betterproto.message_field(1)
unused_config_paths: List[str] = betterproto.string_field(2)
@dataclass
class SeedIncreased(betterproto.Message):
"""I052"""
info: "EventInfo" = betterproto.message_field(1)
package_name: str = betterproto.string_field(2)
name: str = betterproto.string_field(3)
@dataclass
class SeedExceedsLimitSamePath(betterproto.Message):
"""I053"""
info: "EventInfo" = betterproto.message_field(1)
package_name: str = betterproto.string_field(2)
name: str = betterproto.string_field(3)
@dataclass
class SeedExceedsLimitAndPathChanged(betterproto.Message):
"""I054"""
info: "EventInfo" = betterproto.message_field(1)
package_name: str = betterproto.string_field(2)
name: str = betterproto.string_field(3)
@dataclass
class SeedExceedsLimitChecksumChanged(betterproto.Message):
"""I055"""
info: "EventInfo" = betterproto.message_field(1)
package_name: str = betterproto.string_field(2)
name: str = betterproto.string_field(3)
checksum_name: str = betterproto.string_field(4)
@dataclass
class UnusedTables(betterproto.Message):
"""I056"""
info: "EventInfo" = betterproto.message_field(1)
unused_tables: List[str] = betterproto.string_field(2)
@dataclass
class WrongResourceSchemaFile(betterproto.Message):
"""I057"""
info: "EventInfo" = betterproto.message_field(1)
patch_name: str = betterproto.string_field(2)
resource_type: str = betterproto.string_field(3)
plural_resource_type: str = betterproto.string_field(4)
yaml_key: str = betterproto.string_field(5)
file_path: str = betterproto.string_field(6)
@dataclass
class NoNodeForYamlKey(betterproto.Message):
"""I058"""
info: "EventInfo" = betterproto.message_field(1)
patch_name: str = betterproto.string_field(2)
yaml_key: str = betterproto.string_field(3)
file_path: str = betterproto.string_field(4)
@dataclass
class MacroPatchNotFound(betterproto.Message):
"""I059"""
info: "EventInfo" = betterproto.message_field(1)
patch_name: str = betterproto.string_field(2)
@dataclass
class NodeNotFoundOrDisabled(betterproto.Message):
"""I060"""
info: "EventInfo" = betterproto.message_field(1)
original_file_path: str = betterproto.string_field(2)
unique_id: str = betterproto.string_field(3)
resource_type_title: str = betterproto.string_field(4)
target_name: str = betterproto.string_field(5)
target_kind: str = betterproto.string_field(6)
target_package: str = betterproto.string_field(7)
disabled: str = betterproto.string_field(8)
@dataclass
class JinjaLogWarning(betterproto.Message):
"""I061"""
info: "EventInfo" = betterproto.message_field(1)
msg: str = betterproto.string_field(2)
@@ -1166,7 +1335,7 @@ class SelectorReportInvalidSelector(betterproto.Message):
@dataclass
class MacroEventInfo(betterproto.Message):
class JinjaLogInfo(betterproto.Message):
"""M011"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1174,7 +1343,7 @@ class MacroEventInfo(betterproto.Message):
@dataclass
class MacroEventDebug(betterproto.Message):
class JinjaLogDebug(betterproto.Message):
"""M012"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1309,6 +1478,23 @@ class DepsSetDownloadDirectory(betterproto.Message):
path: str = betterproto.string_field(2)
@dataclass
class DepsUnpinned(betterproto.Message):
"""M029"""
info: "EventInfo" = betterproto.message_field(1)
revision: str = betterproto.string_field(2)
git: str = betterproto.string_field(3)
@dataclass
class NoNodesForSelectionCriteria(betterproto.Message):
"""M030"""
info: "EventInfo" = betterproto.message_field(1)
spec_raw: str = betterproto.string_field(2)
@dataclass
class RunningOperationCaughtError(betterproto.Message):
"""Q001"""
@@ -1357,57 +1543,21 @@ class SQLRunnerException(betterproto.Message):
@dataclass
class PrintErrorTestResult(betterproto.Message):
class LogTestResult(betterproto.Message):
"""Q007"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
num_models: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
status: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
num_models: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
num_failures: int = betterproto.int32_field(8)
@dataclass
class PrintPassTestResult(betterproto.Message):
"""Q008"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
num_models: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
@dataclass
class PrintWarnTestResult(betterproto.Message):
"""Q009"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
num_models: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
num_failures: int = betterproto.int32_field(7)
@dataclass
class PrintFailureTestResult(betterproto.Message):
"""Q010"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
num_models: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
num_failures: int = betterproto.int32_field(7)
@dataclass
class PrintStartLine(betterproto.Message):
class LogStartLine(betterproto.Message):
"""Q011"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1418,7 +1568,7 @@ class PrintStartLine(betterproto.Message):
@dataclass
class PrintModelResultLine(betterproto.Message):
class LogModelResult(betterproto.Message):
"""Q012"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1427,40 +1577,11 @@ class PrintModelResultLine(betterproto.Message):
status: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
total: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
execution_time: int = betterproto.int32_field(7)
@dataclass
class PrintModelErrorResultLine(betterproto.Message):
"""Q013"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
description: str = betterproto.string_field(3)
status: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
total: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
@dataclass
class PrintSnapshotErrorResultLine(betterproto.Message):
"""Q014"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
description: str = betterproto.string_field(3)
status: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
total: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
cfg: Dict[str, str] = betterproto.map_field(
8, betterproto.TYPE_STRING, betterproto.TYPE_STRING
)
@dataclass
class PrintSnapshotResultLine(betterproto.Message):
class LogSnapshotResult(betterproto.Message):
"""Q015"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1476,87 +1597,36 @@ class PrintSnapshotResultLine(betterproto.Message):
@dataclass
class PrintSeedErrorResultLine(betterproto.Message):
class LogSeedResult(betterproto.Message):
"""Q016"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
status: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
total: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
schema: str = betterproto.string_field(7)
relation: str = betterproto.string_field(8)
result_message: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
total: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
schema: str = betterproto.string_field(8)
relation: str = betterproto.string_field(9)
@dataclass
class PrintSeedResultLine(betterproto.Message):
"""Q017"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
status: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
total: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
schema: str = betterproto.string_field(7)
relation: str = betterproto.string_field(8)
@dataclass
class PrintFreshnessErrorLine(betterproto.Message):
class LogFreshnessResult(betterproto.Message):
"""Q018"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
index: int = betterproto.int32_field(3)
total: int = betterproto.int32_field(4)
execution_time: float = betterproto.float_field(5)
source_name: str = betterproto.string_field(6)
table_name: str = betterproto.string_field(7)
status: str = betterproto.string_field(2)
node_info: "NodeInfo" = betterproto.message_field(3)
index: int = betterproto.int32_field(4)
total: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
source_name: str = betterproto.string_field(7)
table_name: str = betterproto.string_field(8)
@dataclass
class PrintFreshnessErrorStaleLine(betterproto.Message):
"""Q019"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
index: int = betterproto.int32_field(3)
total: int = betterproto.int32_field(4)
execution_time: float = betterproto.float_field(5)
source_name: str = betterproto.string_field(6)
table_name: str = betterproto.string_field(7)
@dataclass
class PrintFreshnessWarnLine(betterproto.Message):
"""Q020"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
index: int = betterproto.int32_field(3)
total: int = betterproto.int32_field(4)
execution_time: float = betterproto.float_field(5)
source_name: str = betterproto.string_field(6)
table_name: str = betterproto.string_field(7)
@dataclass
class PrintFreshnessPassLine(betterproto.Message):
"""Q021"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
index: int = betterproto.int32_field(3)
total: int = betterproto.int32_field(4)
execution_time: float = betterproto.float_field(5)
source_name: str = betterproto.string_field(6)
table_name: str = betterproto.string_field(7)
@dataclass
class PrintCancelLine(betterproto.Message):
class LogCancelLine(betterproto.Message):
"""Q022"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1642,7 +1712,7 @@ class NodeExecuting(betterproto.Message):
@dataclass
class PrintHookStartLine(betterproto.Message):
class LogHookStartLine(betterproto.Message):
"""Q032"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1653,7 +1723,7 @@ class PrintHookStartLine(betterproto.Message):
@dataclass
class PrintHookEndLine(betterproto.Message):
class LogHookEndLine(betterproto.Message):
"""Q033"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1678,6 +1748,13 @@ class SkippingDetails(betterproto.Message):
total: int = betterproto.int32_field(7)
@dataclass
class NothingToDo(betterproto.Message):
"""Q035"""
info: "EventInfo" = betterproto.message_field(1)
@dataclass
class RunningOperationUncaughtError(betterproto.Message):
"""Q036"""
@@ -1697,6 +1774,13 @@ class EndRunResult(betterproto.Message):
success: bool = betterproto.bool_field(5)
@dataclass
class NoNodesSelected(betterproto.Message):
"""Q038"""
info: "EventInfo" = betterproto.message_field(1)
@dataclass
class CatchableExceptionOnRun(betterproto.Message):
"""W002"""
@@ -1824,7 +1908,7 @@ class TimingInfoCollected(betterproto.Message):
@dataclass
class PrintDebugStackTrace(betterproto.Message):
class LogDebugStackTrace(betterproto.Message):
"""Z011"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1991,7 +2075,7 @@ class EndOfRunSummary(betterproto.Message):
@dataclass
class PrintSkipBecauseError(betterproto.Message):
class LogSkipBecauseError(betterproto.Message):
"""Z034"""
info: "EventInfo" = betterproto.message_field(1)
@@ -2066,34 +2150,16 @@ class TrackingInitializeFailure(betterproto.Message):
exc_info: str = betterproto.string_field(2)
@dataclass
class GeneralWarningMsg(betterproto.Message):
"""Z046"""
info: "EventInfo" = betterproto.message_field(1)
msg: str = betterproto.string_field(2)
log_fmt: str = betterproto.string_field(3)
@dataclass
class GeneralWarningException(betterproto.Message):
"""Z047"""
info: "EventInfo" = betterproto.message_field(1)
exc: str = betterproto.string_field(2)
log_fmt: str = betterproto.string_field(3)
@dataclass
class EventBufferFull(betterproto.Message):
"""Z048"""
"""Z045"""
info: "EventInfo" = betterproto.message_field(1)
@dataclass
class RunResultWarningMessage(betterproto.Message):
"""Z049"""
"""Z046"""
info: "EventInfo" = betterproto.message_field(1)
msg: str = betterproto.string_field(2)

View File

@@ -61,18 +61,3 @@ class UnitTestInfo(InfoLevel, NoFile, pl.UnitTestInfo):
def message(self) -> str:
return f"Unit Test: {self.msg}"
# since mypy doesn't run on every file we need to suggest to mypy that every
# class gets instantiated. But we don't actually want to run this code.
# making the conditional `if False` causes mypy to skip it as dead code so
# we need to skirt around that by computing something it doesn't check statically.
#
# TODO remove these lines once we run mypy everywhere.
if 1 == 0:
IntegrationTestInfo(msg="")
IntegrationTestDebug(msg="")
IntegrationTestWarn(msg="")
IntegrationTestError(msg="")
IntegrationTestException(msg="")
UnitTestInfo(msg="")
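For context, the trick relies on mypy's reachability analysis: an `if False:` body is pruned as unreachable, while `1 == 0` is not constant-folded, so the branch below it still gets type-checked. A small sketch:
x: int = 1
if False:
    x + "a"   # mypy skips this branch as dead code
if 1 == 0:
    x + "a"   # mypy still checks this branch and flags the bad operand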

View File

@@ -15,6 +15,7 @@ message EventInfo {
string thread = 7;
google.protobuf.Timestamp ts = 8;
map<string, string> extra = 9;
string category = 10;
}
// TimingInfo
@@ -38,7 +39,6 @@ message NodeInfo {
// RunResult
message RunResultMsg {
// status: Union[RunStatus, TestStatus, FreshnessStatus]
string status = 1;
string message = 2;
repeated TimingInfoMsg timing_info = 3;
@@ -213,6 +213,53 @@ message ProjectCreated {
string slack_url = 4;
}
// D - Deprecation
// D001
message PackageRedirectDeprecation {
EventInfo info = 1;
string old_name = 2;
string new_name = 3;
}
// D002
message PackageInstallPathDeprecation {
EventInfo info = 1;
}
// D003
message ConfigSourcePathDeprecation {
EventInfo info = 1;
string deprecated_path = 2;
string exp_path = 3;
}
// D004
message ConfigDataPathDeprecation {
EventInfo info = 1;
string deprecated_path = 2;
string exp_path = 3;
}
//D005
message AdapterDeprecationWarning {
EventInfo info = 1;
string old_name = 2;
string new_name = 3;
}
//D006
message MetricAttributesRenamed {
EventInfo info = 1;
string metric_name = 2;
}
//D007
message ExposureNameDeprecation {
EventInfo info = 1;
string exposure = 2;
}
// E - DB Adapter
// E001
@@ -262,13 +309,13 @@ message ConnectionReused {
}
// E007
message ConnectionLeftOpen {
message ConnectionLeftOpenInCleanup {
EventInfo info = 1;
string conn_name = 2;
}
// E008
message ConnectionClosed {
message ConnectionClosedInCleanup {
EventInfo info = 1;
string conn_name = 2;
}
@@ -281,13 +328,13 @@ message RollbackFailed {
}
// E010
message ConnectionClosed2 {
message ConnectionClosed {
EventInfo info = 1;
string conn_name = 2;
}
// E011
message ConnectionLeftOpen2 {
message ConnectionLeftOpen {
EventInfo info = 1;
string conn_name = 2;
}
@@ -455,7 +502,6 @@ message AdapterImportError {
message PluginLoadError {
EventInfo info = 1;
string exc_info = 2;
}
// E037
@@ -478,7 +524,11 @@ message CodeExecutionStatus {
float elapsed = 3;
}
// Skipped E040
// E040
message CatalogGenerationError {
EventInfo info = 1;
string exc = 2;
}
// E041
message WriteCatalogFailure {
@@ -806,18 +856,99 @@ message PartialParsingDeletedExposure {
}
// I050
message InvalidDisabledSourceInTestNode {
message InvalidDisabledTargetInTestNode {
EventInfo info = 1;
string msg = 2;
string resource_type_title = 2;
string unique_id = 3;
string original_file_path = 4;
string target_kind = 5;
string target_name = 6;
string target_package = 7;
}
// I051
message InvalidRefInTestNode {
message UnusedResourceConfigPath {
EventInfo info = 1;
repeated string unused_config_paths = 2;
}
// I052
message SeedIncreased {
EventInfo info = 1;
string package_name = 2;
string name = 3;
}
// I053
message SeedExceedsLimitSamePath {
EventInfo info = 1;
string package_name = 2;
string name = 3;
}
// I054
message SeedExceedsLimitAndPathChanged {
EventInfo info = 1;
string package_name = 2;
string name = 3;
}
// I055
message SeedExceedsLimitChecksumChanged {
EventInfo info = 1;
string package_name = 2;
string name = 3;
string checksum_name = 4;
}
// I056
message UnusedTables {
EventInfo info = 1;
repeated string unused_tables = 2;
}
// I057
message WrongResourceSchemaFile {
EventInfo info = 1;
string patch_name = 2;
string resource_type = 3;
string plural_resource_type = 4;
string yaml_key = 5;
string file_path = 6;
}
// I058
message NoNodeForYamlKey {
EventInfo info = 1;
string patch_name = 2;
string yaml_key = 3;
string file_path = 4;
}
// I059
message MacroPatchNotFound {
EventInfo info = 1;
string patch_name = 2;
}
// I060
message NodeNotFoundOrDisabled {
EventInfo info = 1;
string original_file_path = 2;
string unique_id = 3;
string resource_type_title = 4;
string target_name = 5;
string target_kind = 6;
string target_package = 7;
string disabled = 8;
}
// I061
message JinjaLogWarning {
EventInfo info = 1;
string msg = 2;
}
// M - Deps generation
// M001
@@ -885,13 +1016,13 @@ message SelectorReportInvalidSelector {
}
// M011
message MacroEventInfo {
message JinjaLogInfo {
EventInfo info = 1;
string msg = 2;
}
// M012
message MacroEventDebug {
message JinjaLogDebug {
EventInfo info = 1;
string msg = 2;
}
@@ -992,6 +1123,19 @@ message DepsSetDownloadDirectory {
string path = 2;
}
// M029
message DepsUnpinned {
EventInfo info = 1;
string revision = 2;
string git = 3;
}
// M030
message NoNodesForSelectionCriteria {
EventInfo info = 1;
string spec_raw = 2;
}
// Q - Node execution
// Q001
@@ -1030,49 +1174,23 @@ message SQLRunnerException {
}
// Q007
message PrintErrorTestResult {
message LogTestResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
int32 index = 4;
int32 num_models = 5;
float execution_time = 6;
string status = 4;
int32 index = 5;
int32 num_models = 6;
float execution_time = 7;
int32 num_failures = 8;
}
// Q008
message PrintPassTestResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
int32 index = 4;
int32 num_models = 5;
float execution_time = 6;
}
// Q009
message PrintWarnTestResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
int32 index = 4;
int32 num_models = 5;
float execution_time = 6;
int32 num_failures = 7;
}
// Skipped Q008, Q009, Q010
// Q010
message PrintFailureTestResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
int32 index = 4;
int32 num_models = 5;
float execution_time = 6;
int32 num_failures = 7;
}
// Q011
message PrintStartLine {
message LogStartLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
@@ -1081,41 +1199,20 @@ message PrintStartLine {
}
// Q012
message PrintModelResultLine {
message LogModelResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
string status = 4;
int32 index = 5;
int32 total = 6;
float execution_time = 7;
int32 execution_time = 7;
}
// Q013
message PrintModelErrorResultLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
string status = 4;
int32 index = 5;
int32 total = 6;
float execution_time = 7;
}
// Q014
message PrintSnapshotErrorResultLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
string status = 4;
int32 index = 5;
int32 total = 6;
float execution_time = 7;
map<string, string> cfg = 8;
}
// skipped Q013, Q014
// Q015
message PrintSnapshotResultLine {
message LogSnapshotResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
@@ -1127,75 +1224,38 @@ message PrintSnapshotResultLine {
}
// Q016
message PrintSeedErrorResultLine {
message LogSeedResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string status = 3;
int32 index = 4;
int32 total = 5;
float execution_time = 6;
string schema = 7;
string relation = 8;
string result_message = 4;
int32 index = 5;
int32 total = 6;
float execution_time = 7;
string schema = 8;
string relation = 9;
}
// Q017
message PrintSeedResultLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string status = 3;
int32 index = 4;
int32 total = 5;
float execution_time = 6;
string schema = 7;
string relation = 8;
}
// Skipped Q017
// Q018
message PrintFreshnessErrorLine {
message LogFreshnessResult {
EventInfo info = 1;
NodeInfo node_info = 2;
int32 index = 3;
int32 total = 4;
float execution_time = 5;
string source_name = 6;
string table_name = 7;
string status = 2;
NodeInfo node_info = 3;
int32 index = 4;
int32 total = 5;
float execution_time = 6;
string source_name = 7;
string table_name = 8;
}
// Q019
message PrintFreshnessErrorStaleLine {
EventInfo info = 1;
NodeInfo node_info = 2;
int32 index = 3;
int32 total = 4;
float execution_time = 5;
string source_name = 6;
string table_name = 7;
}
// Q020
message PrintFreshnessWarnLine {
EventInfo info = 1;
NodeInfo node_info = 2;
int32 index = 3;
int32 total = 4;
float execution_time = 5;
string source_name = 6;
string table_name = 7;
}
// Skipped Q019, Q020, Q021
// Q021
message PrintFreshnessPassLine {
EventInfo info = 1;
NodeInfo node_info = 2;
int32 index = 3;
int32 total = 4;
float execution_time = 5;
string source_name = 6;
string table_name = 7;
}
// Q022
message PrintCancelLine {
message LogCancelLine {
EventInfo info = 1;
string conn_name = 2;
}
@@ -1261,7 +1321,7 @@ message NodeExecuting {
}
// Q032
message PrintHookStartLine {
message LogHookStartLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string statement = 3;
@@ -1270,7 +1330,7 @@ message PrintHookStartLine {
}
// Q033
message PrintHookEndLine {
message LogHookEndLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string statement = 3;
@@ -1291,7 +1351,10 @@ message SkippingDetails {
int32 total = 7;
}
// Skipped Q035
// Q035
message NothingToDo {
EventInfo info = 1;
}
// Q036
message RunningOperationUncaughtError {
@@ -1308,6 +1371,11 @@ message EndRunResult {
bool success = 5;
}
// Q038
message NoNodesSelected {
EventInfo info = 1;
}
// W - Node testing
// Skipped W001
@@ -1411,7 +1479,7 @@ message TimingInfoCollected {
}
// Z011
message PrintDebugStackTrace {
message LogDebugStackTrace {
EventInfo info = 1;
string exc_info = 2;
}
@@ -1538,7 +1606,7 @@ message EndOfRunSummary {
// Skipped Z031, Z032, Z033
// Z034
message PrintSkipBecauseError {
message LogSkipBecauseError {
EventInfo info = 1;
string schema = 2;
string relation = 3;
@@ -1593,28 +1661,12 @@ message TrackingInitializeFailure {
string exc_info = 2;
}
// Skipped Z045
// Z046
message GeneralWarningMsg {
EventInfo info = 1;
string msg = 2;
string log_fmt = 3;
}
// Z047
message GeneralWarningException {
EventInfo info = 1;
string exc = 2;
string log_fmt = 3;
}
// Z048
// Z045
message EventBufferFull {
EventInfo info = 1;
}
// Z049
// Z046
message RunResultWarningMessage {
EventInfo info = 1;
string msg = 2;

File diff suppressed because it is too large

View File

@@ -2,11 +2,9 @@ import builtins
import functools
from typing import NoReturn, Optional, Mapping, Any
from dbt.events.functions import fire_event, scrub_secrets, env_secrets
from dbt.events.types import GeneralWarningMsg, GeneralWarningException
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.types import JinjaLogWarning
from dbt.node_types import NodeType
from dbt import flags
from dbt.ui import line_wrap_message, warning_tag
import dbt.dataclass_schema
@@ -570,74 +568,11 @@ def doc_target_not_found(
raise_compiler_error(msg, model)
def _get_target_failure_msg(
def get_not_found_or_disabled_msg(
original_file_path,
unique_id,
resource_type_title,
target_name: str,
target_model_package: Optional[str],
include_path: bool,
reason: str,
target_kind: str,
) -> str:
target_package_string = ""
if target_model_package is not None:
target_package_string = "in package '{}' ".format(target_model_package)
source_path_string = ""
if include_path:
source_path_string = " ({})".format(original_file_path)
return "{} '{}'{} depends on a {} named '{}' {}which {}".format(
resource_type_title,
unique_id,
source_path_string,
target_kind,
target_name,
target_package_string,
reason,
)
def get_target_not_found_or_disabled_msg(
node,
target_name: str,
target_package: Optional[str],
disabled: Optional[bool] = None,
) -> str:
if disabled is None:
reason = "was not found or is disabled"
elif disabled is True:
reason = "is disabled"
else:
reason = "was not found"
return _get_target_failure_msg(
node.original_file_path,
node.unique_id,
node.resource_type.title(),
target_name,
target_package,
include_path=True,
reason=reason,
target_kind="node",
)
def ref_target_not_found(
model,
target_model_name: str,
target_model_package: Optional[str],
disabled: Optional[bool] = None,
) -> NoReturn:
msg = get_target_not_found_or_disabled_msg(
model, target_model_name, target_model_package, disabled
)
raise_compiler_error(msg, model)
def get_not_found_or_disabled_msg(
node,
target_name: str,
target_kind: str,
target_package: Optional[str] = None,
disabled: Optional[bool] = None,
@@ -648,15 +583,19 @@ def get_not_found_or_disabled_msg(
reason = "is disabled"
else:
reason = "was not found"
return _get_target_failure_msg(
node.original_file_path,
node.unique_id,
node.resource_type.title(),
target_package_string = ""
if target_package is not None:
target_package_string = "in package '{}' ".format(target_package)
return "{} '{}' ({}) depends on a {} named '{}' {}which {}".format(
resource_type_title,
unique_id,
original_file_path,
target_kind,
target_name,
target_package,
include_path=True,
reason=reason,
target_kind=target_kind,
target_package_string,
reason,
)
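With hypothetical inputs, the format string above produces a message like:
"{} '{}' ({}) depends on a {} named '{}' {}which {}".format(
    "Model", "model.proj.a", "models/a.sql", "node", "b", "in package 'x' ", "was not found"
)
# -> "Model 'model.proj.a' (models/a.sql) depends on a node named 'b' in package 'x' which was not found"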
@@ -668,7 +607,9 @@ def target_not_found(
disabled: Optional[bool] = None,
) -> NoReturn:
msg = get_not_found_or_disabled_msg(
node=node,
original_file_path=node.original_file_path,
unique_id=node.unique_id,
resource_type_title=node.resource_type.title(),
target_name=target_name,
target_kind=target_kind,
target_package=target_package,
@@ -976,9 +917,7 @@ def raise_patch_targets_not_found(patches):
def _fix_dupe_msg(path_1: str, path_2: str, name: str, type_name: str) -> str:
if path_1 == path_2:
return (
f"remove one of the {type_name} entries for {name} in this file:\n" f" - {path_1!s}\n"
)
return f"remove one of the {type_name} entries for {name} in this file:\n - {path_1!s}\n"
else:
return (
f"remove the {type_name} entry for {name} in one of these files:\n"
@@ -1043,19 +982,6 @@ def raise_unrecognized_credentials_type(typename, supported_types):
)
def warn_invalid_patch(patch, resource_type):
msg = line_wrap_message(
f"""\
'{patch.name}' is a {resource_type} node, but it is
specified in the {patch.yaml_key} section of
{patch.original_file_path}.
To fix this error, place the `{patch.name}`
specification under the {resource_type.pluralize()} key instead.
"""
)
warn_or_error(msg, log_fmt=warning_tag("{}"))
def raise_not_implemented(msg):
raise NotImplementedException("ERROR: {}".format(msg))
@@ -1069,24 +995,8 @@ def raise_duplicate_alias(
raise AliasException(f'Got duplicate keys: ({key_names}) all map to "{canonical_key}"')
def warn_or_error(msg, node=None, log_fmt=None):
if flags.WARN_ERROR:
raise_compiler_error(scrub_secrets(msg, env_secrets()), node)
else:
fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt))
def warn_or_raise(exc, log_fmt=None):
if flags.WARN_ERROR:
raise exc
else:
fire_event(GeneralWarningException(exc=str(exc), log_fmt=log_fmt))
def warn(msg, node=None):
# there's no reason to expose log_fmt to macros - it's only useful for
# handling colors
warn_or_error(msg, node=node)
dbt.events.functions.warn_or_error(JinjaLogWarning(msg=msg), node=node)
return ""

View File

@@ -113,6 +113,7 @@ def env_set_path(key: str) -> Optional[Path]:
MACRO_DEBUGGING = env_set_truthy("DBT_MACRO_DEBUGGING")
DEFER_MODE = env_set_truthy("DBT_DEFER_TO_STATE")
FAVOR_STATE_MODE = env_set_truthy("DBT_FAVOR_STATE_STATE")
ARTIFACT_STATE_PATH = env_set_path("DBT_ARTIFACT_STATE_PATH")
ENABLE_LEGACY_LOGGER = env_set_truthy("DBT_ENABLE_LEGACY_LOGGER")

View File

@@ -90,7 +90,7 @@ class Graph:
for node in include_nodes:
if node not in new_graph:
raise ValueError(
"Couldn't find model '{}' -- does it exist or is " "it disabled?".format(node)
"Couldn't find model '{}' -- does it exist or is it disabled?".format(node)
)
return Graph(new_graph)

View File

@@ -5,13 +5,12 @@ from .queue import GraphQueue
from .selector_methods import MethodManager
from .selector_spec import SelectionCriteria, SelectionSpec, IndirectSelection
from dbt.events.functions import fire_event
from dbt.events.types import SelectorReportInvalidSelector
from dbt.events.functions import fire_event, warn_or_error
from dbt.events.types import SelectorReportInvalidSelector, NoNodesForSelectionCriteria
from dbt.node_types import NodeType
from dbt.exceptions import (
InternalException,
InvalidSelectorException,
warn_or_error,
)
from dbt.contracts.graph.compiled import GraphMemberNode
from dbt.contracts.graph.manifest import Manifest
@@ -24,11 +23,6 @@ def get_package_names(nodes):
return set([node.split(".")[1] for node in nodes])
def alert_non_existence(raw_spec, nodes):
if len(nodes) == 0:
warn_or_error(f"The selection criterion '{str(raw_spec)}' does not match" f" any nodes")
def can_select_indirectly(node):
"""If a node is not selected itself, but its parent(s) are, it may qualify
for indirect selection.
@@ -142,8 +136,8 @@ class NodeSelector(MethodManager):
direct_nodes = self.incorporate_indirect_nodes(initial_direct, indirect_nodes)
if spec.expect_exists:
alert_non_existence(spec.raw, direct_nodes)
if spec.expect_exists and len(direct_nodes) == 0:
warn_or_error(NoNodesForSelectionCriteria(spec_raw=str(spec.raw)))
return direct_nodes, indirect_nodes

View File

@@ -286,8 +286,6 @@ class PathSelectorMethod(SelectorMethod):
root = Path.cwd()
paths = set(p.relative_to(root) for p in root.glob(selector))
for node, real_node in self.all_nodes(included_nodes):
if Path(real_node.root_path) != root:
continue
ofp = Path(real_node.original_file_path)
if ofp in paths:
yield node

View File

@@ -78,9 +78,12 @@ class SelectionCriteria:
@classmethod
def default_method(cls, value: str) -> MethodName:
from dbt.parser.languages import get_file_extensions
extensions = tuple(get_file_extensions() + [".csv"])
if _probably_path(value):
return MethodName.Path
elif value.lower().endswith((".sql", ".py", ".csv")):
elif value.lower().endswith(extensions):
return MethodName.File
else:
return MethodName.FQN
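A standalone sketch of this dispatch, with a rough stand-in for `_probably_path` and a hypothetical extension set:
def default_method_sketch(value, extensions=(".sql", ".py", ".csv", ".prql", ".ibis")):
    if "/" in value or "\\" in value:        # rough stand-in for _probably_path
        return "path"
    elif value.lower().endswith(extensions):
        return "file"
    return "fqn"

default_method_sketch("models/staging")   # -> "path"
default_method_sketch("my_model.ibis")    # -> "file"
default_method_sketch("my_model")         # -> "fqn"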

View File

@@ -32,7 +32,7 @@ def source(*args, dbt_load_df_function):
{%- set config_dict = {} -%}
{%- for key in model.config.config_keys_used -%}
{# weird type testing with enum, would be much easier to write this logic in Python! #}
{%- if key == 'language' -%}
{%- if key in ('language', 'compiled_language') -%}
{%- set value = 'python' -%}
{%- endif -%}
{%- set value = model.config[key] -%}

File diff suppressed because one or more lines are too long

View File

@@ -1,4 +1,6 @@
import os
from dbt.config.project import Project
from dbt.config.renderer import DbtProjectYamlRenderer
from dbt.contracts.results import RunningStatus, collect_timing_info
from dbt.events.functions import fire_event
from dbt.events.types import NodeCompiling, NodeExecuting
@@ -71,16 +73,22 @@ def get_dbt_config(project_dir, args=None, single_threaded=False):
else:
profiles_dir = flags.DEFAULT_PROFILES_DIR
profile_name = getattr(args, "profile", None)
runtime_args = RuntimeArgs(
project_dir=project_dir,
profiles_dir=profiles_dir,
single_threaded=single_threaded,
profile=getattr(args, "profile", None),
profile=profile_name,
target=getattr(args, "target", None),
)
# Construct a RuntimeConfig from phony args
config = RuntimeConfig.from_args(runtime_args)
profile = RuntimeConfig.collect_profile(args=runtime_args, profile_name=profile_name)
project_renderer = DbtProjectYamlRenderer(profile, None)
project = RuntimeConfig.collect_project(args=runtime_args, project_renderer=project_renderer)
assert type(project) is Project
config = RuntimeConfig.from_parts(project, profile, runtime_args)
# Set global flags from arguments
flags.set_from_args(args, config)

View File

@@ -28,9 +28,7 @@ if sys.platform == "win32" and (not os.getenv("TERM") or os.getenv("TERM") == "N
colorama.init(wrap=True)
STDOUT_LOG_FORMAT = "{record.message}"
DEBUG_LOG_FORMAT = (
"{record.time:%Y-%m-%d %H:%M:%S.%f%z} " "({record.thread_name}): " "{record.message}"
)
DEBUG_LOG_FORMAT = "{record.time:%Y-%m-%d %H:%M:%S.%f%z} ({record.thread_name}): {record.message}"
def get_secret_env() -> List[str]:

View File

@@ -501,6 +501,20 @@ def _add_defer_argument(*subparsers):
)
def _add_favor_state_argument(*subparsers):
for sub in subparsers:
sub.add_optional_argument_inverse(
"--favor-state",
enable_help="""
If set, defer to the state variable for resolving unselected nodes, even if the node exists as a database object in the current environment.
""",
disable_help="""
If defer is set, expect standard defer behaviour.
""",
default=flags.FAVOR_STATE_MODE,
)
def _build_run_subparser(subparsers, base_subparser):
run_sub = subparsers.add_parser(
"run",
@@ -1173,6 +1187,8 @@ def parse_args(args, cls=DBTArgumentParser):
_add_selection_arguments(run_sub, compile_sub, generate_sub, test_sub, snapshot_sub, seed_sub)
# --defer
_add_defer_argument(run_sub, test_sub, build_sub, snapshot_sub, compile_sub)
# --favor-state
_add_favor_state_argument(run_sub, test_sub, build_sub, snapshot_sub)
# --full-refresh
_add_table_mutability_arguments(run_sub, compile_sub, build_sub)

View File

@@ -66,5 +66,8 @@ class RunHookType(StrEnum):
class ModelLanguage(StrEnum):
# TODO: how to make this dynamic?
python = "python"
sql = "sql"
ibis = "ibis"
prql = "prql"

View File

@@ -23,6 +23,8 @@ from dbt import hooks
from dbt.node_types import NodeType, ModelLanguage
from dbt.parser.search import FileBlock
from dbt.parser.languages import get_language_providers, get_language_provider_by_name
# internally, the parser may store a less-restrictive type that will be
# transformed into the final type. But it will have to be derived from
# ParsedNode to be operable.
@@ -157,7 +159,7 @@ class ConfiguredParser(
config[key] = [hooks.get_hook_dict(h) for h in config[key]]
def _create_error_node(
self, name: str, path: str, original_file_path: str, raw_code: str, language: str = "sql"
self, name: str, path: str, original_file_path: str, raw_code: str, language: str
) -> UnparsedNode:
"""If we hit an error before we've actually parsed a node, provide some
level of useful information by attaching this to the exception.
@@ -169,7 +171,6 @@ class ConfiguredParser(
resource_type=self.resource_type,
path=path,
original_file_path=original_file_path,
root_path=self.project.project_root,
package_name=self.project.project_name,
raw_code=raw_code,
language=language,
@@ -190,11 +191,14 @@ class ConfiguredParser(
"""
if name is None:
name = block.name
if block.path.relative_path.endswith(".py"):
language = ModelLanguage.python
else:
# this is not ideal but we have a lot of tests to adjust if we don't do it
language = ModelLanguage.sql
# this is pretty silly, but we need "sql" to be the default
# even for seeds etc (.csv)
# otherwise this breaks a lot of tests
language = ModelLanguage.sql
for provider in get_language_providers():
if block.path.relative_path.endswith(provider.file_ext()):
language = ModelLanguage[provider.name()]
dct = {
"alias": name,
@@ -202,7 +206,6 @@ class ConfiguredParser(
"database": self.default_database,
"fqn": fqn,
"name": name,
"root_path": self.project.project_root,
"resource_type": self.resource_type,
"path": path,
"original_file_path": block.path.original_file_path,
@@ -225,23 +228,13 @@ class ConfiguredParser(
path=path,
original_file_path=block.path.original_file_path,
raw_code=block.contents,
language=language,
)
raise ParsingException(msg, node=node)
def _context_for(self, parsed_node: IntermediateNode, config: ContextConfig) -> Dict[str, Any]:
return generate_parser_model_context(parsed_node, self.root_project, self.manifest, config)
def render_with_context(self, parsed_node: IntermediateNode, config: ContextConfig):
# Given the parsed node and a ContextConfig to use during parsing,
# render the node's sql with macro capture enabled.
# Note: this mutates the config object when config calls are rendered.
context = self._context_for(parsed_node, config)
# this goes through the process of rendering, but just throws away
# the rendered result. The "macro capture" is the point?
get_rendered(parsed_node.raw_code, context, parsed_node, capture_macros=True)
return context
# This is taking the original config for the node, converting it to a dict,
# updating the config with new config passed in, then re-creating the
# config from the dict in the node.
@@ -347,7 +340,7 @@ class ConfiguredParser(
)
else:
raise InternalException(
f"Got an unexpected project version={config_version}, " f"expected 2"
f"Got an unexpected project version={config_version}, expected 2"
)
def config_dict(
@@ -360,7 +353,10 @@ class ConfiguredParser(
def render_update(self, node: IntermediateNode, config: ContextConfig) -> None:
try:
context = self.render_with_context(node, config)
provider = get_language_provider_by_name(node.language)
provider.validate_raw_code(node)
context = self._context_for(node, config)
context = provider.update_context(node, config, context)
self.update_parsed_node_config(node, config, context=context)
except ValidationError as exc:
# we got a ValidationError - probably bad types in config()
@@ -407,6 +403,18 @@ class SimpleParser(
return node
# TODO: rename these to be more generic (not just SQL)
# The full inheritance order for models is:
# dbt.parser.models.ModelParser,
# dbt.parser.base.SimpleSQLParser,
# dbt.parser.base.SQLParser,
# dbt.parser.base.ConfiguredParser,
# dbt.parser.base.Parser,
# dbt.parser.base.BaseParser,
# These fine-grained class distinctions exist to support other parsers
# e.g. SnapshotParser overrides both 'parse_file' + 'transform'
class SQLParser(
ConfiguredParser[FileBlock, IntermediateNode, FinalNode], Generic[IntermediateNode, FinalNode]
):

View File

@@ -32,7 +32,6 @@ class DocumentationParser(Parser[ParsedDocumentation]):
contents = get_rendered(block.contents, {}).strip()
doc = ParsedDocumentation(
root_path=self.project.project_root,
path=block.file.path.relative_path,
original_file_path=block.path.original_file_path,
package_name=self.project.project_name,

View File

@@ -35,7 +35,6 @@ class GenericTestParser(BaseParser[ParsedGenericTestNode]):
macro_sql=block.full_block,
original_file_path=base_node.original_file_path,
package_name=base_node.package_name,
root_path=base_node.root_path,
resource_type=base_node.resource_type,
name=name,
unique_id=unique_id,
@@ -96,7 +95,6 @@ class GenericTestParser(BaseParser[ParsedGenericTestNode]):
original_file_path=original_file_path,
package_name=self.project.project_name,
raw_code=source_file.contents,
root_path=self.project.project_root,
resource_type=NodeType.Macro,
language="sql",
)

View File

@@ -435,7 +435,7 @@ class TestBuilder(Generic[Testable]):
tags = [tags]
if not isinstance(tags, list):
raise_compiler_error(
f"got {tags} ({type(tags)}) for tags, expected a list of " f"strings"
f"got {tags} ({type(tags)}) for tags, expected a list of strings"
)
for tag in tags:
if not isinstance(tag, str):

View File

@@ -0,0 +1,25 @@
from .provider import LanguageProvider # noqa
from .jinja_sql import JinjaSQLProvider # noqa
from .python import PythonProvider # noqa
# TODO: how to make this discovery/registration pluggable?
from .prql import PrqlProvider # noqa
from .ibis import IbisProvider # noqa
def get_language_providers():
return LanguageProvider.__subclasses__()
def get_language_names():
return [provider.name() for provider in get_language_providers()]
def get_file_extensions():
return [provider.file_ext() for provider in get_language_providers()]
def get_language_provider_by_name(language_name: str) -> LanguageProvider:
return next(
iter(provider for provider in get_language_providers() if provider.name() == language_name)
)
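Illustrative usage; provider order follows subclass registration, i.e. the import order above (names are assumptions):
get_language_names()                     # e.g. ['sql', 'python', 'prql', 'ibis']
get_language_provider_by_name("prql")    # -> the PrqlProvider class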

View File

@@ -0,0 +1,116 @@
import ibis
import ast
from dbt.parser.languages.provider import LanguageProvider, dbt_function_calls
from dbt.parser.languages.python import PythonParseVisitor
from dbt.contracts.graph.compiled import ManifestNode
from dbt.exceptions import ParsingException, validator_error_message
from typing import Any, Dict
class IbisProvider(LanguageProvider):
@classmethod
def name(self) -> str:
return "ibis"
@classmethod
def file_ext(self) -> str:
return ".ibis"
@classmethod
def compiled_language(self) -> str:
return "sql"
@classmethod
def validate_raw_code(self, node) -> None:
# don't require the 'model' function for now
pass
@classmethod
def extract_dbt_function_calls(self, node: Any) -> dbt_function_calls:
"""
List all references (refs, sources, configs) in a given block.
"""
try:
tree = ast.parse(node.raw_code, filename=node.original_file_path)
except SyntaxError as exc:
msg = validator_error_message(exc)
raise ParsingException(f"{msg}\n{exc.text}", node=node) from exc
# don't worry about the 'model' function for now
# dbtValidator = PythonValidationVisitor()
# dbtValidator.visit(tree)
# dbtValidator.check_error(node)
dbtParser = PythonParseVisitor(node)
dbtParser.visit(tree)
return dbtParser.dbt_function_calls
@classmethod
def needs_compile_time_connection(self) -> bool:
# TODO: this is technically true, but Ibis won't actually use dbt's connection; it will make its own
return True
@classmethod
def get_compiled_code(self, node: ManifestNode, context: Dict[str, Any]) -> str:
resolved_references = self.get_resolved_references(node, context)
def ref(*args, dbt_load_df_function):
refs = resolved_references["refs"]
key = tuple(args)
return dbt_load_df_function(refs[key])
def source(*args, dbt_load_df_function):
sources = resolved_references["sources"]
key = tuple(args)
return dbt_load_df_function(sources[key])
config_dict = {}
for key in node.config.get("config_keys_used", []):
value = node.config[key]
config_dict.update({key: value})
class config:
def __init__(self, *args, **kwargs):
pass
@staticmethod
def get(key, default=None):
return config_dict.get(key, default)
class this:
"""dbt.this() or dbt.this.identifier"""
database = node.database
schema = node.schema
identifier = node.identifier
def __repr__(self):
return node.relation_name
class dbtObj:
def __init__(self, load_df_function) -> None:
self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
self.ref = lambda *args: ref(*args, dbt_load_df_function=load_df_function)
self.config = config
self.this = this()
# self.is_incremental = TODO
# https://ibis-project.org/docs/dev/backends/PostgreSQL/#ibis.backends.postgres.Backend.do_connect
# TODO: this would need to live in the adapter somehow
target = context["target"]
con = ibis.postgres.connect(
database=target["database"],
user=target["user"],
)
# use for dbt.ref(), dbt.source(), etc
dbt = dbtObj(con.table) # noqa
# TODO: this is unsafe in so many ways
exec(node.raw_code)
compiled = str(eval(f"ibis.{context['target']['type']}.compile(model)"))
return compiled

View File

@@ -0,0 +1,34 @@
from dbt.clients import jinja
from dbt.context.context_config import ContextConfig
from dbt.parser.languages.provider import LanguageProvider
from dbt.contracts.graph.compiled import ManifestNode
from typing import Dict, Any
class JinjaSQLProvider(LanguageProvider):
@classmethod
def name(self) -> str:
return "sql"
@classmethod
def update_context(
cls, node: Any, config: ContextConfig, context: Dict[str, Any]
) -> Dict[str, Any]:
# this goes through the process of rendering, but we don't keep the rendered result
# the goal is to capture macros + update context as side effect
jinja.get_rendered(node.raw_code, context, node, capture_macros=True)
return context
@classmethod
def get_compiled_code(self, node: ManifestNode, context: Dict[str, Any]) -> str:
compiled_code = jinja.get_rendered(
node.raw_code,
context,
node,
)
return compiled_code
@classmethod
def needs_compile_time_connection(self) -> bool:
return True

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
from typing import Dict, Tuple, List, Any
import abc
# for type hints
from dbt.contracts.graph.compiled import ManifestNode
from dbt.context.providers import RelationProxy
from dbt.context.context_config import ContextConfig
dbt_function_calls = List[Tuple[str, List[str], Dict[str, Any]]]
references_type = Dict[str, Dict[Tuple[str, ...], RelationProxy]]
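# e.g. a model calling dbt.config(materialized="table") and dbt.ref("upstream") yields
# dbt_function_calls of [("config", [], {"materialized": "table"}), ("ref", ["upstream"], {})]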
class LanguageProvider(metaclass=abc.ABCMeta):
"""
A LanguageProvider is a class that can parse & compile a given language.
"""
@classmethod
def name(self) -> str:
return ""
@classmethod
def file_ext(self) -> str:
return f".{self.name()}"
@classmethod
def compiled_language(self) -> str:
return self.name()
@classmethod
@abc.abstractmethod
# TODO add type hints
def extract_dbt_function_calls(self, node: Any) -> dbt_function_calls:
"""
List all dbt function calls (ref, source, config) and their args/kwargs
"""
raise NotImplementedError("extract_dbt_function_calls")
@classmethod
def validate_raw_code(self, node: Any) -> None:
pass
@classmethod
def update_context(
cls, node: Any, config: ContextConfig, context: Dict[str, Any]
) -> Dict[str, Any]:
dbt_function_calls = cls.extract_dbt_function_calls(node)
config_keys_used = []
for (func, args, kwargs) in dbt_function_calls:
if func == "get":
config_keys_used.append(args[0])
continue
context[func](*args, **kwargs)
if config_keys_used:
# this is being used in macro build_config_dict
context["config"](config_keys_used=config_keys_used)
return context
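A standalone sketch of this replay loop, with a fake context and hypothetical extracted calls:
fake_context = {
    "ref": lambda *args, **kwargs: None,
    "config": lambda *args, **kwargs: print("config:", kwargs),
}
calls = [("ref", ["upstream"], {}), ("get", ["materialized"], {})]
keys_used = []
for func, args, kwargs in calls:
    if func == "get":
        keys_used.append(args[0])    # 'get' calls are recorded, not replayed
        continue
    fake_context[func](*args, **kwargs)
if keys_used:
    fake_context["config"](config_keys_used=keys_used)   # -> config: {'config_keys_used': ['materialized']}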
@classmethod
@abc.abstractmethod
def needs_compile_time_connection(self) -> bool:
"""
Does this modeling language support introspective queries (requiring a database connection)
at compile time?
"""
raise NotImplementedError("needs_compile_time_connection")
@classmethod
def get_resolved_references(
self, node: ManifestNode, context: Dict[str, Any]
) -> references_type:
resolved_references: references_type = {
"sources": {},
"refs": {},
}
# TODO: do we need to support custom 'ref' + 'source' resolution logic for non-JinjaSQL languages?
# (i.e. user-defined 'ref' + 'source' macros)
# this approach will not work for that
refs: List[List[str]] = node.refs
sources: List[List[str]] = node.sources
for ref in refs:
resolved_ref: RelationProxy = context["ref"](*ref)
resolved_references["refs"].update({tuple(ref): resolved_ref})
for source in sources:
resolved_src: RelationProxy = context["source"](*source)
resolved_references["sources"].update({tuple(source): resolved_src})
return resolved_references
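A sketch of the resulting shape with a fake context (real values would be RelationProxy objects):
fake_ctx = {"ref": lambda *parts: "db.schema." + "_".join(parts)}
{tuple(r): fake_ctx["ref"](*r) for r in [["upstream"], ["pkg", "other"]]}
# -> {('upstream',): 'db.schema.upstream', ('pkg', 'other'): 'db.schema.pkg_other'}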
@classmethod
@abc.abstractmethod
def get_compiled_code(self, node: ManifestNode, context: Dict[str, Any]) -> str:
"""
For a given ManifestNode, return its compiled code.
"""
raise NotImplementedError("get_compiled_code")

View File

@@ -0,0 +1,170 @@
"""
This will be in the `dbt-prql` package, but it is included here during initial code review, so
we can test it without coordinating dependencies.
"""
from __future__ import annotations
import logging
import re
from dbt.parser.languages.provider import LanguageProvider, dbt_function_calls, references_type
# import prql_python
# This mocks the prqlc output for two cases which we currently use in tests, so we can
# test this without configuring dependencies. (Obviously fix this as we expand the tests, well
# before we merge.)
class prql_python: # type: ignore
@staticmethod
def to_sql(prql) -> str:
query_1 = "from employees"
query_1_compiled = """
SELECT
employees.*
FROM
employees
""".strip()
query_2 = """
from (dbt source.whatever.some_tbl)
join (dbt ref.test.foo) [id]
filter salary > 100
""".strip()
# hard coded for Jerco's Postgres database
query_2_resolved = """
from ("jerco"."salesforce"."in_process")
join ("jerco"."dbt_jcohen"."foo") [id]
filter salary > 100
""".strip()
query_2_compiled = """
SELECT
"jerco"."whatever"."some_tbl".*,
"jerco"."dbt_jcohen"."foo".*,
id
FROM
"jerco"."salesforce"."in_process"
JOIN "jerco"."dbt_jcohen"."foo" USING(id)
WHERE
salary > 100
""".strip()
lookup = dict(
{
query_1: query_1_compiled,
query_2: query_2_compiled,
query_2_resolved: query_2_compiled,
}
)
return lookup[prql]
logger = logging.getLogger(__name__)
word_regex = r"[\w\.\-_]+"
# TODO support single-argument form of 'ref'
references_regex = rf"\bdbt `?(\w+)\.({word_regex})\.({word_regex})`?"
def hack_compile(prql: str, references: references_type) -> str:
"""
>>> print(hack_compile(
... "from (dbt source.salesforce.in_process) | join (dbt ref.foo.bar) [id]",
... references=dict(
... sources={('salesforce', 'in_process'): 'salesforce_schema.in_process_tbl'},
... refs={('foo', 'bar'): 'foo_schema.bar_tbl'}
... )
... ))
SELECT
"{{ source('salesforce', 'in_process') }}".*,
"{{ ref('foo', 'bar') }}".*,
id
FROM
{{ source('salesforce', 'in_process') }}
JOIN {{ ref('foo', 'bar') }} USING(id)
"""
subs = []
for k, v in references["sources"].items():
key = ".".join(k)
lookup = f"dbt source.{key}"
subs.append((lookup, str(v)))
for k, v in references["refs"].items():
key = ".".join(k)
lookup = f"dbt ref.{key}"
subs.append((lookup, str(v)))
for lookup, resolved in subs:
prql = prql.replace(lookup, resolved)
sql = prql_python.to_sql(prql)
return sql
def hack_list_references(prql):
"""
List all references (e.g. sources / refs) in a given block.
We need to decide:
— What should prqlc return given `dbt source.foo.bar`, so dbt-prql can find the
references?
 Should it just fill in something that looks like jinja for expediancy? (We
don't support jinja though)
>>> references = list_references("from (dbt source.salesforce.in_process) | join (dbt ref.foo.bar)")
>>> dict(references)
{'source': [('salesforce', 'in_process')], 'ref': [('foo', 'bar')]}
"""
out = []
for t, package, model in _hack_references_of_prql_query(prql):
out.append((t, [package, model], {}))
return out
def _hack_references_of_prql_query(prql) -> list[tuple[str, str, str]]:
"""
List the references in a prql query.
This would be implemented by prqlc.
>>> _hack_references_of_prql_query("from (dbt source.salesforce.in_process) | join (dbt ref.foo.bar)")
[('source', 'salesforce', 'in_process'), ('ref', 'foo', 'bar')]
"""
return re.findall(references_regex, prql)
class PrqlProvider(LanguageProvider):
def __init__(self) -> None:
# TODO: Uncomment when dbt-prql is released
# if not dbt_prql:
# raise ImportError(
# "dbt_prql is required and not found; try running `pip install dbt_prql`"
# )
pass
@classmethod
def name(self) -> str:
return "prql"
@classmethod
def compiled_language(self) -> str:
return "sql"
@classmethod
def extract_dbt_function_calls(self, node) -> dbt_function_calls:
return hack_list_references(node.raw_code)
@classmethod
def needs_compile_time_connection(self) -> bool:
return False
@classmethod
def get_compiled_code(self, node, context) -> str:
resolved_references = self.get_resolved_references(node, context)
return hack_compile(node.raw_code, references=resolved_references)

View File

@@ -0,0 +1,195 @@
import ast
from dbt.parser.languages.provider import LanguageProvider, dbt_function_calls
from dbt.exceptions import UndefinedMacroException, ParsingException, validator_error_message
from dbt.contracts.graph.compiled import ManifestNode
from typing import Dict, Any
dbt_function_key_words = set(["ref", "source", "config", "get"])
dbt_function_full_names = set(["dbt.ref", "dbt.source", "dbt.config", "dbt.config.get"])
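# illustrative: for a model body containing
#     dbt.config(materialized="table")
#     df = dbt.ref("upstream")
# the visitor below collects
#     [("config", [], {"materialized": "table"}), ("ref", ["upstream"], {})]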
class PythonValidationVisitor(ast.NodeVisitor):
def __init__(self):
super().__init__()
self.dbt_errors = []
self.num_model_def = 0
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
if node.name == "model":
self.num_model_def += 1
if node.args.args and node.args.args[0].arg != "dbt":
self.dbt_errors.append("'dbt' not provided for model as the first argument")
if len(node.args.args) != 2:
self.dbt_errors.append(
"model function should have two args, `dbt` and a session to current warehouse"
)
# check we have a return and only one
if not isinstance(node.body[-1], ast.Return) or isinstance(
node.body[-1].value, ast.Tuple
):
self.dbt_errors.append(
"In current version, model function should return only one dataframe object"
)
def check_error(self, node):
if self.num_model_def != 1:
raise ParsingException("dbt only allow one model defined per python file", node=node)
if len(self.dbt_errors) != 0:
raise ParsingException("\n".join(self.dbt_errors), node=node)
class PythonParseVisitor(ast.NodeVisitor):
def __init__(self, dbt_node):
super().__init__()
self.dbt_node = dbt_node
self.dbt_function_calls = []
self.packages = []
@classmethod
def _flatten_attr(cls, node):
if isinstance(node, ast.Attribute):
return str(cls._flatten_attr(node.value)) + "." + node.attr
elif isinstance(node, ast.Name):
return str(node.id)
else:
pass
def _safe_eval(self, node):
try:
return ast.literal_eval(node)
except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError) as exc:
msg = validator_error_message(
f"Error when trying to literal_eval an arg to dbt.ref(), dbt.source(), dbt.config() or dbt.config.get() \n{exc}\n"
"https://docs.python.org/3/library/ast.html#ast.literal_eval\n"
"In dbt python model, `dbt.ref`, `dbt.source`, `dbt.config`, `dbt.config.get` function args only support Python literal structures"
)
raise ParsingException(msg, node=self.dbt_node) from exc
def _get_call_literals(self, node):
# List of literals
arg_literals = []
kwarg_literals = {}
# TODO : Make sure this throws (and that we catch it)
# for non-literal inputs
for arg in node.args:
rendered = self._safe_eval(arg)
arg_literals.append(rendered)
for keyword in node.keywords:
key = keyword.arg
rendered = self._safe_eval(keyword.value)
kwarg_literals[key] = rendered
return arg_literals, kwarg_literals
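For context on the literal-only constraint enforced here: ast.literal_eval evaluates Python literal structures (strings, numbers, tuples, lists, dicts, booleans, None) and raises on anything else, e.g.:

import ast

ast.literal_eval("'my_model'")         # -> 'my_model'
ast.literal_eval("{'alias': 'stg'}")   # -> {'alias': 'stg'}
ast.literal_eval("some_variable")      # raises ValueError: not a literal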
def visit_Call(self, node: ast.Call) -> None:
# check whether the current call could be a dbt function call
if isinstance(node.func, ast.Attribute) and node.func.attr in dbt_function_key_words:
func_name = self._flatten_attr(node.func)
# check whether the current call really is a dbt function call
if func_name in dbt_function_full_names:
# drop the "dbt." prefix
func_name = func_name.split(".")[-1]
args, kwargs = self._get_call_literals(node)
self.dbt_function_calls.append((func_name, args, kwargs))
# no matter what happened above, we should keep visiting the rest of the tree
# visit args and kwargs to see if there are calls in them
for obj in node.args + [kwarg.value for kwarg in node.keywords]:
if isinstance(obj, ast.Call):
self.visit_Call(obj)
# support dbt.ref in list args, kwargs
elif isinstance(obj, ast.List) or isinstance(obj, ast.Tuple):
for el in obj.elts:
if isinstance(el, ast.Call):
self.visit_Call(el)
# support dbt.ref in dict args, kwargs
elif isinstance(obj, ast.Dict):
for value in obj.values:
if isinstance(value, ast.Call):
self.visit_Call(value)
# visit node.func.value if we are at a call attribute
if isinstance(node.func, ast.Attribute):
self.attribute_helper(node.func)
def attribute_helper(self, node: ast.Attribute) -> None:
while isinstance(node, ast.Attribute):
node = node.value # type: ignore
if isinstance(node, ast.Call):
self.visit_Call(node)
def visit_Import(self, node: ast.Import) -> None:
for n in node.names:
self.packages.append(n.name.split(".")[0])
def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
if node.module:
self.packages.append(node.module.split(".")[0])
class PythonProvider(LanguageProvider):
@classmethod
def name(cls) -> str:
return "python"
@classmethod
def file_ext(cls) -> str:
return ".py"
@classmethod
def extract_dbt_function_calls(cls, node) -> dbt_function_calls:
"""
List all references (refs, sources, configs) in a given block.
"""
try:
tree = ast.parse(node.raw_code, filename=node.original_file_path)
except SyntaxError as exc:
msg = validator_error_message(exc)
raise ParsingException(f"{msg}\n{exc.text}", node=node) from exc
# We run a separate validator and parser because defining visit_FunctionDef
# on the parser would stop it from visiting the Call nodes inside the body
dbtValidator = PythonValidationVisitor()
dbtValidator.visit(tree)
dbtValidator.check_error(node)
dbtParser = PythonParseVisitor(node)
dbtParser.visit(tree)
return dbtParser.dbt_function_calls
@classmethod
def validate_raw_code(cls, node) -> None:
from dbt.clients.jinja import get_rendered
# TODO: add a test for this
try:
rendered_python = get_rendered(
node.raw_code,
{},
node,
)
if rendered_python != node.raw_code:
raise ParsingException("")
except (UndefinedMacroException, ParsingException):
raise ParsingException("No jinja in python model code is allowed", node=node)
@classmethod
def get_compiled_code(cls, node: ManifestNode, context: Dict[str, Any]) -> str:
# needed for compilation - bad!!
from dbt.clients import jinja
postfix = jinja.get_rendered(
"{{ py_script_postfix(model) }}",
context,
node,
)
# we should NOT jinja render the python model's 'raw code'
return f"{node.raw_code}\n\n{postfix}"
@classmethod
def needs_compile_time_connection(cls) -> bool:
return False
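End to end, the extraction pass behaves roughly like this sketch; the model source is invented for illustration and must satisfy the validator above (exactly one model(dbt, session) function returning a single object):

# Hypothetical input for PythonProvider.extract_dbt_function_calls.
class FakeNode:
    original_file_path = "models/my_model.py"
    raw_code = (
        "def model(dbt, session):\n"
        "    dbt.config(materialized='table')\n"
        "    df = dbt.ref('upstream_model')\n"
        "    return df\n"
    )

print(PythonProvider.extract_dbt_function_calls(FakeNode()))
# [('config', [], {'materialized': 'table'}), ('ref', ['upstream_model'], {})]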

View File

@@ -41,7 +41,6 @@ class MacroParser(BaseParser[ParsedMacro]):
macro_sql=block.full_block,
original_file_path=base_node.original_file_path,
package_name=base_node.package_name,
root_path=base_node.root_path,
resource_type=base_node.resource_type,
name=name,
unique_id=unique_id,
@@ -103,7 +102,6 @@ class MacroParser(BaseParser[ParsedMacro]):
original_file_path=original_file_path,
package_name=self.project.project_name,
raw_code=source_file.contents,
root_path=self.project.project_root,
resource_type=NodeType.Macro,
language="sql",
)

View File

@@ -18,7 +18,7 @@ from dbt.adapters.factory import (
get_adapter_package_names,
)
from dbt.helper_types import PathSet
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.functions import fire_event, get_invocation_id, warn_or_error
from dbt.events.types import (
PartialParsingFullReparseBecauseOfError,
PartialParsingExceptionFile,
@@ -35,10 +35,10 @@ from dbt.events.types import (
PartialParsingNotEnabled,
ParsedFileLoadFailed,
PartialParseSaveFileNotFound,
InvalidDisabledSourceInTestNode,
InvalidRefInTestNode,
InvalidDisabledTargetInTestNode,
PartialParsingProjectEnvVarsChanged,
PartialParsingProfileEnvVarsChanged,
NodeNotFoundOrDisabled,
)
from dbt.logger import DbtProcessState
from dbt.node_types import NodeType
@@ -71,11 +71,7 @@ from dbt.contracts.graph.parsed import (
)
from dbt.contracts.util import Writable
from dbt.exceptions import (
ref_target_not_found,
get_target_not_found_or_disabled_msg,
target_not_found,
get_not_found_or_disabled_msg,
warn_or_error,
)
from dbt.parser.base import Parser
from dbt.parser.analysis import AnalysisParser
@@ -90,7 +86,6 @@ from dbt.parser.search import FileBlock
from dbt.parser.seeds import SeedParser
from dbt.parser.snapshots import SnapshotParser
from dbt.parser.sources import SourcePatcher
from dbt.ui import warning_tag
from dbt.version import __version__
from dbt.dataclass_schema import StrEnum, dbtClassMixin
@@ -955,65 +950,43 @@ class ManifestLoader:
self.manifest.rebuild_ref_lookup()
def invalid_ref_fail_unless_test(node, target_model_name, target_model_package, disabled):
def invalid_target_fail_unless_test(
node,
target_name: str,
target_kind: str,
target_package: Optional[str] = None,
disabled: Optional[bool] = None,
):
if node.resource_type == NodeType.Test:
msg = get_target_not_found_or_disabled_msg(
node=node,
target_name=target_model_name,
target_package=target_model_package,
disabled=disabled,
)
if disabled:
fire_event(InvalidRefInTestNode(msg=msg))
fire_event(
InvalidDisabledTargetInTestNode(
resource_type_title=node.resource_type.title(),
unique_id=node.unique_id,
original_file_path=node.original_file_path,
target_kind=target_kind,
target_name=target_name,
target_package=target_package if target_package else "",
)
)
else:
warn_or_error(msg, log_fmt=warning_tag("{}"))
else:
ref_target_not_found(
node,
target_model_name,
target_model_package,
disabled=disabled,
)
def invalid_source_fail_unless_test(node, target_name, target_table_name, disabled):
if node.resource_type == NodeType.Test:
msg = get_not_found_or_disabled_msg(
node=node,
target_name=f"{target_name}.{target_table_name}",
target_kind="source",
disabled=disabled,
)
if disabled:
fire_event(InvalidDisabledSourceInTestNode(msg=msg))
else:
warn_or_error(msg, log_fmt=warning_tag("{}"))
warn_or_error(
NodeNotFoundOrDisabled(
original_file_path=node.original_file_path,
unique_id=node.unique_id,
resource_type_title=node.resource_type.title(),
target_name=target_name,
target_kind=target_kind,
target_package=target_package if target_package else "",
disabled=str(disabled),
)
)
else:
target_not_found(
node=node,
target_name=f"{target_name}.{target_table_name}",
target_kind="source",
disabled=disabled,
)
def invalid_metric_fail_unless_test(node, target_metric_name, target_metric_package, disabled):
if node.resource_type == NodeType.Test:
msg = get_target_not_found_or_disabled_msg(
node=node,
target_name=target_metric_name,
target_package=target_metric_package,
disabled=disabled,
)
warn_or_error(msg, log_fmt=warning_tag("{}"))
else:
target_not_found(
node=node,
target_name=target_metric_name,
target_kind="metric",
target_package=target_metric_package,
target_name=target_name,
target_kind=target_kind,
target_package=target_package,
disabled=disabled,
)
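The three near-identical helpers (invalid_ref_fail_unless_test, invalid_source_fail_unless_test, invalid_metric_fail_unless_test) collapse into the single invalid_target_fail_unless_test above. A sketch of a call site after the consolidation, with illustrative values:

invalid_target_fail_unless_test(
    node=node,
    target_name="stg_customers",  # illustrative
    target_kind="node",
    target_package=None,
    disabled=False,
)
# Test nodes get a structured warn_or_error event; all other nodes raise via
# target_not_found().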
@@ -1121,11 +1094,6 @@ def _process_docs_for_metrics(context: Dict[str, Any], metric: ParsedMetric) ->
metric.description = get_rendered(metric.description, context)
# TODO: this isn't actually referenced anywhere?
def _process_derived_metrics(context: Dict[str, Any], metric: ParsedMetric) -> None:
metric.description = get_rendered(metric.description, context)
def _process_refs_for_exposure(manifest: Manifest, current_project: str, exposure: ParsedExposure):
"""Given a manifest and exposure in that manifest, process its refs"""
for ref in exposure.refs:
@@ -1153,10 +1121,11 @@ def _process_refs_for_exposure(manifest: Manifest, current_project: str, exposur
# This may raise. Even if it doesn't, we don't want to add
# this exposure to the graph b/c there is no destination exposure
exposure.config.enabled = False
invalid_ref_fail_unless_test(
exposure,
target_model_name,
target_model_package,
invalid_target_fail_unless_test(
node=exposure,
target_name=target_model_name,
target_kind="node",
target_package=target_model_package,
disabled=(isinstance(target_model, Disabled)),
)
@@ -1195,13 +1164,13 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: P
# This may raise. Even if it doesn't, we don't want to add
# this metric to the graph b/c there is no destination metric
metric.config.enabled = False
invalid_ref_fail_unless_test(
metric,
target_model_name,
target_model_package,
invalid_target_fail_unless_test(
node=metric,
target_name=target_model_name,
target_kind="node",
target_package=target_model_package,
disabled=(isinstance(target_model, Disabled)),
)
continue
target_model_id = target_model.unique_id
@@ -1239,13 +1208,13 @@ def _process_metrics_for_node(
# This may raise. Even if it doesn't, we don't want to add
# this node to the graph b/c there is no destination node
node.config.enabled = False
invalid_metric_fail_unless_test(
node,
target_metric_name,
target_metric_package,
invalid_target_fail_unless_test(
node=node,
target_name=target_metric_name,
target_kind="source",
target_package=target_metric_package,
disabled=(isinstance(target_metric, Disabled)),
)
continue
target_metric_id = target_metric.unique_id
@@ -1280,13 +1249,13 @@ def _process_refs_for_node(manifest: Manifest, current_project: str, node: Manif
# This may raise. Even if it doesn't, we don't want to add
# this node to the graph b/c there is no destination node
node.config.enabled = False
invalid_ref_fail_unless_test(
node,
target_model_name,
target_model_package,
invalid_target_fail_unless_test(
node=node,
target_name=target_model_name,
target_kind="node",
target_package=target_model_package,
disabled=(isinstance(target_model, Disabled)),
)
continue
target_model_id = target_model.unique_id
@@ -1312,8 +1281,11 @@ def _process_sources_for_exposure(
)
if target_source is None or isinstance(target_source, Disabled):
exposure.config.enabled = False
invalid_source_fail_unless_test(
exposure, source_name, table_name, disabled=(isinstance(target_source, Disabled))
invalid_target_fail_unless_test(
node=exposure,
target_name=f"{source_name}.{table_name}",
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
continue
target_source_id = target_source.unique_id
@@ -1332,8 +1304,11 @@ def _process_sources_for_metric(manifest: Manifest, current_project: str, metric
)
if target_source is None or isinstance(target_source, Disabled):
metric.config.enabled = False
invalid_source_fail_unless_test(
metric, source_name, table_name, disabled=(isinstance(target_source, Disabled))
invalid_target_fail_unless_test(
node=metric,
target_name=f"{source_name}.{table_name}",
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
continue
target_source_id = target_source.unique_id
@@ -1354,8 +1329,11 @@ def _process_sources_for_node(manifest: Manifest, current_project: str, node: Ma
if target_source is None or isinstance(target_source, Disabled):
# this follows the same pattern as refs
node.config.enabled = False
invalid_source_fail_unless_test(
node, source_name, table_name, disabled=(isinstance(target_source, Disabled))
invalid_target_fail_unless_test(
node=node,
target_name=f"{source_name}.{table_name}",
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
continue
target_source_id = target_source.unique_id

View File

@@ -17,7 +17,6 @@ from dbt.events.types import (
from dbt.node_types import NodeType, ModelLanguage
from dbt.parser.base import SimpleSQLParser
from dbt.parser.search import FileBlock
from dbt.clients.jinja import get_rendered
import dbt.tracking as tracking
from dbt import utils
from dbt_extractor import ExtractionError, py_extract_from_source # type: ignore
@@ -26,156 +25,6 @@ from itertools import chain
import random
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
# New for Python models :p
import ast
from dbt.dataclass_schema import ValidationError
from dbt.exceptions import ParsingException, validator_error_message, UndefinedMacroException
dbt_function_key_words = set(["ref", "source", "config", "get"])
dbt_function_full_names = set(["dbt.ref", "dbt.source", "dbt.config", "dbt.config.get"])
class PythonValidationVisitor(ast.NodeVisitor):
def __init__(self):
super().__init__()
self.dbt_errors = []
self.num_model_def = 0
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
if node.name == "model":
self.num_model_def += 1
if node.args.args and node.args.args[0].arg != "dbt":
self.dbt_errors.append("'dbt' not provided for model as the first argument")
if len(node.args.args) != 2:
self.dbt_errors.append(
"model function should have two args, `dbt` and a session to current warehouse"
)
# check we have a return and only one
if not isinstance(node.body[-1], ast.Return) or isinstance(
node.body[-1].value, ast.Tuple
):
self.dbt_errors.append(
"In current version, model function should return only one dataframe object"
)
def check_error(self, node):
if self.num_model_def != 1:
raise ParsingException("dbt only allow one model defined per python file", node=node)
if len(self.dbt_errors) != 0:
raise ParsingException("\n".join(self.dbt_errors), node=node)
class PythonParseVisitor(ast.NodeVisitor):
def __init__(self, dbt_node):
super().__init__()
self.dbt_node = dbt_node
self.dbt_function_calls = []
self.packages = []
@classmethod
def _flatten_attr(cls, node):
if isinstance(node, ast.Attribute):
return str(cls._flatten_attr(node.value)) + "." + node.attr
elif isinstance(node, ast.Name):
return str(node.id)
else:
pass
def _safe_eval(self, node):
try:
return ast.literal_eval(node)
except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError) as exc:
msg = validator_error_message(
f"Error when trying to literal_eval an arg to dbt.ref(), dbt.source(), dbt.config() or dbt.config.get() \n{exc}\n"
"https://docs.python.org/3/library/ast.html#ast.literal_eval\n"
"In dbt python model, `dbt.ref`, `dbt.source`, `dbt.config`, `dbt.config.get` function args only support Python literal structures"
)
raise ParsingException(msg, node=self.dbt_node) from exc
def _get_call_literals(self, node):
# List of literals
arg_literals = []
kwarg_literals = {}
# TODO : Make sure this throws (and that we catch it)
# for non-literal inputs
for arg in node.args:
rendered = self._safe_eval(arg)
arg_literals.append(rendered)
for keyword in node.keywords:
key = keyword.arg
rendered = self._safe_eval(keyword.value)
kwarg_literals[key] = rendered
return arg_literals, kwarg_literals
def visit_Call(self, node: ast.Call) -> None:
# check whether the current call could be a dbt function call
if isinstance(node.func, ast.Attribute) and node.func.attr in dbt_function_key_words:
func_name = self._flatten_attr(node.func)
# check whether the current call really is a dbt function call
if func_name in dbt_function_full_names:
# drop the "dbt." prefix
func_name = func_name.split(".")[-1]
args, kwargs = self._get_call_literals(node)
self.dbt_function_calls.append((func_name, args, kwargs))
# no matter what happened above, we should keep visiting the rest of the tree
# visit args and kwargs to see if there are calls in them
for obj in node.args + [kwarg.value for kwarg in node.keywords]:
if isinstance(obj, ast.Call):
self.visit_Call(obj)
# support dbt.ref in list args, kwargs
elif isinstance(obj, ast.List) or isinstance(obj, ast.Tuple):
for el in obj.elts:
if isinstance(el, ast.Call):
self.visit_Call(el)
# support dbt.ref in dict args, kwargs
elif isinstance(obj, ast.Dict):
for value in obj.values:
if isinstance(value, ast.Call):
self.visit_Call(value)
# visit node.func.value if we are at a call attribute
if isinstance(node.func, ast.Attribute):
self.attribute_helper(node.func)
def attribute_helper(self, node: ast.Attribute) -> None:
while isinstance(node, ast.Attribute):
node = node.value # type: ignore
if isinstance(node, ast.Call):
self.visit_Call(node)
def visit_Import(self, node: ast.Import) -> None:
for n in node.names:
self.packages.append(n.name.split(".")[0])
def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
if node.module:
self.packages.append(node.module.split(".")[0])
def merge_packages(original_packages_with_version, new_packages):
original_packages = [package.split("==")[0] for package in original_packages_with_version]
additional_packages = [package for package in new_packages if package not in original_packages]
return original_packages_with_version + list(set(additional_packages))
def verify_python_model_code(node):
# TODO: add a test for this
try:
rendered_python = get_rendered(
node.raw_code,
{},
node,
)
if rendered_python != node.raw_code:
raise ParsingException("")
except (UndefinedMacroException, ParsingException):
raise ParsingException("No jinja in python model code is allowed", node=node)
class ModelParser(SimpleSQLParser[ParsedModelNode]):
def parse_from_dict(self, dct, validate=True) -> ParsedModelNode:
@@ -191,49 +40,16 @@ class ModelParser(SimpleSQLParser[ParsedModelNode]):
def get_compiled_path(cls, block: FileBlock):
return block.path.relative_path
def parse_python_model(self, node, config, context):
try:
tree = ast.parse(node.raw_code, filename=node.original_file_path)
except SyntaxError as exc:
msg = validator_error_message(exc)
raise ParsingException(f"{msg}\n{exc.text}", node=node) from exc
# We run a separate validator and parser because defining visit_FunctionDef
# on the parser would stop it from visiting the Call nodes inside the body
dbtValidator = PythonValidationVisitor()
dbtValidator.visit(tree)
dbtValidator.check_error(node)
dbtParser = PythonParseVisitor(node)
dbtParser.visit(tree)
config_keys_used = []
for (func, args, kwargs) in dbtParser.dbt_function_calls:
if func == "get":
config_keys_used.append(args[0])
continue
context[func](*args, **kwargs)
if config_keys_used:
# this is being used in macro build_config_dict
context["config"](config_keys_used=config_keys_used)
def render_update(self, node: ParsedModelNode, config: ContextConfig) -> None:
# TODO
if node.language != ModelLanguage.sql:
super().render_update(node, config)
# TODO move all the logic below into JinjaSQL provider
self.manifest._parsing_info.static_analysis_path_count += 1
if node.language == ModelLanguage.python:
try:
verify_python_model_code(node)
context = self._context_for(node, config)
self.parse_python_model(node, config, context)
self.update_parsed_node_config(node, config, context=context)
except ValidationError as exc:
# we got a ValidationError - probably bad types in config()
msg = validator_error_message(exc)
raise ParsingException(msg, node=node) from exc
return
elif not flags.STATIC_PARSER:
if not flags.STATIC_PARSER:
# jinja rendering
super().render_update(node, config)
fire_event(StaticParserCausedJinjaRendering(path=node.path))

View File

@@ -171,11 +171,15 @@ def read_files(project, files, parser_files, saved_files):
dbt_ignore_spec,
)
from dbt.parser.languages import get_file_extensions
model_extensions = get_file_extensions()
project_files["ModelParser"] = read_files_for_parser(
project,
files,
project.model_paths,
[".sql", ".py"],
model_extensions,
ParseFileType.Model,
saved_files,
dbt_ignore_spec,
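read_files no longer hard-codes [".sql", ".py"]; it asks the language registry. The dbt.parser.languages module itself is not shown in this diff, so the following is only a sketch of the contract this call site assumes:

# Hypothetical registry helper; providers register themselves elsewhere.
_PROVIDERS = []  # populated with LanguageProvider subclasses at import time

def get_file_extensions():
    return [provider.file_ext() for provider in _PROVIDERS]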

View File

@@ -50,7 +50,6 @@ from dbt.contracts.graph.unparsed import (
UnparsedSourceDefinition,
)
from dbt.exceptions import (
warn_invalid_patch,
validator_error_message,
JSONValidationException,
raise_invalid_property_yml_version,
@@ -60,9 +59,10 @@ from dbt.exceptions import (
raise_duplicate_macro_patch_name,
InternalException,
raise_duplicate_source_patch_name,
warn_or_error,
CompilationException,
)
from dbt.events.functions import warn_or_error
from dbt.events.types import WrongResourceSchemaFile, NoNodeForYamlKey, MacroPatchNotFound
from dbt.node_types import NodeType
from dbt.parser.base import SimpleParser
from dbt.parser.search import FileBlock
@@ -74,7 +74,6 @@ from dbt.parser.generic_test_builders import (
TestBlock,
Testable,
)
from dbt.ui import warning_tag
from dbt.utils import get_pseudo_test_path, coerce_dict_str
@@ -245,7 +244,6 @@ class SchemaParser(SimpleParser[GenericTestBlock, ParsedGenericTestNode]):
"database": self.default_database,
"fqn": fqn,
"name": name,
"root_path": self.project.project_root,
"resource_type": self.resource_type,
"tags": tags,
"path": path,
@@ -272,6 +270,7 @@ class SchemaParser(SimpleParser[GenericTestBlock, ParsedGenericTestNode]):
path=path,
original_file_path=target.original_file_path,
raw_code=raw_code,
language="sql",
)
raise ParsingException(msg, node=node) from exc
@@ -298,7 +297,7 @@ class SchemaParser(SimpleParser[GenericTestBlock, ParsedGenericTestNode]):
except ParsingException as exc:
context = _trimmed(str(target))
msg = "Invalid test config given in {}:" "\n\t{}\n\t@: {}".format(
msg = "Invalid test config given in {}:\n\t{}\n\t@: {}".format(
target.original_file_path, exc.msg, context
)
raise ParsingException(msg) from exc
@@ -729,7 +728,6 @@ class SourceParser(YamlDocsReader):
table=table,
path=original_file_path,
original_file_path=original_file_path,
root_path=self.project.project_root,
package_name=package_name,
unique_id=unique_id,
resource_type=NodeType.Source,
@@ -873,7 +871,15 @@ class NodePatchParser(NonSourceParser[NodeTarget, ParsedNodePatch], Generic[Node
if unique_id:
resource_type = NodeType(unique_id.split(".")[0])
if resource_type.pluralize() != patch.yaml_key:
warn_invalid_patch(patch, resource_type)
warn_or_error(
WrongResourceSchemaFile(
patch_name=patch.name,
resource_type=resource_type,
plural_resource_type=resource_type.pluralize(),
yaml_key=patch.yaml_key,
file_path=patch.original_file_path,
)
)
return
elif patch.yaml_key == "analyses":
@@ -912,12 +918,13 @@ class NodePatchParser(NonSourceParser[NodeTarget, ParsedNodePatch], Generic[Node
node.patch(patch)
else:
msg = (
f"Did not find matching node for patch with name '{patch.name}' "
f"in the '{patch.yaml_key}' section of "
f"file '{source_file.path.original_file_path}'"
warn_or_error(
NoNodeForYamlKey(
patch_name=patch.name,
yaml_key=patch.yaml_key,
file_path=source_file.path.original_file_path,
)
)
warn_or_error(msg, log_fmt=warning_tag("{}"))
return
# patches can't be overwritten
@@ -977,8 +984,7 @@ class MacroPatchParser(NonSourceParser[UnparsedMacroUpdate, ParsedMacroPatch]):
unique_id = f"macro.{patch.package_name}.{patch.name}"
macro = self.manifest.macros.get(unique_id)
if not macro:
msg = f'Found patch for macro "{patch.name}" ' f"which was not found"
warn_or_error(msg, log_fmt=warning_tag("{}"))
warn_or_error(MacroPatchNotFound(patch_name=patch.name))
return
if macro.patch_path:
package_name, existing_file_path = macro.patch_path.split("://")
@@ -1024,7 +1030,6 @@ class ExposureParser(YamlReader):
parsed = ParsedExposure(
package_name=package_name,
root_path=self.project.project_root,
path=path,
original_file_path=self.yaml.path.original_file_path,
unique_id=unique_id,
@@ -1128,7 +1133,6 @@ class MetricParser(YamlReader):
parsed = ParsedMetric(
package_name=package_name,
root_path=self.project.project_root,
path=path,
original_file_path=self.yaml.path.original_file_path,
unique_id=unique_id,

View File

@@ -7,6 +7,8 @@ from dbt.parser.search import FileBlock
class SeedParser(SimpleSQLParser[ParsedSeedNode]):
def parse_from_dict(self, dct, validate=True) -> ParsedSeedNode:
# seeds need the root_path because the contents are not loaded
dct["root_path"] = self.project.project_root
if validate:
ParsedSeedNode.validate(dct)
return ParsedSeedNode.from_dict(dct)

View File

@@ -1,6 +1,6 @@
import itertools
from pathlib import Path
from typing import Iterable, Dict, Optional, Set, Any
from typing import Iterable, Dict, Optional, Set, Any, List
from dbt.adapters.factory import get_adapter
from dbt.config import RuntimeConfig
from dbt.context.context_config import (
@@ -24,11 +24,12 @@ from dbt.contracts.graph.unparsed import (
UnparsedColumn,
Time,
)
from dbt.exceptions import warn_or_error, InternalException
from dbt.events.functions import warn_or_error
from dbt.events.types import UnusedTables
from dbt.exceptions import InternalException
from dbt.node_types import NodeType
from dbt.parser.schemas import SchemaParser, ParserRef
from dbt import ui
# An UnparsedSourceDefinition is taken directly from the yaml
@@ -150,7 +151,7 @@ class SourcePatcher:
if not isinstance(config, SourceConfig):
raise InternalException(
f"Calculated a {type(config)} for a source, but expected " f"a SourceConfig"
f"Calculated a {type(config)} for a source, but expected a SourceConfig"
)
default_database = self.root_project.credentials.database
@@ -160,7 +161,6 @@ class SourcePatcher:
database=(source.database or default_database),
schema=(source.schema or source.name),
identifier=(table.identifier or table.name),
root_path=target.root_path,
path=target.path,
original_file_path=target.original_file_path,
columns=refs.column_info,
@@ -307,28 +307,27 @@ class SourcePatcher:
unused_tables[key] = unused
if unused_tables:
msg = self.get_unused_msg(unused_tables)
warn_or_error(msg, log_fmt=ui.warning_tag("{}"))
unused_tables_formatted = self.get_unused_msg(unused_tables)
warn_or_error(UnusedTables(unused_tables=unused_tables_formatted))
self.manifest.source_patches = {}
def get_unused_msg(
self,
unused_tables: Dict[SourceKey, Optional[Set[str]]],
) -> str:
msg = [
"During parsing, dbt encountered source overrides that had no " "target:",
]
) -> List:
unused_tables_formatted = []
for key, table_names in unused_tables.items():
patch = self.manifest.source_patches[key]
patch_name = f"{patch.overrides}.{patch.name}"
if table_names is None:
msg.append(f" - Source {patch_name} (in {patch.path})")
unused_tables_formatted.append(f" - Source {patch_name} (in {patch.path})")
else:
for table_name in sorted(table_names):
msg.append(f" - Source table {patch_name}.{table_name} " f"(in {patch.path})")
msg.append("")
return "\n".join(msg)
unused_tables_formatted.append(
f" - Source table {patch_name}.{table_name} " f"(in {patch.path})"
)
return unused_tables_formatted
def merge_freshness_time_thresholds(

View File

@@ -56,7 +56,6 @@ class SqlMacroParser(MacroParser):
package_name=self.project.project_name,
raw_code=contents,
language="sql",
root_path=self.project.project_root,
resource_type=NodeType.Macro,
)
for node in self.parse_unparsed_macros(base):

View File

@@ -37,9 +37,9 @@ from dbt.events.types import (
InternalExceptionOnRun,
GenericExceptionOnRun,
NodeConnectionReleaseError,
PrintDebugStackTrace,
LogDebugStackTrace,
SkippingDetails,
PrintSkipBecauseError,
LogSkipBecauseError,
NodeCompiling,
NodeExecuting,
)
@@ -309,9 +309,16 @@ class BaseRunner(metaclass=ABCMeta):
failures=None,
)
# some modeling languages don't need database connections for compilation,
# only for runtime (materialization)
def needs_connection(self):
return True
def compile_and_execute(self, manifest, ctx):
from contextlib import nullcontext
result = None
with self.adapter.connection_for(self.node):
with self.adapter.connection_for(self.node) if self.needs_connection() else nullcontext():
ctx.node._event_status["node_status"] = RunningStatus.Compiling
fire_event(
NodeCompiling(
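The nullcontext swap above lets compilation skip acquiring a warehouse connection for languages that do not need one at compile time. A minimal sketch of the pattern with invented names; nullcontext() is the stdlib no-op context manager:

from contextlib import nullcontext

def connection_scope(runner):
    # Acquire a real connection only when the node's language needs one;
    # otherwise hand back a context manager that does nothing.
    if runner.needs_connection():
        return runner.adapter.connection_for(runner.node)
    return nullcontext()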
@@ -362,7 +369,7 @@ class BaseRunner(metaclass=ABCMeta):
exc=str(e),
)
)
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
return str(e)
@@ -451,7 +458,7 @@ class BaseRunner(metaclass=ABCMeta):
# failure, print a special 'error skip' message.
if self._skip_caused_by_ephemeral_failure():
fire_event(
PrintSkipBecauseError(
LogSkipBecauseError(
schema=schema_name,
relation=node_name,
index=self.node_index,
@@ -461,7 +468,7 @@ class BaseRunner(metaclass=ABCMeta):
print_run_result_error(result=self.skip_cause, newline=False)
if self.skip_cause is None: # mypy appeasement
raise InternalException(
"Skip cause not set but skip was somehow caused by " "an ephemeral failure"
"Skip cause not set but skip was somehow caused by an ephemeral failure"
)
# set an error so dbt will exit with an error code
error_message = (

View File

@@ -20,6 +20,12 @@ class CompileRunner(BaseRunner):
def after_execute(self, result):
pass
def needs_connection(self):
from dbt.parser.languages import get_language_provider_by_name
provider = get_language_provider_by_name(self.node.language)
return provider.needs_compile_time_connection()
def execute(self, compiled_node, manifest):
return RunResult(
node=compiled_node,
@@ -64,7 +70,7 @@ class CompileTask(GraphRunnableTask):
state = self.previous_state
if state is None:
raise RuntimeException(
"Received a --defer argument, but no value was provided " "to --state"
"Received a --defer argument, but no value was provided to --state"
)
if state.manifest is None:
@@ -77,7 +83,7 @@ class CompileTask(GraphRunnableTask):
return
if self.manifest is None:
raise InternalException(
"Expected to defer to manifest, but there is no runtime " "manifest to defer from!"
"Expected to defer to manifest, but there is no runtime manifest to defer from!"
)
self.manifest.merge_from_artifact(
adapter=adapter,

View File

@@ -1,3 +1,5 @@
from typing import Optional
import dbt.utils
import dbt.deprecations
import dbt.exceptions
@@ -6,7 +8,9 @@ from dbt.config import UnsetProfileConfig
from dbt.config.renderer import DbtProjectYamlRenderer
from dbt.deps.base import downloads_directory
from dbt.deps.resolver import resolve_packages
from dbt.deps.registry import RegistryPinnedPackage
from dbt.events.proto_types import ListOfStrings
from dbt.events.functions import fire_event
from dbt.events.types import (
DepsNoPackagesFound,
@@ -29,7 +33,9 @@ class DepsTask(BaseTask):
def __init__(self, args, config: UnsetProfileConfig):
super().__init__(args=args, config=config)
def track_package_install(self, package_name: str, source_type: str, version: str) -> None:
def track_package_install(
self, package_name: str, source_type: str, version: Optional[str]
) -> None:
# Hub packages do not need to be hashed, as they are public
# Use the string 'local' for local package versions
if source_type == "local":
@@ -45,7 +51,7 @@ class DepsTask(BaseTask):
{"name": package_name, "source": source_type, "version": version},
)
def run(self):
def run(self) -> None:
system.make_directory(self.config.packages_install_path)
packages = self.config.packages.packages
if not packages:
@@ -66,7 +72,7 @@ class DepsTask(BaseTask):
fire_event(DepsStartPackageInstall(package_name=package_name))
package.install(self.config, renderer)
fire_event(DepsInstallInfo(version_name=package.nice_version_name()))
if source_type == "hub":
if isinstance(package, RegistryPinnedPackage):
version_latest = package.get_version_latest()
if version_latest != version:
packages_to_upgrade.append(package_name)
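The hub check above now dispatches on the package's type rather than comparing a source_type string. A small self-contained illustration of the idea; every name except RegistryPinnedPackage is invented:

class BasePackage:
    pass

class RegistryPinnedPackage(BasePackage):  # stands in for dbt's real class
    def get_version_latest(self):
        return "1.3.0"  # illustrative value

def latest_registry_version(package):
    # isinstance keys off the actual class, so a renamed source_type label
    # cannot silently skip the version check.
    if isinstance(package, RegistryPinnedPackage):
        return package.get_version_latest()
    return None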
@@ -81,7 +87,7 @@ class DepsTask(BaseTask):
)
if packages_to_upgrade:
fire_event(EmptyLine())
fire_event(DepsNotifyUpdatesAvailable(packages=packages_to_upgrade))
fire_event(DepsNotifyUpdatesAvailable(packages=ListOfStrings(packages_to_upgrade)))
@classmethod
def from_args(cls, args):

View File

@@ -16,14 +16,11 @@ from dbt.contracts.results import (
FreshnessStatus,
)
from dbt.exceptions import RuntimeException, InternalException
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, info
from dbt.events.types import (
FreshnessCheckComplete,
PrintStartLine,
PrintFreshnessErrorLine,
PrintFreshnessErrorStaleLine,
PrintFreshnessWarnLine,
PrintFreshnessPassLine,
LogStartLine,
LogFreshnessResult,
)
from dbt.node_types import NodeType
@@ -41,7 +38,7 @@ class FreshnessRunner(BaseRunner):
def before_execute(self):
description = "freshness of {0.source_name}.{0.name}".format(self.node)
fire_event(
PrintStartLine(
LogStartLine(
description=description,
index=self.node_index,
total=self.num_nodes,
@@ -56,50 +53,19 @@ class FreshnessRunner(BaseRunner):
else:
source_name = result.source_name
table_name = result.table_name
if result.status == FreshnessStatus.RuntimeErr:
fire_event(
PrintFreshnessErrorLine(
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
elif result.status == FreshnessStatus.Error:
fire_event(
PrintFreshnessErrorStaleLine(
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
elif result.status == FreshnessStatus.Warn:
fire_event(
PrintFreshnessWarnLine(
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
else:
fire_event(
PrintFreshnessPassLine(
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
level = LogFreshnessResult.status_to_level(str(result.status))
fire_event(
LogFreshnessResult(
info=info(level=level),
status=result.status,
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
def error_result(self, node, message, start_time, timing_info):
return self._build_run_result(
@@ -135,7 +101,7 @@ class FreshnessRunner(BaseRunner):
# broken, raise!
if compiled_node.loaded_at_field is None:
raise InternalException(
"Got to execute for source freshness of a source that has no " "loaded_at_field!"
"Got to execute for source freshness of a source that has no loaded_at_field!"
)
relation = self.adapter.Relation.create_from_source(compiled_node)
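Four freshness print events collapse into a single LogFreshnessResult whose log level is derived from the status. The real mapping lives in LogFreshnessResult.status_to_level, which this diff does not show; a plausible sketch based on the events it replaces:

# Assumed shape of the status -> level mapping; illustrative only.
def status_to_level(status: str) -> str:
    return {
        "runtime error": "error",
        "error": "error",
        "warn": "warn",
        "pass": "info",
    }.get(status, "info")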

View File

@@ -5,7 +5,9 @@ from dbt.graph import ResourceTypeSelector
from dbt.task.runnable import GraphRunnableTask, ManifestTask
from dbt.task.test import TestSelector
from dbt.node_types import NodeType
from dbt.exceptions import RuntimeException, InternalException, warn_or_error
from dbt.events.functions import warn_or_error
from dbt.events.types import NoNodesSelected
from dbt.exceptions import RuntimeException, InternalException
from dbt.logger import log_manager
import logging
import dbt.events.functions as event_logger
@@ -69,7 +71,7 @@ class ListTask(GraphRunnableTask):
spec = self.get_selection_spec()
nodes = sorted(selector.get_selected(spec))
if not nodes:
warn_or_error("No nodes selected!")
warn_or_error(NoNodesSelected())
return
if self.manifest is None:
raise InternalException("manifest is None in _iterate_selected_nodes")

View File

@@ -120,6 +120,8 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
elif result.message is not None:
first = True
for line in result.message.split("\n"):
# TODO: why do we format like this? Is there a reason this needs to
# be split instead of sending it as a single log line?
if first:
fire_event(FirstRunResultError(msg=line))
first = False

View File

@@ -20,7 +20,7 @@ from dbt.context.providers import generate_runtime_model_context
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.model_config import Hook
from dbt.contracts.graph.parsed import ParsedHookNode
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus, BaseResult
from dbt.exceptions import (
CompilationException,
InternalException,
@@ -28,17 +28,16 @@ from dbt.exceptions import (
ValidationException,
missing_materialization,
)
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.functions import fire_event, get_invocation_id, info
from dbt.events.types import (
DatabaseErrorRunningHook,
EmptyLine,
HooksRunning,
HookFinished,
PrintModelErrorResultLine,
PrintModelResultLine,
PrintStartLine,
PrintHookEndLine,
PrintHookStartLine,
LogModelResult,
LogStartLine,
LogHookEndLine,
LogHookStartLine,
)
from dbt.logger import (
TextOnly,
@@ -160,6 +159,9 @@ def _validate_materialization_relations_dict(inp: Dict[Any, Any], model) -> List
class ModelRunner(CompileRunner):
def needs_connection(self):
return True
def get_node_representation(self):
display_quote_policy = {"database": False, "schema": False, "identifier": False}
relation = self.adapter.Relation.create_from(
@@ -176,7 +178,7 @@ class ModelRunner(CompileRunner):
def print_start_line(self):
fire_event(
PrintStartLine(
LogStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes,
@@ -187,27 +189,22 @@ class ModelRunner(CompileRunner):
def print_result_line(self, result):
description = self.describe_node()
if result.status == NodeStatus.Error:
fire_event(
PrintModelErrorResultLine(
description=description,
status=result.status,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
status = result.status
level = "error"
else:
fire_event(
PrintModelResultLine(
description=description,
status=result.message,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
status = result.message
level = "info"
fire_event(
LogModelResult(
description=description,
status=status,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
info=info(level=level),
)
)
def before_execute(self):
self.print_start_line()
@@ -268,12 +265,12 @@ class ModelRunner(CompileRunner):
context_config = context["config"]
mat_has_supported_langs = hasattr(materialization_macro, "supported_languages")
model_lang_supported = model.language in materialization_macro.supported_languages
model_lang_supported = model.compiled_language in materialization_macro.supported_languages
if mat_has_supported_langs and not model_lang_supported:
str_langs = [str(lang) for lang in materialization_macro.supported_languages]
raise ValidationException(
f'Materialization "{materialization_macro.name}" only supports languages {str_langs}; '
f'got "{model.language}"'
f'got "{model.language}" which compiles to "{model.compiled_language}"'
)
hook_ctx = self.adapter.pre_model_hook(context_config)
@@ -355,7 +352,7 @@ class RunTask(CompileTask):
with UniqueID(hook.unique_id):
with hook_meta_ctx, startctx:
fire_event(
PrintHookStartLine(
LogHookStartLine(
statement=hook_text,
index=idx,
total=num_hooks,
@@ -375,7 +372,7 @@ class RunTask(CompileTask):
with finishctx, DbtModelState({"node_status": "passed"}):
hook._event_status["node_status"] = RunStatus.Success
fire_event(
PrintHookEndLine(
LogHookEndLine(
statement=hook_text,
status=status,
index=idx,
@@ -400,12 +397,22 @@ class RunTask(CompileTask):
) -> None:
try:
self.run_hooks(adapter, hook_type, extra_context)
except RuntimeException:
except RuntimeException as exc:
fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value))
raise
self.node_results.append(
BaseResult(
status=RunStatus.Error,
thread_id="main",
timing=[],
message=f"{hook_type.value} failed, error:\n {exc.msg}",
adapter_response=exc.msg,
execution_time=0,
failures=1,
)
)
def print_results_line(self, results, execution_time):
nodes = [r.node for r in results] + self.ran_hooks
nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks
stat_line = get_counts(nodes)
execution = ""
@@ -450,9 +457,6 @@ class RunTask(CompileTask):
with adapter.connection_named("master"):
self.safe_run_hooks(adapter, RunHookType.End, extras)
def after_hooks(self, adapter, results, elapsed):
self.print_results_line(results, elapsed)
def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise InternalException("manifest and graph must be set to get perform node selection")

View File

@@ -15,7 +15,7 @@ from dbt.events.functions import fire_event
from dbt.events.types import (
RunningOperationCaughtError,
RunningOperationUncaughtError,
PrintDebugStackTrace,
LogDebugStackTrace,
)
@@ -57,11 +57,11 @@ class RunOperationTask(ManifestTask):
self._run_unsafe()
except dbt.exceptions.Exception as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
else:
success = True

View File

@@ -26,16 +26,17 @@ from dbt.logger import (
ModelMetadata,
NodeCount,
)
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, warn_or_error
from dbt.events.types import (
EmptyLine,
PrintCancelLine,
LogCancelLine,
DefaultSelector,
NodeStart,
NodeFinished,
QueryCancelationUnsupported,
ConcurrencyLine,
EndRunResult,
NothingToDo,
)
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
@@ -47,7 +48,6 @@ from dbt.exceptions import (
NotImplementedException,
RuntimeException,
FailFastException,
warn_or_error,
)
from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, parse_difference, Graph
@@ -57,7 +57,6 @@ import dbt.tracking
import dbt.exceptions
from dbt import flags
import dbt.utils
from dbt.ui import warning_tag
RESULT_FILE_NAME = "run_results.json"
MANIFEST_FILE_NAME = "manifest.json"
@@ -174,7 +173,7 @@ class GraphRunnableTask(ManifestTask):
self._flattened_nodes.append(self.manifest.sources[uid])
else:
raise InternalException(
f"Node selection returned {uid}, expected a node or a " f"source"
f"Node selection returned {uid}, expected a node or a source"
)
self.num_nodes = len([n for n in self._flattened_nodes if not n.is_ephemeral_model])
@@ -364,7 +363,7 @@ class GraphRunnableTask(ManifestTask):
continue
# if we don't have a manifest/don't have a node, print
# anyway.
fire_event(PrintCancelLine(conn_name=conn_name))
fire_event(LogCancelLine(conn_name=conn_name))
pool.join()
@@ -414,9 +413,6 @@ class GraphRunnableTask(ManifestTask):
{"adapter_cache_construction_elapsed": cache_populate_time}
)
def before_hooks(self, adapter):
pass
def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
self.populate_adapter_cache(adapter)
@@ -424,24 +420,24 @@ class GraphRunnableTask(ManifestTask):
def after_run(self, adapter, results):
pass
def after_hooks(self, adapter, results, elapsed):
def print_results_line(self, node_results, elapsed):
pass
def execute_with_hooks(self, selected_uids: AbstractSet[str]):
adapter = get_adapter(self.config)
started = time.time()
try:
self.before_hooks(adapter)
started = time.time()
self.before_run(adapter, selected_uids)
res = self.execute_nodes()
self.after_run(adapter, res)
elapsed = time.time() - started
self.after_hooks(adapter, res, elapsed)
finally:
adapter.cleanup_connections()
elapsed = time.time() - started
self.print_results_line(self.node_results, elapsed)
result = self.get_result(
results=self.node_results, elapsed_time=elapsed, generated_at=datetime.utcnow()
)
result = self.get_result(results=res, elapsed_time=elapsed, generated_at=datetime.utcnow())
return result
def write_result(self, result):
@@ -459,8 +455,7 @@ class GraphRunnableTask(ManifestTask):
if len(self._flattened_nodes) == 0:
with TextOnly():
fire_event(EmptyLine())
msg = "Nothing to do. Try checking your model " "configs and model specification args"
warn_or_error(msg, log_fmt=warning_tag("{}"))
warn_or_error(NothingToDo())
result = self.get_result(
results=[],
generated_at=datetime.utcnow(),

View File

@@ -9,14 +9,13 @@ from dbt.contracts.results import RunStatus
from dbt.exceptions import InternalException
from dbt.graph import ResourceTypeSelector
from dbt.logger import TextOnly
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, info
from dbt.events.types import (
SeedHeader,
SeedHeaderSeparator,
EmptyLine,
PrintSeedErrorResultLine,
PrintSeedResultLine,
PrintStartLine,
LogSeedResult,
LogStartLine,
)
from dbt.node_types import NodeType
from dbt.contracts.results import NodeStatus
@@ -28,7 +27,7 @@ class SeedRunner(ModelRunner):
def before_execute(self):
fire_event(
PrintStartLine(
LogStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes,
@@ -47,30 +46,20 @@ class SeedRunner(ModelRunner):
def print_result_line(self, result):
model = result.node
if result.status == NodeStatus.Error:
fire_event(
PrintSeedErrorResultLine(
status=result.status,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info,
)
)
else:
fire_event(
PrintSeedResultLine(
status=result.message,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info,
)
level = "error" if result.status == NodeStatus.Error else "info"
fire_event(
LogSeedResult(
info=info(level=level),
status=result.status,
result_message=result.message,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info,
)
)
class SeedTask(RunTask):

View File

@@ -1,8 +1,8 @@
from .run import ModelRunner, RunTask
from dbt.exceptions import InternalException
from dbt.events.functions import fire_event
from dbt.events.types import PrintSnapshotErrorResultLine, PrintSnapshotResultLine
from dbt.events.functions import fire_event, info
from dbt.events.types import LogSnapshotResult
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.contracts.results import NodeStatus
@@ -15,30 +15,19 @@ class SnapshotRunner(ModelRunner):
def print_result_line(self, result):
model = result.node
cfg = model.config.to_dict(omit_none=True)
if result.status == NodeStatus.Error:
fire_event(
PrintSnapshotErrorResultLine(
status=result.status,
description=self.get_node_representation(),
cfg=cfg,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
)
else:
fire_event(
PrintSnapshotResultLine(
status=result.message,
description=self.get_node_representation(),
cfg=cfg,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
level = "error" if result.status == NodeStatus.Error else "info"
fire_event(
LogSnapshotResult(
info=info(level=level),
status=result.status,
description=self.get_node_representation(),
cfg=cfg,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
)
class SnapshotTask(RunTask):

View File

@@ -19,13 +19,10 @@ from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import TestStatus, PrimitiveDict, RunResult
from dbt.context.providers import generate_runtime_model_context
from dbt.clients.jinja import MacroGenerator
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, info
from dbt.events.types import (
PrintErrorTestResult,
PrintPassTestResult,
PrintWarnTestResult,
PrintFailureTestResult,
PrintStartLine,
LogTestResult,
LogStartLine,
)
from dbt.exceptions import InternalException, invalid_bool_error, missing_materialization
from dbt.graph import (
@@ -67,54 +64,22 @@ class TestRunner(CompileRunner):
def print_result_line(self, result):
model = result.node
if result.status == TestStatus.Error:
fire_event(
PrintErrorTestResult(
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
fire_event(
LogTestResult(
name=model.name,
info=info(level=LogTestResult.status_to_level(str(result.status))),
status=str(result.status),
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
num_failures=result.failures,
)
elif result.status == TestStatus.Pass:
fire_event(
PrintPassTestResult(
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
)
elif result.status == TestStatus.Warn:
fire_event(
PrintWarnTestResult(
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
num_failures=result.failures,
node_info=model.node_info,
)
)
elif result.status == TestStatus.Fail:
fire_event(
PrintFailureTestResult(
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
num_failures=result.failures,
node_info=model.node_info,
)
)
else:
raise RuntimeError("unexpected status: {}".format(result.status))
)
def print_start_line(self):
fire_event(
PrintStartLine(
LogStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes,

View File

@@ -491,11 +491,11 @@ class SingleThreadedExecutor(ConnectingExecutor):
self, fn, *args = args
elif not args:
raise TypeError(
"descriptor 'submit' of 'SingleThreadedExecutor' object needs " "an argument"
"descriptor 'submit' of 'SingleThreadedExecutor' object needs an argument"
)
else:
raise TypeError(
"submit expected at least 1 positional argument, " "got %d" % (len(args) - 1)
"submit expected at least 1 positional argument, got %d" % (len(args) - 1)
)
fut = concurrent.futures.Future()
try:

View File

@@ -50,7 +50,7 @@ setup(
"agate>=1.6,<1.6.4",
"betterproto==1.2.5",
"click>=7.0,<9",
"colorama>=0.3.9,<0.4.6",
"colorama>=0.3.9,<0.4.7",
"hologram>=0.0.14,<=0.0.15",
"isodate>=0.6,<0.7",
"logbook>=1.5,<1.6",
@@ -63,7 +63,7 @@ setup(
"dbt-extractor~=0.4.1",
"typing-extensions>=3.7.4",
"werkzeug>=1,<3",
"pathspec~=0.9.0",
"pathspec>=0.9,<0.11",
# the following are all to match snowflake-connector-python
"requests<3.0.0",
"idna>=2.5,<4",

schemas/dbt/manifest/v8.json (new file, 6503 lines)

File diff suppressed because it is too large

View File

@@ -1,2 +0,0 @@
select bad sql here

View File

@@ -1,8 +0,0 @@
select 1 as id, current_date as updated_at
union all
select 2 as id, current_date as updated_at
union all
select 3 as id, current_date as updated_at
union all
select 4 as id, current_date as updated_at

View File

@@ -1,8 +0,0 @@
select 1 as id, current_date as updated_at
union all
select 2 as id, current_date as updated_at
union all
select 3 as id, current_date as updated_at
union all
select 4 as id, current_date as updated_at

View File

@@ -1,17 +0,0 @@
version: 2
models:
- name: good
columns:
- name: updated_at
tests:
- not_null
- name: bad
columns:
- name: updated_at
tests:
- not_null
- name: dupe
columns:
- name: updated_at
tests:
- unique

View File

@@ -1,2 +0,0 @@
a,b,c
1,\2,3,a,a,a

View File

@@ -1,2 +0,0 @@
a,b,c
1,2,3

View File

@@ -1,4 +0,0 @@
{% snapshot good_snapshot %}
{{ config(target_schema=schema, target_database=database, strategy='timestamp', unique_key='id', updated_at='updated_at_not_real')}}
select * from {{ schema }}.good
{% endsnapshot %}

Some files were not shown because too many files have changed in this diff