Compare commits

48 Commits

Author SHA1 Message Date
Quigley Malcolm
2e876d2c70 Update setup_db.sh to use CONNECTION_URI 2024-09-10 15:37:37 -05:00
Michelle Ark
5f357187ca pass CONNECTION_STR to downstream postgres steps 2024-09-10 15:47:44 -04:00
Michelle Ark
afd1bf2771 pass CONNECTION_STR to downstream postgres steps 2024-09-10 15:47:40 -04:00
Michelle Ark
715386ad3a windows PG_LIB_DIR; macos do not install postgres@14 2024-09-10 15:38:52 -04:00
Michelle Ark
9554d1c926 use default port/username/password 2024-09-10 15:30:05 -04:00
Michelle Ark
d6ad6a5477 use ikalnytskyi/action-setup-postgres@v6 2024-09-10 15:25:58 -04:00
Mike Alfare
1d3d315249 Add flags from dbt_project.yml to the Project and RuntimeConfig objects (#10644)
* add flags from dbt_project.yml to the Project and RuntimeConfig objects
2024-09-06 15:42:29 -04:00
Gerda Shank
b35ad46e3f Remove deprecation warning to change "tests:" config to "data_tests:" (#10670) 2024-09-05 20:35:28 -04:00
Gerda Shank
c28cb92af5 Warn if timestamp updated_at field uses incompatible timestamp (#10352)
Co-authored-by: Michelle Ark <michelle.ark@dbtlabs.com>
2024-09-04 14:42:14 -04:00
RyoAriyama
b56d96df5e Fix/changes current working dir when using a dbt project dir (#9596)
* made class changing directory a context manager.

* add change log

* fix conflict

* made base as a context manager

* add assertion

* Remove index.html

* add it test to testDbtRunner

* fix deps args order

* fix test

---------

Co-authored-by: Doug Beatty <doug.beatty@dbtlabs.com>
Co-authored-by: Chenyu Li <chenyu.li@dbtlabs.com>
2024-09-03 13:41:53 -07:00
Jeremy Cohen
37d382c8e7 Filter out empty nodes after graph selection (#10580)
* Add unit test

* Filter out empty nodes after graph selection

* Add changie

* Add --indirect-selection empty check to unit test
2024-09-03 18:48:03 +02:00
Gerda Shank
9b7f4ff842 use full manifest in adapter instead of macro_manifest (#10609)
* use full manifest in adapter instead of macro_manifest

* Add test case

* Add changelog entry

* Remove commented code.

---------

Co-authored-by: Peter Allen Webb <peter.webb@dbtlabs.com>
2024-08-29 11:32:30 -04:00
Emily Rockman
555ff8091f update dep for psycopg (#10633) 2024-08-29 09:44:49 -05:00
Emily Rockman
98fddcf54f rework test to ignore utils version (#10625) 2024-08-28 15:25:09 -05:00
Emily Rockman
d652359c61 add typing (#10619) 2024-08-28 13:26:18 -05:00
Peter Webb
f7d21e012e Add More Typing to the dbt.task Module (#10622)
* Add typing to task module.

* More typing in the task module

* Still more types for task module
2024-08-28 11:18:01 -04:00
Quigley Malcolm
e1fa461186 [TIDY FIRST] Fix typing issues in dbt/core/tasks/clean.py (#10617) 2024-08-27 17:37:16 -05:00
Quigley Malcolm
1153597970 Fix typing errors in core/dbt/contracts/sql.py (#10615) 2024-08-27 17:37:00 -05:00
Quigley Malcolm
09f9febc25 [TIDY FIRST] Fix core/dbt/version.py type hinting (#10613) 2024-08-27 17:36:31 -05:00
Doug Beatty
22181409f6 Enable calling a macro in a pre- or post-hook config in properties.yml (#10603)
* Tests for calling a macro in a pre- or post-hook config in properties.yml

* Late render pre- and post-hooks configs in properties / schema YAML files

* Changelog entry
2024-08-27 11:08:56 -06:00
William Deng
f25a474f75 updated saved query tests and fixtures (#10610) 2024-08-26 17:39:35 -04:00
aliceliu
3c55806203 Fix state:modified check for exports (#10565) 2024-08-23 15:22:38 -04:00
Gerda Shank
bba020fcc0 Add test for source names with quotes (#10588) 2024-08-21 11:57:34 -04:00
Courtney Holcomb
84eb0ff672 Bump DSI version (#10585)
* Bump DSI version

* Changelog
2024-08-20 16:37:52 -04:00
Kshitij Aranke
3695698e22 [CORE-364] Add group info to RunResultError, RunResultFailure, RunResultWarning log lines (#10535) 2024-08-19 11:26:00 -07:00
Courtney Holcomb
9ca1bc5b4c Remove unneeded TODO (#10568) 2024-08-14 14:49:47 -07:00
Gerda Shank
5f66678f6d Incremental models with a contract don't need their columns modified (#10371) 2024-08-14 08:15:25 -07:00
Jean Cochrane
63262e93cb Use model alias for the CTE identifier generated during ephemeral materialization (#10290)
* Use alias instead of name when adding ephemeral model prefixes

* Adjust TestCustomSchemaWithCustomMacroFromModelName to test ephemeral models

* Add changelog entry for ephemeral model CTE identifier fix

* Reference model.identifier and model.name where appropriate to resolve typing errors

* Move test for ephemeral model with alias to dedicated test in test_compile.py
2024-08-09 15:00:55 -07:00
Tobie Tusing
374412af53 Improve tree traversal of select_children (#10526)
* update children search

* update search to include children in original selector

* add changie

* remove unused function

* fix wrong function call

* fix depth
2024-08-09 17:38:15 -04:00
Kshitij Aranke
47848b8ea8 Fix add_ephemeral_prefix to identifier instead of name (#10550) 2024-08-09 13:58:37 -05:00
Michelle Ark
3d09872a56 reset deprecations prior to usage in unit tests (#10545) 2024-08-08 12:25:22 -04:00
Colin Rogers
dfa7d06526 Revert "Remove Undocumented Property" (#10544)
* Revert "Remove undocumented property which does not pass mypy checks after an…"

This reverts commit 21a46332f1.

* add code comment
2024-08-07 20:01:55 -07:00
aliceliu
7f57dd5a30 Support using the measure label when using create_metric option (#10536) 2024-08-07 15:06:46 -05:00
Peter Webb
56bfbeaedd Depend on snowplow-tracker rather than our old fork, minimal-snowplow-tracker. (#10530) 2024-08-07 14:55:42 -04:00
Michelle Ark
1dd26e79af deprecations.buffer: respect --quiet and --warn-error-options for deprecations (#10534) 2024-08-07 11:16:40 -04:00
Quigley Malcolm
86223609dd Parameterized testing examples utilizing happy path fixture (#10480)
* sketch

* Bring back the happy path fixture snapshot file

The commit c783a86 removed the snapshot file from the happy path fixture.
This was done because the snapshot was breaking the tests we were adding,
`test_run_commands`. However this broke `test_ls` in `test_list.py`. In order
to move forward, we need everything to be working. Maybe the idea was to delete
the `test_list.py` file, however that is not noted anywhere, and was not done.
Thus this commit ensures that neither that test nor our new tests are broken.

* Create conftest for `functional` tests so that happy path fixtures are accessible

* Format `test_commands.py` and update imports to appease pre-commit hooks

* Parametrize `test_run_command` to make it easier to see which command is failing (if any)

* Update the setup for `TestRunCommands.test_run_command` to be more formulaic

* Add test to ensure resource types are selectable

* Fix docstring formatting in TestRunCommands

* Fixup documentation for test_commands.py

---------

Co-authored-by: Chenyu Li <chenyu.li@dbtlabs.com>
2024-08-07 10:05:25 -05:00
Peter Webb
21a46332f1 Remove undocumented property which does not pass mypy checks after annotations in dbt-common. (#10529) 2024-08-06 11:04:54 -04:00
Michelle Ark
ff2726c3b5 more defensive node.all_constraints access (#10508) 2024-07-31 20:02:27 -04:00
Courtney Holcomb
014444dc18 Bump DSI version to release new time spine validations (#10507) 2024-07-30 15:52:40 -07:00
Kshitij Aranke
25c2042dc9 Bump dbt-adapters to 1.3.0 (#10499) 2024-07-29 18:57:24 +01:00
Courtney Holcomb
0a160fc27a Support time spine configs for sub-daily granularity (#10483) 2024-07-29 13:39:39 -04:00
Michelle Ark
c598741262 Bump dbt common 1.6 (#10489) 2024-07-26 13:51:34 -04:00
Courtney Holcomb
f9c2b9398f Remove newlines from JSON schema files (#10486) 2024-07-26 13:36:13 -04:00
Michelle Ark
cab6dabbc7 parse + compile constraint.to and constraint.to_columns on foreign key constraints (#10414) 2024-07-25 10:56:17 -04:00
nakamichi
e1621ebc54 Fix typing for artifact schemas (#10443) 2024-07-24 18:22:21 -04:00
Michelle Ark
cd90d4493c add predicate to EventCatcher test util (#10482) 2024-07-23 17:55:30 -04:00
Kshitij Aranke
560d151dcd [Tidy First] Update PR template punctuation (#10479) 2024-07-23 19:32:52 +01:00
Doug Beatty
229c537748 Update pytest examples for contributors (#10478) 2024-07-23 11:42:31 -06:00
137 changed files with 3876 additions and 644 deletions

View File

@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Fix changing the current working directory when using dbt deps, clean and init.
time: 2023-12-06T19:24:42.575372+09:00
custom:
Author: rariyama
Issue: "8997"

View File

@@ -0,0 +1,7 @@
kind: Dependencies
body: Increase supported version range for dbt-semantic-interfaces. Needed to support
custom calendar features.
time: 2024-08-20T13:19:09.015225-07:00
custom:
Author: courtneyholcomb
Issue: "9265"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Warning message for snapshot timestamp data types
time: 2024-06-21T14:16:35.717637-04:00
custom:
Author: gshank
Issue: "10234"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Support ref and source in foreign key constraint expressions, bump dbt-common minimum to 1.6
time: 2024-07-19T16:18:41.434278-04:00
custom:
Author: michelleark
Issue: "8062"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Support new semantic layer time spine configs to enable sub-daily granularity.
time: 2024-07-22T20:22:38.258249-07:00
custom:
Author: courtneyholcomb
Issue: "10475"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Add support for behavior flags
time: 2024-08-29T13:53:20.16122-04:00
custom:
Author: mikealfare
Issue: "10618"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Use model alias for the CTE identifier generated during ephemeral materialization
time: 2024-06-10T20:05:22.510814008Z
custom:
Author: jeancochrane
Issue: "5273"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix typing for artifact schemas
time: 2024-07-14T10:02:54.452099+09:00
custom:
Author: nakamichiworks
Issue: "10442"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Do not update varchar column definitions if a contract exists
time: 2024-07-28T22:14:21.67712-04:00
custom:
Author: gshank
Issue: "10362"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: fix all_constraints access, disabled node parsing of non-uniquely named resources
time: 2024-07-31T09:51:52.751135-04:00
custom:
Author: michelleark gshank
Issue: "10509"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Propagate measure label when using create_metrics
time: 2024-08-06T17:21:10.265494-07:00
custom:
Author: aliceliu
Issue: "10536"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: respect --quiet and --warn-error-options for flag deprecations
time: 2024-08-06T19:48:43.399453-04:00
custom:
Author: michelleark
Issue: "10105"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix state:modified check for exports
time: 2024-08-13T15:42:35.471685-07:00
custom:
Author: aliceliu
Issue: "10138"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Filter out empty nodes after graph selection to support consistent selection of nodes that depend on upstream public models
time: 2024-08-16T14:08:07.426235-07:00
custom:
Author: jtcohen6
Issue: "8987"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Late render pre- and post-hooks configs in properties / schema YAML files
time: 2024-08-24T21:09:03.252733-06:00
custom:
Author: dbeatty10
Issue: "10603"

View File

@@ -0,0 +1,7 @@
kind: Fixes
body: Allow the use of env_var function in certain macros in which it was previously
unavailable.
time: 2024-08-29T10:57:01.160613-04:00
custom:
Author: peterallenwebb
Issue: "10609"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: 'Remove deprecation for tests: to data_tests: change'
time: 2024-09-05T18:02:48.086421-04:00
custom:
Author: gshank
Issue: "10564"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Move from minimal-snowplow-tracker fork back to snowplow-tracker
time: 2024-08-06T15:54:06.422444-04:00
custom:
Author: peterallenwebb
Issue: "8409"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Add group info to RunResultError, RunResultFailure, RunResultWarning log lines
time: 2024-08-07T15:56:52.171199-05:00
custom:
Author: aranke
Issue: ""
JiraID: "364"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Improve speed of tree traversal when finding children, increasing build speed for some selectors
time: 2024-08-09T13:02:34.759905-07:00
custom:
Author: ttusing
Issue: "10434"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add test for source tables with quotes
time: 2024-08-21T09:55:16.038101-04:00
custom:
Author: gshank
Issue: "10582"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Additional type hints for `core/dbt/version.py`
time: 2024-08-27T10:50:14.047859-05:00
custom:
Author: QMalcolm
Issue: "10612"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fix typing issues in core/dbt/contracts/sql.py
time: 2024-08-27T11:31:23.749912-05:00
custom:
Author: QMalcolm
Issue: "10614"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fix type errors in `dbt/core/task/clean.py`
time: 2024-08-27T11:48:10.438173-05:00
custom:
Author: QMalcolm
Issue: "10616"

View File

@@ -5,7 +5,6 @@ runs:
steps:
- shell: bash
run: |
brew services start postgresql
echo "Check PostgreSQL service is running"
i=10
COMMAND='pg_isready'

View File

@@ -2,6 +2,8 @@ name: "Set up postgres (windows)"
description: "Set up postgres service on windows vm for dbt integration tests"
runs:
using: "composite"
env:
PQ_LIB_DIR: 'C:\Program Files\PostgreSQL\16\lib'
steps:
- shell: pwsh
run: |

View File

@@ -1,7 +1,7 @@
resolves #
Resolves #
<!---
Include the number of the issue addressed by this PR above if applicable.
Include the number of the issue addressed by this PR above, if applicable.
PRs for code changes without an associated issue *will not be merged*.
See CONTRIBUTING.md for more information.
@@ -26,8 +26,8 @@ resolves #
### Checklist
- [ ] I have read [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md) and understand what's expected of me
- [ ] I have run this code in development and it appears to resolve the stated issue
- [ ] This PR includes tests, or tests are not required/relevant for this PR
- [ ] This PR has no interface changes (e.g. macros, cli, logs, json artifacts, config files, adapter interface, etc) or this PR has already received feedback and approval from Product or DX
- [ ] This PR includes [type annotations](https://docs.python.org/3/library/typing.html) for new and modified functions
- [ ] I have read [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md) and understand what's expected of me.
- [ ] I have run this code in development, and it appears to resolve the stated issue.
- [ ] This PR includes tests, or tests are not required or relevant for this PR.
- [ ] This PR has no interface changes (e.g., macros, CLI, logs, JSON artifacts, config files, adapter interface, etc.) or this PR has already received feedback and approval from Product or DX.
- [ ] This PR includes [type annotations](https://docs.python.org/3/library/typing.html) for new and modified functions.

View File

@@ -186,17 +186,29 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Install postgres 16
uses: ikalnytskyi/action-setup-postgres@v6
with:
postgres-version: "16"
id: postgres
- name: Set up postgres (linux)
if: runner.os == 'Linux'
uses: ./.github/actions/setup-postgres-linux
env:
CONNECTION_STR: ${{ steps.postgres.outputs.connection-uri }}
- name: Set up postgres (macos)
if: runner.os == 'macOS'
uses: ./.github/actions/setup-postgres-macos
env:
CONNECTION_STR: ${{ steps.postgres.outputs.connection-uri }}
- name: Set up postgres (windows)
if: runner.os == 'Windows'
uses: ./.github/actions/setup-postgres-windows
env:
CONNECTION_STR: ${{ steps.postgres.outputs.connection-uri }}
- name: Install python tools
run: |
@@ -213,6 +225,7 @@ jobs:
command: tox -- --ddtrace
env:
PYTEST_ADDOPTS: ${{ format('--splits {0} --group {1}', env.PYTHON_INTEGRATION_TEST_WORKERS, matrix.split-group) }}
CONNECTION_STR: ${{ steps.postgres.outputs.connection-uri }}
- name: Get current date
if: always()

View File

@@ -15,6 +15,7 @@ repos:
args: [--unsafe]
- id: check-json
- id: end-of-file-fixer
exclude: schemas/dbt/manifest/
- id: trailing-whitespace
exclude_types:
- "markdown"

View File

@@ -170,9 +170,9 @@ Finally, you can also run a specific test or group of tests using [`pytest`](htt
```sh
# run all unit tests in a file
python3 -m pytest tests/unit/test_base_column.py
python3 -m pytest tests/unit/test_invocation_id.py
# run a specific unit test
python3 -m pytest tests/unit/test_base_column.py::TestNumericType::test__numeric_type
python3 -m pytest tests/unit/test_invocation_id.py::TestInvocationId::test_invocation_id
# run specific Postgres functional tests
python3 -m pytest tests/functional/sources
```

View File

@@ -144,3 +144,7 @@ help: ## Show this help message.
@echo
@echo 'options:'
@echo 'use USE_DOCKER=true to run target in a docker container'
.PHONY: json_schema
json_schema: ## Update generated JSON schema using code changes.
scripts/collect-artifact-schema.py --path schemas

View File

@@ -46,7 +46,7 @@ from dbt.artifacts.resources.v1.metric import (
MetricTimeWindow,
MetricTypeParams,
)
from dbt.artifacts.resources.v1.model import Model, ModelConfig
from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine
from dbt.artifacts.resources.v1.owner import Owner
from dbt.artifacts.resources.v1.saved_query import (
Export,

View File

@@ -10,6 +10,7 @@ from dbt_common.contracts.config.properties import AdditionalPropertiesMixin
from dbt_common.contracts.constraints import ColumnLevelConstraint
from dbt_common.contracts.util import Mergeable
from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, dbtClassMixin
from dbt_semantic_interfaces.type_enums import TimeGranularity
NodeVersion = Union[str, float]
@@ -66,6 +67,7 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
_extra: Dict[str, Any] = field(default_factory=dict)
granularity: Optional[TimeGranularity] = None
@dataclass
@@ -219,7 +221,6 @@ class CompiledResource(ParsedResource):
extra_ctes: List[InjectedCTE] = field(default_factory=list)
_pre_injected_sql: Optional[str] = None
contract: Contract = field(default_factory=Contract)
event_time: Optional[str] = None
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)

View File

@@ -11,6 +11,7 @@ from dbt.artifacts.resources.v1.components import (
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt_common.contracts.config.base import MergeBehavior
from dbt_common.contracts.constraints import ModelLevelConstraint
from dbt_common.dataclass_schema import dbtClassMixin
@dataclass
@@ -21,6 +22,11 @@ class ModelConfig(NodeConfig):
)
@dataclass
class TimeSpine(dbtClassMixin):
standard_granularity_column: str
@dataclass
class Model(CompiledResource):
resource_type: Literal[NodeType.Model]
@@ -32,6 +38,7 @@ class Model(CompiledResource):
deprecation_date: Optional[datetime] = None
defer_relation: Optional[DeferRelation] = None
primary_key: List[str] = field(default_factory=list)
time_spine: Optional[TimeSpine] = None
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)

View File

@@ -34,6 +34,7 @@ class Export(dbtClassMixin):
name: str
config: ExportConfig
unrendered_config: Dict[str, str] = field(default_factory=dict)
@dataclass

View File

@@ -70,4 +70,3 @@ class SourceDefinition(ParsedSourceMandatory):
unrendered_config: Dict[str, Any] = field(default_factory=dict)
relation_name: Optional[str] = None
created_at: float = field(default_factory=lambda: time.time())
event_time: Optional[str] = None

View File

@@ -77,8 +77,11 @@ class BaseArtifactMetadata(dbtClassMixin):
# remote-compile-result
# remote-execution-result
# remote-run-result
S = TypeVar("S", bound="VersionedSchema")
def schema_version(name: str, version: int):
def inner(cls: Type[VersionedSchema]):
def inner(cls: Type[S]):
cls.dbt_schema_version = SchemaVersion(
name=name,
version=version,
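
The switch from `Type[VersionedSchema]` to a bound `TypeVar` in the hunk above can be illustrated with a minimal sketch. The class body, the `schema_label` attribute, and the `Manifest` subclass here are hypothetical stand-ins, not the dbt definitions; the return annotation on `inner` is also an assumption, since the hunk does not show one.

```python
from typing import Callable, Type, TypeVar


class VersionedSchema:
    # Hypothetical stand-in for the real base class.
    schema_label: str = ""


# Bound TypeVar: "some subclass of VersionedSchema", remembered per call site.
S = TypeVar("S", bound=VersionedSchema)


def schema_version(name: str, version: int) -> Callable[[Type[S]], Type[S]]:
    def inner(cls: Type[S]) -> Type[S]:
        # Attach version metadata and hand back the very same class, so a
        # type checker still sees the subclass rather than the base class.
        cls.schema_label = f"{name}/v{version}"
        return cls

    return inner


@schema_version("manifest", 12)
class Manifest(VersionedSchema):
    def node_count(self) -> int:
        return 0
```

With the decorator typed this way, a type checker can still resolve subclass-only members such as `Manifest().node_count()` after decoration.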

View File

@@ -1,7 +1,10 @@
from typing import IO, Optional
from typing import IO, List, Optional, Union
from click.exceptions import ClickException
from dbt.artifacts.schemas.catalog import CatalogArtifact
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import RunExecutionResult
from dbt.utils import ExitCodes
@@ -23,7 +26,7 @@ class CliException(ClickException):
# the typing of _file is to satisfy the signature of ClickException.show
# overriding this method prevents click from printing any exceptions to stdout
def show(self, _file: Optional[IO] = None) -> None:
def show(self, _file: Optional[IO] = None) -> None: # type: ignore[type-arg]
pass
@@ -31,7 +34,17 @@ class ResultExit(CliException):
"""This class wraps any exception that contains results while invoking dbt, or the
results of an invocation that did not succeed but did not throw any exceptions."""
def __init__(self, result) -> None:
def __init__(
self,
result: Union[
bool, # debug
CatalogArtifact, # docs generate
List[str], # list/ls
Manifest, # parse
None, # clean, deps, init, source
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None,
) -> None:
super().__init__(ExitCodes.ModelError)
self.result = result

View File

@@ -15,7 +15,7 @@ from dbt.cli.resolvers import default_log_path, default_project_dir
from dbt.cli.types import Command as CliCommand
from dbt.config.project import read_project_flags
from dbt.contracts.project import ProjectFlags
from dbt.deprecations import renamed_env_var
from dbt.deprecations import fire_buffered_deprecations, renamed_env_var
from dbt.events import ALL_EVENT_NAMES
from dbt_common import ui
from dbt_common.clients import jinja
@@ -355,6 +355,8 @@ class Flags:
# not get pickled when written to disk as json.
object.__delattr__(self, "deprecated_env_var_warnings")
fire_buffered_deprecations()
@classmethod
def from_dict(cls, command: CliCommand, args_dict: Dict[str, Any]) -> "Flags":
command_arg_list = command_params(command, args_dict)

View File

@@ -218,10 +218,9 @@ def clean(ctx, **kwargs):
"""Delete all folders in the clean-targets list (usually the dbt_packages and target directories.)"""
from dbt.task.clean import CleanTask
task = CleanTask(ctx.obj["flags"], ctx.obj["project"])
results = task.run()
success = task.interpret_results(results)
with CleanTask(ctx.obj["flags"], ctx.obj["project"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success
@@ -437,9 +436,9 @@ def deps(ctx, **kwargs):
message=f"Version is required in --add-package when a package when source is {flags.SOURCE}",
option_name="--add-package",
)
task = DepsTask(flags, ctx.obj["project"])
results = task.run()
success = task.interpret_results(results)
with DepsTask(flags, ctx.obj["project"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success
@@ -459,10 +458,9 @@ def init(ctx, **kwargs):
"""Initialize a new dbt project."""
from dbt.task.init import InitTask
task = InitTask(ctx.obj["flags"])
results = task.run()
success = task.interpret_results(results)
with InitTask(ctx.obj["flags"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success
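
The `with ... as task:` pattern in the hunks above relies on the task restoring the caller's working directory on exit. A minimal sketch of that idea follows; `DirectoryScopedTask` is a hypothetical class written for illustration and is not the dbt task base class.

```python
import os
import tempfile
from types import TracebackType
from typing import Optional, Type


class DirectoryScopedTask:
    """Hypothetical task that runs inside a project directory."""

    def __init__(self, project_dir: str) -> None:
        self.project_dir = project_dir
        self._original_dir: Optional[str] = None

    def __enter__(self) -> "DirectoryScopedTask":
        # Remember where the caller was, then move into the project directory.
        self._original_dir = os.getcwd()
        os.chdir(self.project_dir)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        # Restore the caller's directory even if run() raised.
        if self._original_dir is not None:
            os.chdir(self._original_dir)

    def run(self) -> str:
        return os.getcwd()


# Usage: the working directory changes only inside the `with` block.
starting_dir = os.getcwd()
with tempfile.TemporaryDirectory() as project_dir:
    with DirectoryScopedTask(project_dir) as task:
        dir_during_run = task.run()
```

Because the restore happens in `__exit__`, a caller that invokes the task programmatically keeps its own working directory afterwards, whether or not the task succeeded.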

View File

@@ -1,11 +1,13 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional, Union
import jinja2
from dbt.exceptions import MacroNamespaceNotStringError
from dbt.artifacts.resources import RefArgs
from dbt.exceptions import MacroNamespaceNotStringError, ParsingError
from dbt_common.clients.jinja import get_environment
from dbt_common.exceptions.macros import MacroNameNotStringError
from dbt_common.tests import test_caching_enabled
from dbt_extractor import ExtractionError, py_extract_from_source # type: ignore
_TESTING_MACRO_CACHE: Optional[Dict[str, Any]] = {}
@@ -153,3 +155,39 @@ def statically_parse_adapter_dispatch(func_call, ctx, db_wrapper):
possible_macro_calls.append(f"{package_name}.{func_name}")
return possible_macro_calls
def statically_parse_ref_or_source(expression: str) -> Union[RefArgs, List[str]]:
"""
Returns a RefArgs or List[str] object, corresponding to ref or source respectively, given an input jinja expression.
input: str representing how input node is referenced in tested model sql
* examples:
- "ref('my_model_a')"
- "ref('my_model_a', version=3)"
- "ref('package', 'my_model_a', version=3)"
- "source('my_source_schema', 'my_source_name')"
If input is not a well-formed jinja ref or source expression, a ParsingError is raised.
"""
ref_or_source: Union[RefArgs, List[str]]
try:
statically_parsed = py_extract_from_source(f"{{{{ {expression} }}}}")
except ExtractionError:
raise ParsingError(f"Invalid jinja expression: {expression}")
if statically_parsed.get("refs"):
raw_ref = list(statically_parsed["refs"])[0]
ref_or_source = RefArgs(
package=raw_ref.get("package"),
name=raw_ref.get("name"),
version=raw_ref.get("version"),
)
elif statically_parsed.get("sources"):
source_name, source_table_name = list(statically_parsed["sources"])[0]
ref_or_source = [source_name, source_table_name]
else:
raise ParsingError(f"Invalid ref or source expression: {expression}")
return ref_or_source
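
To make the input and output shapes of this function concrete, here is a rough, self-contained sketch of the same idea. It deliberately swaps in Python's standard `ast` module for `dbt_extractor`, returns a plain dict in place of `RefArgs`, and raises `ValueError` in place of `ParsingError`; the name `parse_ref_or_source` is hypothetical.

```python
import ast
from typing import Any, Dict, List, Union


def parse_ref_or_source(expression: str) -> Union[Dict[str, Any], List[str]]:
    """Return a dict for ref(...) or a [source, table] list for source(...)."""
    try:
        tree = ast.parse(expression.strip(), mode="eval")
    except SyntaxError:
        raise ValueError(f"Invalid expression: {expression}") from None

    call = tree.body
    if not isinstance(call, ast.Call) or not isinstance(call.func, ast.Name):
        raise ValueError(f"Invalid ref or source expression: {expression}")

    try:
        # Only literal arguments are accepted; anything else is rejected.
        args = [ast.literal_eval(arg) for arg in call.args]
        kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in call.keywords}
    except (ValueError, TypeError):
        raise ValueError(f"Invalid ref or source expression: {expression}") from None

    all_strings = all(isinstance(arg, str) for arg in args)
    if call.func.id == "ref" and all_strings and len(args) in (1, 2):
        # One argument is the model name; two are (package, name).
        package, name = (None, args[0]) if len(args) == 1 else (args[0], args[1])
        return {"package": package, "name": name, "version": kwargs.get("version")}
    if call.func.id == "source" and all_strings and len(args) == 2:
        return [args[0], args[1]]
    raise ValueError(f"Invalid ref or source expression: {expression}")
```

The sketch accepts the four example inputs listed in the docstring above and rejects anything that is not a single `ref(...)` or `source(...)` call with literal arguments.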

View File

@@ -21,6 +21,7 @@ from dbt.contracts.graph.nodes import (
InjectedCTE,
ManifestNode,
ManifestSQLNode,
ModelNode,
SeedNode,
UnitTestDefinition,
UnitTestNode,
@@ -29,12 +30,15 @@ from dbt.events.types import FoundStats, WritingInjectedSQLForNode
from dbt.exceptions import (
DbtInternalError,
DbtRuntimeError,
ForeignKeyConstraintToSyntaxError,
GraphDependencyNotFoundError,
ParsingError,
)
from dbt.flags import get_flags
from dbt.graph import Graph
from dbt.node_types import ModelLanguage, NodeType
from dbt_common.clients.system import make_directory
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.format import pluralize
from dbt_common.events.functions import fire_event
@@ -371,7 +375,7 @@ class Compiler:
_extend_prepended_ctes(prepended_ctes, new_prepended_ctes)
new_cte_name = self.add_ephemeral_prefix(cte_model.name)
new_cte_name = self.add_ephemeral_prefix(cte_model.identifier)
rendered_sql = cte_model._pre_injected_sql or cte_model.compiled_code
sql = f" {new_cte_name} as (\n{rendered_sql}\n)"
@@ -437,8 +441,31 @@ class Compiler:
relation_name = str(relation_cls.create_from(self.config, node))
node.relation_name = relation_name
# Compile 'ref' and 'source' expressions in foreign key constraints
if isinstance(node, ModelNode):
for constraint in node.all_constraints:
if constraint.type == ConstraintType.foreign_key and constraint.to:
constraint.to = self._compile_relation_for_foreign_key_constraint_to(
manifest, node, constraint.to
)
return node
def _compile_relation_for_foreign_key_constraint_to(
self, manifest: Manifest, node: ManifestSQLNode, to_expression: str
) -> str:
try:
foreign_key_node = manifest.find_node_from_ref_or_source(to_expression)
except ParsingError:
raise ForeignKeyConstraintToSyntaxError(node, to_expression)
if not foreign_key_node:
raise GraphDependencyNotFoundError(node, to_expression)
adapter = get_adapter(self.config)
relation_name = str(adapter.Relation.create_from(self.config, foreign_key_node))
return relation_name
# This method doesn't actually "compile" any of the nodes. That is done by the
# "compile_node" method. This creates a Linker and builds the networkx graph,
# writes out the graph.gpickle file, and prints the stats, returning a Graph object.
@@ -520,6 +547,8 @@ class Compiler:
the node's raw_code into compiled_code, and then calls the
recursive method to "prepend" the ctes.
"""
# REVIEW: UnitTestDefinition shouldn't be possible here because of the
# type of node, and it is likewise an invalid return type.
if isinstance(node, UnitTestDefinition):
return node

View File

@@ -480,6 +480,7 @@ class PartialProject(RenderComponents):
rendered.selectors_dict["selectors"]
)
dbt_cloud = cfg.dbt_cloud
flags: Dict[str, Any] = cfg.flags
project = Project(
project_name=name,
@@ -524,6 +525,7 @@ class PartialProject(RenderComponents):
project_env_vars=project_env_vars,
restrict_access=cfg.restrict_access,
dbt_cloud=dbt_cloud,
flags=flags,
)
# sanity check - this means an internal issue
project.validate()
@@ -568,11 +570,6 @@ class PartialProject(RenderComponents):
) = package_and_project_data_from_root(project_root)
selectors_dict = selector_data_from_root(project_root)
if "flags" in project_dict:
# We don't want to include "flags" in the Project,
# it goes in ProjectFlags
project_dict.pop("flags")
return cls.from_dicts(
project_root=project_root,
project_dict=project_dict,
@@ -645,6 +642,7 @@ class Project:
project_env_vars: Dict[str, Any]
restrict_access: bool
dbt_cloud: Dict[str, Any]
flags: Dict[str, Any]
@property
def all_source_paths(self) -> List[str]:
@@ -724,6 +722,7 @@ class Project:
"require-dbt-version": [v.to_version_string() for v in self.dbt_version],
"restrict-access": self.restrict_access,
"dbt-cloud": self.dbt_cloud,
"flags": self.flags,
}
)
if self.query_comment:
@@ -821,8 +820,8 @@ def read_project_flags(project_dir: str, profiles_dir: str) -> ProjectFlags:
if profile_project_flags:
# This can't use WARN_ERROR or WARN_ERROR_OPTIONS because they're in
# the config that we're loading. Uses special "warn" method.
deprecations.warn("project-flags-moved")
# the config that we're loading. Uses special "buffer" method and fired after flags are initialized in preflight.
deprecations.buffer("project-flags-moved")
project_flags = profile_project_flags
if project_flags is not None:

View File

@@ -193,6 +193,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
log_cache_events=log_cache_events,
dependencies=dependencies,
dbt_cloud=project.dbt_cloud,
flags=project.flags,
)
# Called by 'load_projects' in this class

View File

@@ -1,3 +1,5 @@
from dbt_semantic_interfaces.type_enums import TimeGranularity
DEFAULT_ENV_PLACEHOLDER = "DBT_DEFAULT_PLACEHOLDER"
SECRET_PLACEHOLDER = "$$$DBT_SECRET_START$$${}$$$DBT_SECRET_END$$$"
@@ -15,6 +17,8 @@ DEPENDENCIES_FILE_NAME = "dependencies.yml"
PACKAGE_LOCK_FILE_NAME = "package-lock.yml"
MANIFEST_FILE_NAME = "manifest.json"
SEMANTIC_MANIFEST_FILE_NAME = "semantic_manifest.json"
TIME_SPINE_MODEL_NAME = "metricflow_time_spine"
LEGACY_TIME_SPINE_MODEL_NAME = "metricflow_time_spine"
LEGACY_TIME_SPINE_GRANULARITY = TimeGranularity.DAY
MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY = TimeGranularity.DAY
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
PACKAGE_LOCK_HASH_KEY = "sha1_hash"

View File

@@ -8,7 +8,7 @@ from dbt.adapters.exceptions import (
RelationWrongTypeError,
)
from dbt.adapters.exceptions.cache import CacheInconsistencyError
from dbt.events.types import JinjaLogWarning
from dbt.events.types import JinjaLogWarning, SnapshotTimestampWarning
from dbt.exceptions import (
AmbiguousAliasError,
AmbiguousCatalogMatchError,
@@ -116,6 +116,17 @@ def raise_fail_fast_error(msg, node=None) -> NoReturn:
raise FailFastError(msg, node=node)
def warn_snapshot_timestamp_data_types(
snapshot_time_data_type: str, updated_at_data_type: str
) -> None:
warn_or_error(
SnapshotTimestampWarning(
snapshot_time_data_type=snapshot_time_data_type,
updated_at_data_type=updated_at_data_type,
)
)
# Update this when a new function should be added to the
# dbt context's `exceptions` key!
CONTEXT_EXPORTS = {
@@ -141,6 +152,7 @@ CONTEXT_EXPORTS = {
raise_contract_error,
column_type_missing,
raise_fail_fast_error,
warn_snapshot_timestamp_data_types,
]
}

View File

@@ -20,7 +20,6 @@ from typing_extensions import Protocol
from dbt import selected_resources
from dbt.adapters.base.column import Column
from dbt.adapters.base.relation import EventTimeFilter
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.exceptions import MissingConfigError
from dbt.adapters.factory import (
@@ -231,21 +230,6 @@ class BaseResolver(metaclass=abc.ABCMeta):
def resolve_limit(self) -> Optional[int]:
return 0 if getattr(self.config.args, "EMPTY", False) else None
@property
def resolve_event_time_filter(self) -> Optional[EventTimeFilter]:
field_name = getattr(self.model, "event_time")
start_time = getattr(self.model, "start_time")
end_time = getattr(self.model, "end_time")
if start_time and end_time and field_name:
return EventTimeFilter(
field_name=field_name,
start_time=start_time,
end_time=end_time,
)
return None
@abc.abstractmethod
def __call__(self, *args: str) -> Union[str, RelationProxy, MetricReference]:
pass
@@ -561,11 +545,7 @@ class RuntimeRefResolver(BaseRefResolver):
def create_relation(self, target_model: ManifestNode) -> RelationProxy:
if target_model.is_ephemeral_model:
self.model.set_cte(target_model.unique_id, None)
return self.Relation.create_ephemeral_from(
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)
return self.Relation.create_ephemeral_from(target_model, limit=self.resolve_limit)
elif (
hasattr(target_model, "defer_relation")
and target_model.defer_relation
@@ -583,18 +563,10 @@ class RuntimeRefResolver(BaseRefResolver):
)
):
return self.Relation.create_from(
self.config,
target_model.defer_relation,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
self.config, target_model.defer_relation, limit=self.resolve_limit
)
else:
return self.Relation.create_from(
self.config,
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)
return self.Relation.create_from(self.config, target_model, limit=self.resolve_limit)
def validate(
self,
@@ -661,12 +633,7 @@ class RuntimeSourceResolver(BaseSourceResolver):
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
return self.Relation.create_from(
self.config,
target_source,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter,
)
return self.Relation.create_from(self.config, target_source, limit=self.resolve_limit)
class RuntimeUnitTestSourceResolver(BaseSourceResolver):
@@ -1007,7 +974,8 @@ class ProviderContext(ManifestContext):
table = agate_helper.from_csv(path, text_columns=column_types, delimiter=delimiter)
except ValueError as e:
raise LoadAgateTableValueError(e, node=self.model)
table.original_abspath = os.path.abspath(path)
# this is used by some adapters
table.original_abspath = os.path.abspath(path) # type: ignore
return table
@contextproperty()

@@ -32,9 +32,10 @@ from dbt.adapters.exceptions import (
from dbt.adapters.factory import get_adapter_package_names
# to preserve import paths
from dbt.artifacts.resources import BaseResource, DeferRelation, NodeVersion
from dbt.artifacts.resources import BaseResource, DeferRelation, NodeVersion, RefArgs
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt.artifacts.schemas.manifest import ManifestMetadata, UniqueID, WritableManifest
from dbt.clients.jinja_static import statically_parse_ref_or_source
from dbt.contracts.files import (
AnySourceFile,
FileHash,
@@ -412,11 +413,11 @@ class DisabledLookup(dbtClassMixin):
self.storage: Dict[str, Dict[PackageName, List[Any]]] = {}
self.populate(manifest)
def populate(self, manifest):
def populate(self, manifest: "Manifest"):
for node in list(chain.from_iterable(manifest.disabled.values())):
self.add_node(node)
def add_node(self, node):
def add_node(self, node: GraphMemberNode) -> None:
if node.search_name not in self.storage:
self.storage[node.search_name] = {}
if node.package_name not in self.storage[node.search_name]:
@@ -426,8 +427,12 @@ class DisabledLookup(dbtClassMixin):
# This should return a list of disabled nodes. It's different from
# the other Lookup functions in that it returns full nodes, not just unique_ids
def find(
self, search_name, package: Optional[PackageName], version: Optional[NodeVersion] = None
):
self,
search_name,
package: Optional[PackageName],
version: Optional[NodeVersion] = None,
resource_types: Optional[List[NodeType]] = None,
) -> Optional[List[Any]]:
if version:
search_name = f"{search_name}.v{version}"
@@ -436,16 +441,29 @@ class DisabledLookup(dbtClassMixin):
pkg_dct: Mapping[PackageName, List[Any]] = self.storage[search_name]
nodes = []
if package is None:
if not pkg_dct:
return None
else:
return next(iter(pkg_dct.values()))
nodes = next(iter(pkg_dct.values()))
elif package in pkg_dct:
return pkg_dct[package]
nodes = pkg_dct[package]
else:
return None
if resource_types is None:
return nodes
else:
new_nodes = []
for node in nodes:
if node.resource_type in resource_types:
new_nodes.append(node)
if not new_nodes:
return None
else:
return new_nodes
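
The `resource_types` argument added to `find` above narrows the disabled-node list and returns `None` when nothing survives the filter. A minimal standalone sketch of that filtering step follows; `FakeNode` and `filter_by_resource_type` are hypothetical names used only for illustration and are not part of dbt.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeNode:
    # Hypothetical stand-in for a disabled manifest node.
    name: str
    resource_type: str


def filter_by_resource_type(
    nodes: List[FakeNode], resource_types: Optional[List[str]] = None
) -> Optional[List[FakeNode]]:
    # No filter requested: hand back the candidates unchanged.
    if resource_types is None:
        return nodes
    matching = [node for node in nodes if node.resource_type in resource_types]
    # Mirror the diff: an empty result is reported as None, not [].
    return matching or None


candidates = [FakeNode("orders", "model"), FakeNode("orders", "test")]
only_models = filter_by_resource_type(candidates, ["model"])
no_match = filter_by_resource_type(candidates, ["seed"])
```

Returning `None` rather than an empty list keeps a caller's `if disabled:` check working without changes.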
class AnalysisLookup(RefableLookup):
_lookup_types: ClassVar[set] = set([NodeType.Analysis])
@@ -1294,7 +1312,12 @@ class Manifest(MacroMethods, dbtClassMixin):
# it's possible that the node is disabled
if disabled is None:
disabled = self.disabled_lookup.find(target_model_name, pkg, target_model_version)
disabled = self.disabled_lookup.find(
target_model_name,
pkg,
version=target_model_version,
resource_types=REFABLE_NODE_TYPES,
)
if disabled:
return Disabled(disabled[0])
@@ -1635,6 +1658,22 @@ class Manifest(MacroMethods, dbtClassMixin):
# end of methods formerly in ParseResult
def find_node_from_ref_or_source(
self, expression: str
) -> Optional[Union[ModelNode, SourceDefinition]]:
ref_or_source = statically_parse_ref_or_source(expression)
node = None
if isinstance(ref_or_source, RefArgs):
node = self.ref_lookup.find(
ref_or_source.name, ref_or_source.package, ref_or_source.version, self
)
else:
source_name, source_table_name = ref_or_source[0], ref_or_source[1]
node = self.source_lookup.find(f"{source_name}.{source_table_name}", None, self)
return node
# Provide support for copy.deepcopy() - we just need to avoid the lock!
# pickle and deepcopy use this. It returns a callable object used to
# create the initial version of the object and a tuple of arguments

@@ -18,7 +18,6 @@ from typing import (
from mashumaro.types import SerializableType
from dbt import deprecations
from dbt.adapters.base import ConstraintSupport
from dbt.adapters.factory import get_adapter_constraint_support
from dbt.artifacts.resources import Analysis as AnalysisResource
@@ -58,6 +57,7 @@ from dbt.artifacts.resources import SingularTest as SingularTestResource
from dbt.artifacts.resources import Snapshot as SnapshotResource
from dbt.artifacts.resources import SourceDefinition as SourceDefinitionResource
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
@@ -85,7 +85,11 @@ from dbt.node_types import (
NodeType,
)
from dbt_common.clients.system import write_file
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.contracts.constraints import (
ColumnLevelConstraint,
ConstraintType,
ModelLevelConstraint,
)
from dbt_common.events.contextvars import set_log_contextvars
from dbt_common.events.functions import warn_or_error
@@ -379,10 +383,6 @@ class CompiledNode(CompiledResource, ParsedNode):
"""Contains attributes necessary for SQL files and nodes with refs, sources, etc,
so all ManifestNodes except SeedNode."""
# TODO: should these go here? and get set during execution?
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
@property
def empty(self):
return not self.raw_code.strip()
@@ -493,6 +493,18 @@ class ModelNode(ModelResource, CompiledNode):
def materialization_enforces_constraints(self) -> bool:
return self.config.materialized in ["table", "incremental"]
@property
def all_constraints(self) -> List[Union[ModelLevelConstraint, ColumnLevelConstraint]]:
constraints: List[Union[ModelLevelConstraint, ColumnLevelConstraint]] = []
for model_level_constraint in self.constraints:
constraints.append(model_level_constraint)
for column in self.columns.values():
for column_level_constraint in column.constraints:
constraints.append(column_level_constraint)
return constraints
def infer_primary_key(self, data_tests: List["GenericTestNode"]) -> List[str]:
"""
Infers the columns that can be used as primary key of a model in the following order:
@@ -1135,12 +1147,6 @@ class UnpatchedSourceDefinition(BaseNode):
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if self.tests:
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
self.data_tests.extend(self.tests)
self.tests.clear()
@@ -1151,12 +1157,6 @@ class UnpatchedSourceDefinition(BaseNode):
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if column.tests:
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
column.data_tests.extend(column.tests)
column.tests.clear()
@@ -1454,6 +1454,13 @@ class Group(GroupResource, BaseNode):
def resource_class(cls) -> Type[GroupResource]:
return GroupResource
def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
}
# ====================================
# SemanticModel node
@@ -1559,13 +1566,12 @@ class SavedQuery(NodeInfoMixin, GraphNode, SavedQueryResource):
# exports should be in the same order, so we zip them for easy iteration
for old_export, new_export in zip(old.exports, self.exports):
if not (
old_export.name == new_export.name
and old_export.config.export_as == new_export.config.export_as
and old_export.config.schema_name == new_export.config.schema_name
and old_export.config.alias == new_export.config.alias
):
if not (old_export.name == new_export.name):
return False
keys = ["export_as", "schema", "alias"]
for key in keys:
if old_export.unrendered_config.get(key) != new_export.unrendered_config.get(key):
return False
return True
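
The hunk above switches the export comparison from rendered config values to the `export_as`, `schema` and `alias` keys of `unrendered_config`. A rough sketch of that comparison on plain dictionaries, assuming each export is a dict with `name` and `unrendered_config` entries (a simplification made for this example, not the real `Export` class):

```python
from typing import Any, Dict, List

COMPARED_KEYS = ("export_as", "schema", "alias")


def same_exports(old: List[Dict[str, Any]], new: List[Dict[str, Any]]) -> bool:
    # Exports are assumed to be in the same order, as in the diff.
    for old_export, new_export in zip(old, new):
        if old_export["name"] != new_export["name"]:
            return False
        for key in COMPARED_KEYS:
            old_value = old_export["unrendered_config"].get(key)
            new_value = new_export["unrendered_config"].get(key)
            if old_value != new_value:
                return False
    return True


before = [{"name": "daily", "unrendered_config": {"schema": "{{ target.schema }}"}}]
after_same = [{"name": "daily", "unrendered_config": {"schema": "{{ target.schema }}"}}]
after_changed = [{"name": "daily", "unrendered_config": {"schema": "analytics"}}]
```

In this sketch the unrendered Jinja text is what gets compared, so the two `{{ target.schema }}` entries match regardless of what the expression would render to.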
@@ -1613,6 +1619,7 @@ class ParsedNodePatch(ParsedPatch):
latest_version: Optional[NodeVersion]
constraints: List[Dict[str, Any]]
deprecation_date: Optional[datetime]
time_spine: Optional[TimeSpine] = None
@dataclass

@@ -1,10 +1,19 @@
from dbt.constants import TIME_SPINE_MODEL_NAME
from typing import List, Optional
from dbt.constants import (
LEGACY_TIME_SPINE_GRANULARITY,
LEGACY_TIME_SPINE_MODEL_NAME,
MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY,
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ModelNode
from dbt.events.types import SemanticValidationFailure
from dbt.exceptions import ParsingError
from dbt_common.clients.system import write_file
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_semantic_interfaces.implementations.metric import PydanticMetric
from dbt_semantic_interfaces.implementations.node_relation import PydanticNodeRelation
from dbt_semantic_interfaces.implementations.project_configuration import (
PydanticProjectConfiguration,
)
@@ -13,8 +22,12 @@ from dbt_semantic_interfaces.implementations.semantic_manifest import (
PydanticSemanticManifest,
)
from dbt_semantic_interfaces.implementations.semantic_model import PydanticSemanticModel
from dbt_semantic_interfaces.implementations.time_spine import (
PydanticTimeSpine,
PydanticTimeSpinePrimaryColumn,
)
from dbt_semantic_interfaces.implementations.time_spine_table_configuration import (
PydanticTimeSpineTableConfiguration,
PydanticTimeSpineTableConfiguration as LegacyTimeSpine,
)
from dbt_semantic_interfaces.type_enums import TimeGranularity
from dbt_semantic_interfaces.validations.semantic_manifest_validator import (
@@ -23,7 +36,7 @@ from dbt_semantic_interfaces.validations.semantic_manifest_validator import (
class SemanticManifest:
def __init__(self, manifest) -> None:
def __init__(self, manifest: Manifest) -> None:
self.manifest = manifest
def validate(self) -> bool:
@@ -59,8 +72,50 @@ class SemanticManifest:
write_file(file_path, json)
def _get_pydantic_semantic_manifest(self) -> PydanticSemanticManifest:
pydantic_time_spines: List[PydanticTimeSpine] = []
minimum_time_spine_granularity: Optional[TimeGranularity] = None
for node in self.manifest.nodes.values():
if not (isinstance(node, ModelNode) and node.time_spine):
continue
time_spine = node.time_spine
standard_granularity_column = None
for column in node.columns.values():
if column.name == time_spine.standard_granularity_column:
standard_granularity_column = column
break
# Assertions needed for type checking
if not standard_granularity_column:
raise ParsingError(
"Expected to find time spine standard granularity column in model columns, but did not. "
"This should have been caught in YAML parsing."
)
if not standard_granularity_column.granularity:
raise ParsingError(
"Expected to find granularity set for time spine standard granularity column, but did not. "
"This should have been caught in YAML parsing."
)
pydantic_time_spine = PydanticTimeSpine(
node_relation=PydanticNodeRelation(
alias=node.alias,
schema_name=node.schema,
database=node.database,
relation_name=node.relation_name,
),
primary_column=PydanticTimeSpinePrimaryColumn(
name=time_spine.standard_granularity_column,
time_granularity=standard_granularity_column.granularity,
),
)
pydantic_time_spines.append(pydantic_time_spine)
if (
not minimum_time_spine_granularity
or standard_granularity_column.granularity.to_int()
< minimum_time_spine_granularity.to_int()
):
minimum_time_spine_granularity = standard_granularity_column.granularity
project_config = PydanticProjectConfiguration(
time_spine_table_configurations=[],
time_spine_table_configurations=[], time_spines=pydantic_time_spines
)
pydantic_semantic_manifest = PydanticSemanticManifest(
metrics=[], semantic_models=[], project_configuration=project_config
@@ -79,24 +134,39 @@ class SemanticManifest:
PydanticSavedQuery.parse_obj(saved_query.to_dict())
)
# Look for time-spine table model and create time spine table configuration
if self.manifest.semantic_models:
# Get model for time_spine_table
model = self.manifest.ref_lookup.find(TIME_SPINE_MODEL_NAME, None, None, self.manifest)
if not model:
raise ParsingError(
"The semantic layer requires a 'metricflow_time_spine' model in the project, but none was found. "
"Guidance on creating this model can be found on our docs site ("
"https://docs.getdbt.com/docs/build/metricflow-time-spine) "
)
# Create time_spine_table_config, set it in project_config, and add to semantic manifest
time_spine_table_config = PydanticTimeSpineTableConfiguration(
location=model.relation_name,
column_name="date_day",
grain=TimeGranularity.DAY,
legacy_time_spine_model = self.manifest.ref_lookup.find(
LEGACY_TIME_SPINE_MODEL_NAME, None, None, self.manifest
)
pydantic_semantic_manifest.project_configuration.time_spine_table_configurations = [
time_spine_table_config
]
if legacy_time_spine_model:
if (
not minimum_time_spine_granularity
or LEGACY_TIME_SPINE_GRANULARITY.to_int()
< minimum_time_spine_granularity.to_int()
):
minimum_time_spine_granularity = LEGACY_TIME_SPINE_GRANULARITY
# If no time spines have been configured at DAY or smaller AND legacy time spine model does not exist, error.
if (
not minimum_time_spine_granularity
or minimum_time_spine_granularity.to_int()
> MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY.to_int()
):
raise ParsingError(
"The semantic layer requires a time spine model with granularity DAY or smaller in the project, "
"but none was found. Guidance on creating this model can be found on our docs site "
"(https://docs.getdbt.com/docs/build/metricflow-time-spine)."
)
# For backward compatibility: if legacy time spine exists, include it in the manifest.
if legacy_time_spine_model:
legacy_time_spine = LegacyTimeSpine(
location=legacy_time_spine_model.relation_name,
column_name="date_day",
grain=LEGACY_TIME_SPINE_GRANULARITY,
)
pydantic_semantic_manifest.project_configuration.time_spine_table_configurations = [
legacy_time_spine
]
return pydantic_semantic_manifest
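
The granularity check in `_get_pydantic_semantic_manifest` can be summarized as: take the finest granularity across configured time spines, count the legacy `metricflow_time_spine` model as DAY if it exists, and fail unless the result is DAY or finer. The sketch below restates that rule in isolation. The `Granularity` enum and its integer ordering are assumptions standing in for `TimeGranularity` and its `to_int()` method.

```python
from enum import IntEnum
from typing import Iterable


class Granularity(IntEnum):
    # Hypothetical ordering: smaller value means finer grain.
    SECOND = 1
    MINUTE = 2
    HOUR = 3
    DAY = 4
    WEEK = 5
    MONTH = 6


MINIMUM_REQUIRED = Granularity.DAY
LEGACY_GRANULARITY = Granularity.DAY


def has_usable_time_spine(configured: Iterable[Granularity], legacy_exists: bool) -> bool:
    candidates = list(configured)
    if legacy_exists:
        # The legacy model is treated as a DAY-grain time spine.
        candidates.append(LEGACY_GRANULARITY)
    finest = min(candidates, default=None)
    # No spine at all, or only spines coarser than DAY, is an error case.
    return finest is not None and finest <= MINIMUM_REQUIRED
```

For example, a project with only a MONTH spine fails the check unless the legacy model is also present.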

@@ -116,6 +116,7 @@ class HasColumnAndTestProps(HasColumnProps):
class UnparsedColumn(HasColumnAndTestProps):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
granularity: Optional[str] = None # str is really a TimeGranularity Enum
@dataclass
@@ -206,6 +207,11 @@ class UnparsedNodeUpdate(HasConfig, HasColumnTests, HasColumnAndTestProps, HasYa
access: Optional[str] = None
@dataclass
class UnparsedTimeSpine(dbtClassMixin):
standard_granularity_column: str
@dataclass
class UnparsedModelUpdate(UnparsedNodeUpdate):
quote_columns: Optional[bool] = None
@@ -213,6 +219,7 @@ class UnparsedModelUpdate(UnparsedNodeUpdate):
latest_version: Optional[NodeVersion] = None
versions: Sequence[UnparsedVersion] = field(default_factory=list)
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[UnparsedTimeSpine] = None
def __post_init__(self) -> None:
if self.latest_version:
@@ -234,6 +241,26 @@ class UnparsedModelUpdate(UnparsedNodeUpdate):
self.deprecation_date = normalize_date(self.deprecation_date)
if self.time_spine:
columns = (
self.get_columns_for_version(self.latest_version)
if self.latest_version
else self.columns
)
column_names_to_columns = {column.name: column for column in columns}
if self.time_spine.standard_granularity_column not in column_names_to_columns:
raise ParsingError(
f"Time spine standard granularity column must be defined on the model. Got invalid "
f"column name '{self.time_spine.standard_granularity_column}' for model '{self.name}'. Valid names"
f"{' for latest version' if self.latest_version else ''}: {list(column_names_to_columns.keys())}."
)
column = column_names_to_columns[self.time_spine.standard_granularity_column]
if not column.granularity:
raise ParsingError(
f"Time spine standard granularity column must have a granularity defined. Please add one for "
f"column '{self.time_spine.standard_granularity_column}' in model '{self.name}'."
)
def get_columns_for_version(self, version: NodeVersion) -> List[UnparsedColumn]:
if version not in self._version_map:
raise DbtInternalError(

@@ -5,7 +5,6 @@ from mashumaro.jsonschema.annotations import Pattern
from mashumaro.types import SerializableType
from typing_extensions import Annotated
from dbt import deprecations
from dbt.adapters.contracts.connection import QueryComment
from dbt.contracts.util import Identifier, list_str
from dbt_common.contracts.util import Mergeable
@@ -259,6 +258,7 @@ class Project(dbtClassMixin):
query_comment: Optional[Union[QueryComment, NoValue, str]] = field(default_factory=NoValue)
restrict_access: bool = False
dbt_cloud: Optional[Dict[str, Any]] = None
flags: Dict[str, Any] = field(default_factory=dict)
class Config(dbtMashConfig):
# These tell mashumaro to use aliases for jsonschema and for "from_dict"
@@ -312,10 +312,6 @@ class Project(dbtClassMixin):
raise ValidationError(
"Invalid project config: cannot have both 'tests' and 'data_tests' defined"
)
if "tests" in data:
deprecations.warn(
"project-test-config", deprecated_path="tests", exp_path="data_tests"
)
@dataclass

@@ -29,7 +29,8 @@ class RemoteCompileResult(RemoteCompileResultMixin):
generated_at: datetime = field(default_factory=datetime.utcnow)
@property
def error(self):
def error(self) -> None:
# TODO: Can we delete this? It's never set anywhere else and never accessed
return None
@@ -40,7 +41,7 @@ class RemoteExecutionResult(ExecutionResult):
args: Dict[str, Any] = field(default_factory=dict)
generated_at: datetime = field(default_factory=datetime.utcnow)
def write(self, path: str):
def write(self, path: str) -> None:
writable = RunResultsArtifact.from_execution_results(
generated_at=self.generated_at,
results=self.results,

@@ -1,9 +1,9 @@
import abc
from typing import ClassVar, Dict, List, Optional, Set
from typing import Callable, ClassVar, Dict, List, Optional, Set
import dbt.tracking
from dbt.events import types as core_types
from dbt_common.events.functions import fire_event, warn_or_error
from dbt_common.events.functions import warn_or_error
class DBTDeprecation:
@@ -98,24 +98,10 @@ class CollectFreshnessReturnSignature(DBTDeprecation):
_event = "CollectFreshnessReturnSignature"
class TestsConfigDeprecation(DBTDeprecation):
_name = "project-test-config"
_event = "TestsConfigDeprecation"
class ProjectFlagsMovedDeprecation(DBTDeprecation):
_name = "project-flags-moved"
_event = "ProjectFlagsMovedDeprecation"
def show(self, *args, **kwargs) -> None:
if self.name not in active_deprecations:
event = self.event(**kwargs)
# We can't do warn_or_error because the ProjectFlags
# is where that is set up and we're just reading it.
fire_event(event)
self.track_deprecation_warn()
active_deprecations.add(self.name)
class PackageMaterializationOverrideDeprecation(DBTDeprecation):
_name = "package-materialization-override"
@@ -147,7 +133,7 @@ def renamed_env_var(old_name: str, new_name: str):
return cb
def warn(name, *args, **kwargs):
def warn(name: str, *args, **kwargs) -> None:
if name not in deprecations:
# this should (hopefully) never happen
raise RuntimeError("Error showing deprecation warning: {}".format(name))
@@ -155,6 +141,13 @@ def warn(name, *args, **kwargs):
deprecations[name].show(*args, **kwargs)
def buffer(name: str, *args, **kwargs):
def show_callback():
deprecations[name].show(*args, **kwargs)
buffered_deprecations.append(show_callback)
# these are globally available
# since modules are only imported once, active_deprecations is a singleton
@@ -169,7 +162,6 @@ deprecations_list: List[DBTDeprecation] = [
ConfigLogPathDeprecation(),
ConfigTargetPathDeprecation(),
CollectFreshnessReturnSignature(),
TestsConfigDeprecation(),
ProjectFlagsMovedDeprecation(),
PackageMaterializationOverrideDeprecation(),
ResourceNamesWithSpacesDeprecation(),
@@ -178,6 +170,13 @@ deprecations_list: List[DBTDeprecation] = [
deprecations: Dict[str, DBTDeprecation] = {d.name: d for d in deprecations_list}
buffered_deprecations: List[Callable] = []
def reset_deprecations():
active_deprecations.clear()
def fire_buffered_deprecations():
[dep_fn() for dep_fn in buffered_deprecations]
buffered_deprecations.clear()
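
The `buffer` / `fire_buffered_deprecations` pair defers a deprecation warning by storing a closure and running it later, presumably once the flags that control warn-or-error behavior are available. A self-contained sketch of the same pattern, with a hypothetical `show` function in place of `DBTDeprecation.show`:

```python
from typing import Callable, List

shown: List[str] = []  # records what has actually been emitted
buffered: List[Callable[[], None]] = []


def show(name: str) -> None:
    # Hypothetical stand-in for deprecations[name].show(...).
    shown.append(name)


def buffer(name: str) -> None:
    # Capture the call in a closure instead of emitting it now.
    def show_callback() -> None:
        show(name)

    buffered.append(show_callback)


def fire_buffered() -> None:
    # Emit everything queued so far, then reset the queue.
    for callback in buffered:
        callback()
    buffered.clear()


buffer("project-flags-moved")
emitted_before_fire = list(shown)
fire_buffered()
emitted_after_fire = list(shown)
```

The sketch uses a plain `for` loop to run the callbacks; the diff uses a list comprehension for the same effect.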

@@ -1610,6 +1610,17 @@ message CompiledNodeMsg {
CompiledNode data = 2;
}
// Q043
message SnapshotTimestampWarning {
string snapshot_time_data_type = 1;
string updated_at_data_type = 2;
}
message SnapshotTimestampWarningMsg {
CoreEventInfo info = 1;
SnapshotTimestampWarning data = 2;
}
// W - Node testing
// Skipped W001
@@ -1809,12 +1820,19 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}
message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}
// Z021
message RunResultWarning {
string resource_type = 1;
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}
message RunResultWarningMsg {
@@ -1828,6 +1846,7 @@ message RunResultFailure {
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}
message RunResultFailureMsg {
@@ -1849,6 +1868,7 @@ message StatsLineMsg {
message RunResultError {
string msg = 1;
NodeInfo node_info = 2;
Group group = 3;
}
message RunResultErrorMsg {

File diff suppressed because one or more lines are too long

@@ -388,6 +388,9 @@ class ConfigTargetPathDeprecation(WarnLevel):
return line_wrap_message(warning_tag(f"Deprecated functionality\n\n{description}"))
# Note: this deprecation has been removed, but we are leaving
# the event class here, because users may have specified it in
# warn_error_options.
class TestsConfigDeprecation(WarnLevel):
def code(self) -> str:
return "D012"
@@ -1614,6 +1617,18 @@ class CompiledNode(InfoLevel):
return f"Compiled node '{self.node_name}' is:\n{self.compiled}"
class SnapshotTimestampWarning(WarnLevel):
def code(self) -> str:
return "Q043"
def message(self) -> str:
return (
f"Data type of snapshot table timestamp columns ({self.snapshot_time_data_type}) "
f"doesn't match derived column 'updated_at' ({self.updated_at_data_type}). "
"Please update snapshot config 'updated_at'."
)
# =======================================================
# W - Node testing
# =======================================================

@@ -136,6 +136,18 @@ class GraphDependencyNotFoundError(CompilationError):
return msg
class ForeignKeyConstraintToSyntaxError(CompilationError):
def __init__(self, node, expression: str) -> None:
self.expression = expression
self.node = node
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = f"'{self.node.unique_id}' defines a foreign key constraint 'to' expression which is not valid 'ref' or 'source' syntax: {self.expression}."
return msg
# client level exceptions

@@ -59,18 +59,40 @@ class Graph:
def select_children(
self, selected: Set[UniqueId], max_depth: Optional[int] = None
) -> Set[UniqueId]:
descendants: Set[UniqueId] = set()
for node in selected:
descendants.update(self.descendants(node, max_depth))
return descendants
"""Returns all nodes which are descendants of the 'selected' set.
Nodes in the 'selected' set are counted as children only if
they are descendants of other nodes in the 'selected' set."""
children: Set[UniqueId] = set()
i = 0
while len(selected) > 0 and (max_depth is None or i < max_depth):
next_layer: Set[UniqueId] = set()
for node in selected:
next_layer.update(self.descendants(node, 1))
next_layer = next_layer - children # Avoid re-searching
children.update(next_layer)
selected = next_layer
i += 1
return children
def select_parents(
self, selected: Set[UniqueId], max_depth: Optional[int] = None
) -> Set[UniqueId]:
ancestors: Set[UniqueId] = set()
for node in selected:
ancestors.update(self.ancestors(node, max_depth))
return ancestors
"""Returns all nodes which are ancestors of the 'selected' set.
Nodes in the 'selected' set are counted as parents only if
they are ancestors of other nodes in the 'selected' set."""
parents: Set[UniqueId] = set()
i = 0
while len(selected) > 0 and (max_depth is None or i < max_depth):
next_layer: Set[UniqueId] = set()
for node in selected:
next_layer.update(self.ancestors(node, 1))
next_layer = next_layer - parents # Avoid re-searching
parents.update(next_layer)
selected = next_layer
i += 1
return parents
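
The rewritten `select_children` and `select_parents` walk the graph one layer at a time and drop nodes that were already found, so shared descendants are not searched again. A minimal sketch of the same layered traversal on a plain adjacency dictionary follows; the `edges` mapping is an assumption standing in for the `Graph.descendants(node, 1)` call.

```python
from typing import Dict, Optional, Set


def select_children(
    edges: Dict[str, Set[str]], selected: Set[str], max_depth: Optional[int] = None
) -> Set[str]:
    children: Set[str] = set()
    frontier = set(selected)
    depth = 0
    while frontier and (max_depth is None or depth < max_depth):
        next_layer: Set[str] = set()
        for node in frontier:
            # Direct children only; deeper levels come from later iterations.
            next_layer.update(edges.get(node, set()))
        next_layer -= children  # avoid re-searching nodes already found
        children.update(next_layer)
        frontier = next_layer
        depth += 1
    return children


# A simple chain: a -> b -> c -> d
EDGES = {"a": {"b"}, "b": {"c"}, "c": {"d"}, "d": set()}
```

With `selected={"a", "b"}` and `max_depth=1`, the result is `{"b", "c"}`: `b` is included because it is a child of `a`, matching the docstring in the diff.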
def select_successors(self, selected: Set[UniqueId]) -> Set[UniqueId]:
successors: Set[UniqueId] = set()

@@ -87,12 +87,15 @@ class NodeSelector(MethodManager):
)
return set(), set()
neighbors = self.collect_specified_neighbors(spec, collected)
selected = collected | neighbors
# if --indirect-selection EMPTY, do not expand to adjacent tests
if spec.indirect_selection == IndirectSelection.Empty:
return collected, set()
return selected, set()
else:
neighbors = self.collect_specified_neighbors(spec, collected)
direct_nodes, indirect_nodes = self.expand_selection(
selected=(collected | neighbors), indirect_selection=spec.indirect_selection
selected=selected, indirect_selection=spec.indirect_selection
)
return direct_nodes, indirect_nodes
@@ -177,10 +180,14 @@ class NodeSelector(MethodManager):
node = self.manifest.nodes[unique_id]
if self.include_empty_nodes:
return node.config.enabled
return node.config.enabled
def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
return node.empty
else:
return not node.empty and node.config.enabled
return False
def node_is_match(self, node: GraphMemberNode) -> bool:
"""Determine if a node is a match for the selector. Non-match nodes
@@ -212,7 +219,12 @@ class NodeSelector(MethodManager):
"""Return the subset of selected nodes that is a match for this
selector.
"""
return {unique_id for unique_id in selected if self._is_match(unique_id)}
return {
unique_id
for unique_id in selected
if self._is_match(unique_id)
and (self.include_empty_nodes or not self._is_empty_node(unique_id))
}
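
The filter above keeps a selected node only if it matches the selector and, unless `include_empty_nodes` is set, is not empty. A simplified sketch of that rule, assuming a hypothetical `FakeNode` with `enabled` and `raw_code` fields and treating "match" as "enabled" (the real `_is_match` also handles sources, exposures and other resource types):

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class FakeNode:
    # Hypothetical stand-in for a manifest node.
    enabled: bool
    raw_code: str

    @property
    def empty(self) -> bool:
        return not self.raw_code.strip()


def filter_selection(
    nodes: Dict[str, FakeNode], selected: Set[str], include_empty_nodes: bool
) -> Set[str]:
    return {
        unique_id
        for unique_id in selected
        if unique_id in nodes
        and nodes[unique_id].enabled
        and (include_empty_nodes or not nodes[unique_id].empty)
    }


NODES = {
    "model.a": FakeNode(enabled=True, raw_code="select 1"),
    "model.b": FakeNode(enabled=True, raw_code="   "),
    "model.c": FakeNode(enabled=False, raw_code="select 2"),
}
```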
def expand_selection(
self,

@@ -18,6 +18,7 @@ from dbt.exceptions import ParsingError
from dbt.parser.search import FileBlock
from dbt_common.contracts.constraints import ColumnLevelConstraint, ConstraintType
from dbt_common.exceptions import DbtInternalError
from dbt_semantic_interfaces.type_enums import TimeGranularity
def trimmed(inp: str) -> str:
@@ -185,13 +186,12 @@ class ParserRef:
self.column_info: Dict[str, ColumnInfo] = {}
def _add(self, column: HasColumnProps) -> None:
tags: List[str] = []
tags.extend(getattr(column, "tags", ()))
quote: Optional[bool]
tags: List[str] = getattr(column, "tags", [])
quote: Optional[bool] = None
granularity: Optional[TimeGranularity] = None
if isinstance(column, UnparsedColumn):
quote = column.quote
else:
quote = None
granularity = TimeGranularity(column.granularity) if column.granularity else None
if any(
c
@@ -209,6 +209,7 @@ class ParserRef:
tags=tags,
quote=quote,
_extra=column.extra,
granularity=granularity,
)
@classmethod

@@ -1028,12 +1028,11 @@ class ManifestLoader:
return state_check
def save_macros_to_adapter(self, adapter):
macro_manifest = MacroManifest(self.manifest.macros)
adapter.set_macro_resolver(macro_manifest)
adapter.set_macro_resolver(self.manifest)
# This executes the callable macro_hook and sets the
# query headers
# This executes the callable macro_hook and sets the query headers
query_header_context = generate_query_header_context(adapter.config, macro_manifest)
query_header_context = generate_query_header_context(adapter.config, self.manifest)
self.macro_hook(query_header_context)
# This creates a MacroManifest which contains the macros in

@@ -11,6 +11,7 @@ from dbt.config.renderer import BaseRenderer, Keypath
# keyword args are rendered to capture refs in render_test_update.
# Keyword args are finally rendered at compilation time.
# Descriptions are not rendered until 'process_docs'.
# Pre- and post-hooks in configs are late-rendered.
class SchemaYamlRenderer(BaseRenderer):
def __init__(self, context: Dict[str, Any], key: str) -> None:
super().__init__(context)
@@ -43,6 +44,14 @@ class SchemaYamlRenderer(BaseRenderer):
if len(keypath) == 2 and keypath[1] in ("tests", "data_tests", "description"):
return True
# pre- and post-hooks
if (
len(keypath) >= 2
and keypath[0] == "config"
and keypath[1] in ("pre_hook", "post_hook")
):
return True
# versions
if len(keypath) == 5 and keypath[4] == "description":
return True
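
The new branch marks `config.pre_hook` and `config.post_hook` entries as late-rendered, based purely on the shape of the keypath. The predicate can be sketched on its own as below; `is_late_rendered_hook` is a hypothetical name for this example, and the keypath is assumed to be a tuple of keys and list indexes.

```python
from typing import Tuple, Union

Keypath = Tuple[Union[str, int], ...]


def is_late_rendered_hook(keypath: Keypath) -> bool:
    # True for config.pre_hook / config.post_hook and anything nested below them.
    return (
        len(keypath) >= 2
        and keypath[0] == "config"
        and keypath[1] in ("pre_hook", "post_hook")
    )
```

Because the length test is `>= 2`, individual list items such as `("config", "post_hook", 0)` are covered as well as the key itself.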

@@ -612,7 +612,7 @@ class SemanticModelParser(YamlReader):
) -> None:
unparsed_metric = UnparsedMetric(
name=measure.name,
label=measure.name,
label=measure.label or measure.name,
type="simple",
type_params=UnparsedMetricTypeParams(measure=measure.name, expr=measure.name),
description=measure.description or f"Metric created from measure {measure.name}",
@@ -778,7 +778,9 @@ class SavedQueryParser(YamlReader):
self, unparsed: UnparsedExport, saved_query_config: SavedQueryConfig
) -> Export:
return Export(
name=unparsed.name, config=self._get_export_config(unparsed.config, saved_query_config)
name=unparsed.name,
config=self._get_export_config(unparsed.config, saved_query_config),
unrendered_config=unparsed.config,
)
def _get_query_params(self, unparsed: UnparsedQueryParams) -> QueryParams:

@@ -4,7 +4,9 @@ from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar
from dbt import deprecations
from dbt.artifacts.resources import RefArgs
from dbt.artifacts.resources.v1.model import TimeSpine
from dbt.clients.jinja_static import statically_parse_ref_or_source
from dbt.clients.yaml_helper import load_yaml_text
from dbt.config import RuntimeConfig
from dbt.context.configured import SchemaYamlVars, generate_schema_yml_context
@@ -66,18 +68,20 @@ from dbt_common.events.functions import warn_or_error
from dbt_common.exceptions import DbtValidationError
from dbt_common.utils import deep_merge
schema_file_keys = (
"models",
"seeds",
"snapshots",
"sources",
"macros",
"analyses",
"exposures",
"metrics",
"semantic_models",
"saved_queries",
)
schema_file_keys_to_resource_types = {
"models": NodeType.Model,
"seeds": NodeType.Seed,
"snapshots": NodeType.Snapshot,
"sources": NodeType.Source,
"macros": NodeType.Macro,
"analyses": NodeType.Analysis,
"exposures": NodeType.Exposure,
"metrics": NodeType.Metric,
"semantic_models": NodeType.SemanticModel,
"saved_queries": NodeType.SavedQuery,
}
schema_file_keys = list(schema_file_keys_to_resource_types.keys())
# ===============================================================================
@@ -563,12 +567,6 @@ class PatchParser(YamlReader, Generic[NonSourceTarget, Parsed]):
raise ValidationError(
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
data["data_tests"] = data.pop("tests")
# model-level tests
@@ -617,9 +615,16 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
# could possibly skip creating one. Leaving here for now for
# code consistency.
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
if isinstance(block.target, UnparsedModelUpdate):
deprecation_date = block.target.deprecation_date
time_spine = (
TimeSpine(
standard_granularity_column=block.target.time_spine.standard_granularity_column
)
if block.target.time_spine
else None
)
patch = ParsedNodePatch(
name=block.target.name,
original_file_path=block.target.original_file_path,
@@ -635,6 +640,7 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
latest_version=None,
constraints=block.target.constraints,
deprecation_date=deprecation_date,
time_spine=time_spine,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
@@ -667,7 +673,10 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
# handle disabled nodes
if unique_id is None:
# Node might be disabled. Following call returns list of matching disabled nodes
found_nodes = self.manifest.disabled_lookup.find(patch.name, patch.package_name)
resource_type = schema_file_keys_to_resource_types[patch.yaml_key]
found_nodes = self.manifest.disabled_lookup.find(
patch.name, patch.package_name, resource_types=[resource_type]
)
if found_nodes:
if len(found_nodes) > 1 and patch.config.get("enabled"):
# There are multiple disabled nodes for this model and the schema file wants to enable one.
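
Reviewer note (illustrative sketch, not part of the diff): the hunk above narrows the disabled-node lookup by resource type, using the new `schema_file_keys_to_resource_types` mapping. The `DisabledNode` class and `find_disabled` function below are hypothetical stand-ins; the real `disabled_lookup.find` implementation is not shown in this diff.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional


class NodeType(str, Enum):
    # Trimmed stand-in for the real NodeType enum
    Model = "model"
    Seed = "seed"


@dataclass
class DisabledNode:
    name: str
    package_name: str
    resource_type: NodeType


# Same shape as the mapping added in the diff, reduced to two keys
schema_file_keys_to_resource_types: Dict[str, NodeType] = {
    "models": NodeType.Model,
    "seeds": NodeType.Seed,
}


def find_disabled(
    nodes: List[DisabledNode],
    name: str,
    package: Optional[str],
    resource_types: Optional[List[NodeType]] = None,
) -> List[DisabledNode]:
    """Return disabled nodes matching name/package, optionally limited by type."""
    return [
        n
        for n in nodes
        if n.name == name
        and (package is None or n.package_name == package)
        and (resource_types is None or n.resource_type in resource_types)
    ]
```

With a disabled model and a disabled seed that share a name, passing `resource_types=[schema_file_keys_to_resource_types["seeds"]]` returns only the seed, so a patch under the `seeds:` key no longer matches the model.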
@@ -799,7 +808,9 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
if versioned_model_unique_id is None:
# Node might be disabled. Following call returns list of matching disabled nodes
found_nodes = self.manifest.disabled_lookup.find(versioned_model_name, None)
found_nodes = self.manifest.disabled_lookup.find(
versioned_model_name, None, resource_types=[NodeType.Model]
)
if found_nodes:
if len(found_nodes) > 1 and target.config.get("enabled"):
# There are multiple disabled nodes for this model and the schema file wants to enable one.
@@ -900,6 +911,11 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None:
super().patch_node_properties(node, patch)
# Remaining patch properties are only relevant to ModelNode objects
if not isinstance(node, ModelNode):
return
node.version = patch.version
node.latest_version = patch.latest_version
node.deprecation_date = patch.deprecation_date
@@ -913,9 +929,10 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
)
# These two will have to be reapplied after config is built for versioned models
self.patch_constraints(node, patch.constraints)
self.patch_time_spine(node, patch.time_spine)
node.build_contract_checksum()
def patch_constraints(self, node, constraints) -> None:
def patch_constraints(self, node: ModelNode, constraints: List[Dict[str, Any]]) -> None:
contract_config = node.config.get("contract")
if contract_config.enforced is True:
self._validate_constraint_prerequisites(node)
@@ -930,6 +947,29 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
self._validate_pk_constraints(node, constraints)
node.constraints = [ModelLevelConstraint.from_dict(c) for c in constraints]
self._process_constraints_refs_and_sources(node)
def _process_constraints_refs_and_sources(self, model_node: ModelNode) -> None:
"""
Populate model_node.refs and model_node.sources based on foreign-key constraint references,
whether defined at the model-level or column-level.
"""
for constraint in model_node.all_constraints:
if constraint.type == ConstraintType.foreign_key and constraint.to:
try:
ref_or_source = statically_parse_ref_or_source(constraint.to)
except ParsingError:
raise ParsingError(
f"Invalid 'ref' or 'source' syntax on foreign key constraint 'to' on model {model_node.name}: {constraint.to}."
)
if isinstance(ref_or_source, RefArgs):
model_node.refs.append(ref_or_source)
else:
model_node.sources.append(ref_or_source)
def patch_time_spine(self, node: ModelNode, time_spine: Optional[TimeSpine]) -> None:
node.time_spine = time_spine
def _validate_pk_constraints(
self, model_node: ModelNode, constraints: List[Dict[str, Any]]
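
Reviewer note (illustrative sketch, not part of the diff): `_process_constraints_refs_and_sources` routes each foreign-key `to` value into either `refs` or `sources`. The sketch below imitates that routing with a small regex parser. `parse_ref_or_source`, the simplified `RefArgs` and `collect_refs_and_sources` are hypothetical; the real code calls `statically_parse_ref_or_source`, whose implementation is not shown here.

```python
import re
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union


@dataclass
class RefArgs:
    # Simplified stand-in for dbt.artifacts.resources.RefArgs
    name: str
    package: Optional[str] = None


_REF = re.compile(r"""^\s*ref\(\s*['"]([^'"]+)['"]\s*(?:,\s*['"]([^'"]+)['"]\s*)?\)\s*$""")
_SOURCE = re.compile(r"""^\s*source\(\s*['"]([^'"]+)['"]\s*,\s*['"]([^'"]+)['"]\s*\)\s*$""")


def parse_ref_or_source(expr: str) -> Union[RefArgs, List[str]]:
    """Parse "ref('x')", "ref('pkg', 'x')" or "source('s', 't')" without rendering Jinja."""
    match = _REF.match(expr)
    if match:
        first, second = match.group(1), match.group(2)
        # With two arguments, the first is the package and the second the model name
        return RefArgs(name=second, package=first) if second else RefArgs(name=first)
    match = _SOURCE.match(expr)
    if match:
        return [match.group(1), match.group(2)]
    raise ValueError(f"Invalid 'ref' or 'source' syntax on foreign key constraint 'to': {expr}.")


def collect_refs_and_sources(targets: List[str]) -> Tuple[List[RefArgs], List[List[str]]]:
    """Split foreign-key targets into refs and sources, as the new method does."""
    refs: List[RefArgs] = []
    sources: List[List[str]] = []
    for to in targets:
        parsed = parse_ref_or_source(to)
        if isinstance(parsed, RefArgs):
            refs.append(parsed)
        else:
            sources.append(parsed)
    return refs, sources
```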

@@ -17,5 +17,5 @@ class PluginNodes:
def add_model(self, model_args: ModelNodeArgs) -> None:
self.models[model_args.unique_id] = model_args
def update(self, other: "PluginNodes"):
def update(self, other: "PluginNodes") -> None:
self.models.update(other.models)

@@ -44,15 +44,10 @@ from dbt.graph import Graph
from dbt.task.printer import print_run_result_error
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import (
CompilationError,
DbtInternalError,
DbtRuntimeError,
NotImplementedError,
)
from dbt_common.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
def read_profiles(profiles_dir=None):
def read_profiles(profiles_dir: Optional[str] = None) -> Dict[str, Any]:
"""This is only used for some error handling"""
if profiles_dir is None:
profiles_dir = get_flags().PROFILES_DIR
@@ -71,6 +66,13 @@ class BaseTask(metaclass=ABCMeta):
def __init__(self, args: Flags) -> None:
self.args = args
def __enter__(self):
self.orig_dir = os.getcwd()
return self
def __exit__(self, exc_type, exc_value, traceback):
os.chdir(self.orig_dir)
@abstractmethod
def run(self):
raise dbt_common.exceptions.base.NotImplementedError("Not Implemented")
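
Reviewer note (illustrative sketch, not part of the diff): the new `__enter__`/`__exit__` pair lets a task be used in a `with` block so that the caller's working directory is restored even if the task changes it. `DirectoryRestoringTask` and `run_in_project_dir` are hypothetical names used only for this demonstration.

```python
import os


class DirectoryRestoringTask:
    """Hypothetical task that mirrors the context-manager methods in the hunk."""

    def __enter__(self):
        # Remember where the caller was before the task runs
        self.orig_dir = os.getcwd()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always go back, whether or not run() raised
        os.chdir(self.orig_dir)

    def run(self, project_dir: str) -> str:
        os.chdir(project_dir)
        return os.getcwd()


def run_in_project_dir(project_dir: str) -> str:
    """Run the task inside project_dir and return the directory it ran in."""
    with DirectoryRestoringTask() as task:
        return task.run(project_dir)
```

Since `__exit__` returns `None`, an exception raised inside the `with` block still propagates after the directory has been restored.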
@@ -123,7 +125,7 @@ class ConfiguredTask(BaseTask):
self.manifest = manifest
self.compiler = Compiler(self.config)
def compile_manifest(self):
def compile_manifest(self) -> None:
if self.manifest is None:
raise DbtInternalError("compile_manifest called before manifest was loaded")
@@ -165,7 +167,7 @@ class ExecutionContext:
class BaseRunner(metaclass=ABCMeta):
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
def __init__(self, config, adapter, node, node_index: int, num_nodes: int) -> None:
self.config = config
self.compiler = Compiler(config)
self.adapter = adapter
@@ -272,7 +274,7 @@ class BaseRunner(metaclass=ABCMeta):
failures=result.failures,
)
def compile_and_execute(self, manifest, ctx):
def compile_and_execute(self, manifest: Manifest, ctx: ExecutionContext):
result = None
with (
self.adapter.connection_named(self.node.unique_id, self.node)
@@ -305,7 +307,7 @@ class BaseRunner(metaclass=ABCMeta):
return result
def _handle_catchable_exception(self, e, ctx):
def _handle_catchable_exception(self, e: DbtRuntimeError, ctx: ExecutionContext) -> str:
if e.node is None:
e.add_node(ctx.node)
@@ -316,7 +318,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_internal_exception(self, e, ctx):
def _handle_internal_exception(self, e: DbtInternalError, ctx: ExecutionContext) -> str:
fire_event(
InternalErrorOnRun(
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
@@ -324,7 +326,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_generic_exception(self, e, ctx):
def _handle_generic_exception(self, e: Exception, ctx: ExecutionContext) -> str:
fire_event(
GenericExceptionOnRun(
build_path=self._node_build_path(),
@@ -337,9 +339,8 @@ class BaseRunner(metaclass=ABCMeta):
return str(e)
def handle_exception(self, e, ctx):
catchable_errors = (CompilationError, DbtRuntimeError)
if isinstance(e, catchable_errors):
def handle_exception(self, e: Exception, ctx: ExecutionContext) -> str:
if isinstance(e, DbtRuntimeError):
error = self._handle_catchable_exception(e, ctx)
elif isinstance(e, DbtInternalError):
error = self._handle_internal_exception(e, ctx)
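
Reviewer note (illustrative sketch, not part of the diff): `handle_exception` now checks only `DbtRuntimeError` where it used to check the tuple `(CompilationError, DbtRuntimeError)`. That is behavior-preserving only if `CompilationError` is a subclass of `DbtRuntimeError`, which is assumed here and not shown in the diff. The classes below are local stand-ins, not the `dbt_common` ones.

```python
class DbtRuntimeError(Exception):
    """Local stand-in for the runtime error base class."""


class CompilationError(DbtRuntimeError):
    """Assumed to subclass DbtRuntimeError, as the simplified check implies."""


class DbtInternalError(Exception):
    """Local stand-in for the internal error class."""


def classify(e: Exception) -> str:
    """Mirror the isinstance dispatch order used by handle_exception."""
    if isinstance(e, DbtRuntimeError):
        return "catchable"
    elif isinstance(e, DbtInternalError):
        return "internal"
    return "generic"
```

Under that assumption, a `CompilationError` still takes the "catchable" branch, so removing it from the tuple (and from the imports) does not change which handler runs.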
@@ -347,7 +348,7 @@ class BaseRunner(metaclass=ABCMeta):
error = self._handle_generic_exception(e, ctx)
return error
def safe_run(self, manifest):
def safe_run(self, manifest: Manifest):
started = time.time()
ctx = ExecutionContext(self.node)
error = None
@@ -394,19 +395,19 @@ class BaseRunner(metaclass=ABCMeta):
return None
def before_execute(self):
raise NotImplementedError()
def before_execute(self) -> None:
raise NotImplementedError("before_execute is not implemented")
def execute(self, compiled_node, manifest):
raise NotImplementedError()
raise NotImplementedError("execute is not implemented")
def run(self, compiled_node, manifest):
return self.execute(compiled_node, manifest)
def after_execute(self, result):
raise NotImplementedError()
def after_execute(self, result) -> None:
raise NotImplementedError("after_execute is not implemented")
def _skip_caused_by_ephemeral_failure(self):
def _skip_caused_by_ephemeral_failure(self) -> bool:
if self.skip_cause is None or self.skip_cause.node is None:
return False
return self.skip_cause.node.is_ephemeral_model
@@ -461,7 +462,7 @@ class BaseRunner(metaclass=ABCMeta):
node_result = RunResult.from_node(self.node, RunStatus.Skipped, error_message)
return node_result
def do_skip(self, cause=None):
def do_skip(self, cause=None) -> None:
self.skip = True
self.skip_cause = cause

@@ -1,5 +1,5 @@
import threading
from typing import Dict, List, Set
from typing import Dict, List, Optional, Set, Type
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.run import RunResult
@@ -24,16 +24,16 @@ from .test import TestRunner as test_runner
class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self):
def description(self) -> str:
return f"saved query {self.node.name}"
def before_execute(self):
def before_execute(self) -> None:
pass
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.node
def after_execute(self, result):
def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
@@ -83,7 +83,7 @@ class BuildTask(RunTask):
self.selected_unit_tests: Set = set()
self.model_to_unit_test_map: Dict[str, List] = {}
def resource_types(self, no_unit_tests=False):
def resource_types(self, no_unit_tests: bool = False) -> List[NodeType]:
resource_types = resource_types_from_args(
self.args, set(self.ALL_RESOURCE_VALUES), set(self.ALL_RESOURCE_VALUES)
)
@@ -210,7 +210,7 @@ class BuildTask(RunTask):
resource_types=resource_types,
)
def get_runner_type(self, node):
def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
return self.RUNNER_MAP.get(node.resource_type)
# Special build compile_manifest method to pass add_test_edges to the compiler

@@ -16,7 +16,7 @@ class CleanTask(BaseTask):
self.config = config
self.project = config
def run(self):
def run(self) -> None:
"""
This function takes all the paths in the target file
and cleans the project paths that are not protected.

@@ -1,7 +1,8 @@
import threading
from typing import AbstractSet, Any, Iterable, List, Optional, Set
from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type
from dbt.adapters.base import BaseRelation
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.clients.jinja import MacroGenerator
from dbt.context.providers import generate_runtime_model_context
@@ -16,10 +17,10 @@ from dbt_common.exceptions import CompilationError, DbtInternalError
class CloneRunner(BaseRunner):
def before_execute(self):
def before_execute(self) -> None:
pass
def after_execute(self, result):
def after_execute(self, result) -> None:
pass
def _build_run_model_result(self, model, context):
@@ -44,7 +45,7 @@ class CloneRunner(BaseRunner):
failures=None,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
# no-op
return self.node
@@ -91,7 +92,7 @@ class CloneRunner(BaseRunner):
class CloneTask(GraphRunnableTask):
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_run_mode(self) -> GraphRunnableMode:
@@ -133,8 +134,8 @@ class CloneTask(GraphRunnableTask):
self.populate_adapter_cache(adapter, schemas_to_cache)
@property
def resource_types(self):
resource_types = resource_types_from_args(
def resource_types(self) -> List[NodeType]:
resource_types: Collection[NodeType] = resource_types_from_args(
self.args, set(REFABLE_NODE_TYPES), set(REFABLE_NODE_TYPES)
)
@@ -154,5 +155,5 @@ class CloneTask(GraphRunnableTask):
resource_types=resource_types,
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return CloneRunner

@@ -1,6 +1,8 @@
import threading
from typing import Optional, Type
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import CompiledNode, ParseInlineNodeError
from dbt.graph import ResourceTypeSelector
from dbt.node_types import EXECUTABLE_NODE_TYPES, NodeType
@@ -17,10 +19,10 @@ from dbt_common.exceptions import DbtInternalError
class CompileRunner(BaseRunner):
def before_execute(self):
def before_execute(self) -> None:
pass
def after_execute(self, result):
def after_execute(self, result) -> None:
pass
def execute(self, compiled_node, manifest):
@@ -35,7 +37,7 @@ class CompileRunner(BaseRunner):
failures=None,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.compiler.compile_node(self.node, manifest, {})
@@ -44,7 +46,7 @@ class CompileTask(GraphRunnableTask):
# it should be removed before the task is complete
_inline_node_id = None
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return True
def get_node_selector(self) -> ResourceTypeSelector:
@@ -62,10 +64,10 @@ class CompileTask(GraphRunnableTask):
resource_types=resource_types,
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return CompileRunner
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
is_inline = bool(getattr(self.args, "inline", None))
output_format = getattr(self.args, "output", "text")
@@ -127,14 +129,14 @@ class CompileTask(GraphRunnableTask):
raise DbtException("Error parsing inline query")
super()._runtime_initialize()
def after_run(self, adapter, results):
def after_run(self, adapter, results) -> None:
# remove inline node from manifest
if self._inline_node_id:
self.manifest.nodes.pop(self._inline_node_id)
self._inline_node_id = None
super().after_run(adapter, results)
def _handle_result(self, result):
def _handle_result(self, result) -> None:
super()._handle_result(result)
if (

@@ -481,7 +481,7 @@ class DebugTask(BaseTask):
return status
@classmethod
def validate_connection(cls, target_dict):
def validate_connection(cls, target_dict) -> None:
"""Validate a connection dictionary. On error, raises a DbtConfigError."""
target_name = "test"
# make a fake profile that we can parse

@@ -96,8 +96,6 @@ class DepsTask(BaseTask):
# See GH-7615
project.project_root = str(Path(project.project_root).resolve())
self.project = project
move_to_nearest_project_dir(project.project_root)
self.cli_vars = args.vars
def track_package_install(
@@ -202,6 +200,7 @@ class DepsTask(BaseTask):
fire_event(DepsLockUpdating(lock_filepath=lock_filepath))
def run(self) -> None:
move_to_nearest_project_dir(self.args.project_dir)
if self.args.add_package:
self.add()

@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import AbstractSet, Dict, List, Optional
from typing import AbstractSet, Dict, List, Optional, Type
from dbt import deprecations
from dbt.adapters.base.impl import FreshnessResponse
@@ -14,6 +14,7 @@ from dbt.artifacts.schemas.freshness import (
PartialSourceFreshnessResult,
SourceFreshnessResult,
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, SourceDefinition
from dbt.contracts.results import RunStatus
from dbt.events.types import FreshnessCheckComplete, LogFreshnessResult, LogStartLine
@@ -44,7 +45,7 @@ class FreshnessRunner(BaseRunner):
def on_skip(self):
raise DbtRuntimeError("Freshness: nodes cannot be skipped!")
def before_execute(self):
def before_execute(self) -> None:
description = "freshness of {0.source_name}.{0.name}".format(self.node)
fire_event(
LogStartLine(
@@ -55,7 +56,7 @@ class FreshnessRunner(BaseRunner):
)
)
def after_execute(self, result):
def after_execute(self, result) -> None:
if hasattr(result, "node"):
source_name = result.node.source_name
table_name = result.node.name
@@ -162,7 +163,7 @@ class FreshnessRunner(BaseRunner):
**freshness,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
if self.node.resource_type != NodeType.Source:
# should be unreachable...
raise DbtRuntimeError("freshness runner: got a non-Source")
@@ -184,13 +185,13 @@ class FreshnessTask(RunTask):
super().__init__(args, config, manifest)
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}
def result_path(self):
def result_path(self) -> str:
if self.args.output:
return os.path.realpath(self.args.output)
else:
return os.path.join(self.config.project_target_path, RESULT_FILE_NAME)
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self):
@@ -214,7 +215,7 @@ class FreshnessTask(RunTask):
freshness_runner.set_metadata_freshness_cache(self._metadata_freshness_cache)
return freshness_runner
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return FreshnessRunner
def get_result(self, results, elapsed_time, generated_at):
@@ -222,7 +223,7 @@ class FreshnessTask(RunTask):
elapsed_time=elapsed_time, generated_at=generated_at, results=results
)
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
for result in results:
if result.status in (
FreshnessStatus.Error,

@@ -1,4 +1,5 @@
import json
from typing import Iterator, List
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
@@ -145,7 +146,7 @@ class ListTask(GraphRunnableTask):
}
)
def generate_paths(self):
def generate_paths(self) -> Iterator[str]:
for node in self._iterate_selected_nodes():
yield node.original_file_path
@@ -177,7 +178,7 @@ class ListTask(GraphRunnableTask):
return self.node_results
@property
def resource_types(self):
def resource_types(self) -> List[NodeType]:
if self.args.models:
return [NodeType.Model]

@@ -1,6 +1,7 @@
from typing import Dict
from typing import Dict, Optional
from dbt.artifacts.schemas.results import NodeStatus
from dbt.contracts.graph.nodes import Group
from dbt.events.types import (
CheckNodeTestFailure,
EndOfRunSummary,
@@ -68,7 +69,9 @@ def print_run_status_line(results) -> None:
fire_event(StatsLine(stats=stats))
def print_run_result_error(result, newline: bool = True, is_warning: bool = False) -> None:
def print_run_result_error(
result, newline: bool = True, is_warning: bool = False, group: Optional[Group] = None
) -> None:
# set node_info for logging events
node_info = None
if hasattr(result, "node") and result.node:
@@ -77,21 +80,25 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
if newline:
fire_event(Formatting(""))
if is_warning:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultWarning(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultFailure(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)
@@ -99,7 +106,10 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
if is_warning:
fire_event(RunResultWarningMessage(msg=result.message, node_info=node_info))
else:
fire_event(RunResultError(msg=result.message, node_info=node_info))
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultError(msg=result.message, node_info=node_info, group=group_dict)
)
else:
fire_event(RunResultErrorNoMessage(status=result.status, node_info=node_info))
@@ -119,10 +129,13 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
elif result.message is not None:
if newline:
fire_event(Formatting(""))
fire_event(RunResultError(msg=result.message, node_info=node_info))
group_dict = group.to_logging_dict() if group else None
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group_dict))
def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
def print_run_end_messages(
results, keyboard_interrupt: bool = False, groups: Optional[Dict[str, Group]] = None
) -> None:
errors, warnings = [], []
for r in results:
if r.status in (NodeStatus.RuntimeErr, NodeStatus.Error, NodeStatus.Fail):
@@ -144,9 +157,11 @@ def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
)
for error in errors:
print_run_result_error(error, is_warning=False)
group = groups.get(error.node.unique_id) if groups and hasattr(error, "node") else None
print_run_result_error(error, is_warning=False, group=group)
for warning in warnings:
print_run_result_error(warning, is_warning=True)
group = groups.get(warning.node.unique_id) if groups and hasattr(warning, "node") else None
print_run_result_error(warning, is_warning=True, group=group)
print_run_status_line(results)

@@ -1,8 +1,8 @@
import functools
import threading
import time
from datetime import datetime, timedelta
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
from dbt import tracking, utils
from dbt.adapters.base import BaseRelation
@@ -36,6 +36,7 @@ from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
from dbt.graph import ResourceTypeSelector
from dbt.hooks import get_hook_dict
from dbt.node_types import NodeType, RunHookType
from dbt.task.base import BaseRunner
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.base_types import EventLevel
from dbt_common.events.contextvars import log_contextvars
@@ -179,7 +180,7 @@ class ModelRunner(CompileRunner):
relation = relation.include(database=False)
return str(relation)
def describe_node(self):
def describe_node(self) -> str:
# TODO CL 'language' will be moved to node level when we change representation
return f"{self.node.language} {self.node.get_materialization()} model {self.get_node_representation()}"
@@ -213,18 +214,10 @@ class ModelRunner(CompileRunner):
level=level,
)
def before_execute(self):
if self.node.config.get("microbatch"):
# TODO: actually use partition_grain
# partition_grain = self.node.config.get("partition_grain")
lookback = self.node.config.get("lookback")
self.node.end_time = datetime.now()
self.node.start_time = self.node.end_time - timedelta(days=lookback)
self.node.start_time.replace(minute=0, hour=0, second=0, microsecond=0)
def before_execute(self) -> None:
self.print_start_line()
def after_execute(self, result):
def after_execute(self, result) -> None:
track_model_run(self.node_index, self.num_nodes, result)
self.print_result_line(result)
@@ -480,9 +473,20 @@ class RunTask(CompileTask):
resource_types=[NodeType.Model],
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return ModelRunner
def get_groups_for_nodes(self, nodes):
node_to_group_name_map = {i: k for k, v in self.manifest.group_map.items() for i in v}
group_name_to_group_map = {v.name: v for v in self.manifest.groups.values()}
return {
node.unique_id: group_name_to_group_map.get(node_to_group_name_map.get(node.unique_id))
for node in nodes
}
def task_end_messages(self, results) -> None:
groups = self.get_groups_for_nodes([r.node for r in results if hasattr(r, "node")])
if results:
print_run_end_messages(results)
print_run_end_messages(results, groups=groups)
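
Reviewer note (illustrative sketch, not part of the diff): `get_groups_for_nodes` inverts a "group name to node ids" map and then resolves each node to its group object. The sketch reproduces the two dict comprehensions on plain data. The `Group` dataclass and the argument shapes are simplified assumptions based on how `manifest.group_map` and `manifest.groups` are used in the hunk.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass
class Group:
    # Simplified stand-in for dbt.contracts.graph.nodes.Group
    name: str
    owner: str


def get_groups_for_nodes(
    node_ids: Iterable[str],
    group_map: Dict[str, List[str]],  # group name -> unique ids of member nodes
    groups: Dict[str, Group],         # group unique id -> Group
) -> Dict[str, Optional[Group]]:
    # Invert group_map so each node id points at its group name
    node_to_group_name = {nid: gname for gname, nids in group_map.items() for nid in nids}
    # Index group objects by name rather than by unique id
    group_by_name = {g.name: g for g in groups.values()}
    # Nodes without a group resolve to None via the chained .get() calls
    return {nid: group_by_name.get(node_to_group_name.get(nid)) for nid in node_ids}
```

Nodes that belong to no group map to `None`, which is why `print_run_result_error` guards with `group.to_logging_dict() if group else None`.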

@@ -5,7 +5,7 @@ from concurrent.futures import as_completed
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool
from pathlib import Path
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Union
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Type, Union
import dbt.exceptions
import dbt.tracking
@@ -181,13 +181,13 @@ class GraphRunnableTask(ConfiguredTask):
self.num_nodes = len([n for n in self._flattened_nodes if not n.is_ephemeral_model])
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_runner_type(self, node):
def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
raise NotImplementedError("Not Implemented")
def result_path(self):
def result_path(self) -> str:
return os.path.join(self.config.project_target_path, RESULT_FILE_NAME)
def get_runner(self, node) -> BaseRunner:
@@ -204,6 +204,10 @@ class GraphRunnableTask(ConfiguredTask):
num_nodes = self.num_nodes
cls = self.get_runner_type(node)
if cls is None:
raise DbtInternalError("Could not find runner type for node.")
return cls(self.config, adapter, node, run_count, num_nodes)
def call_runner(self, runner: BaseRunner) -> RunResult:
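
Reviewer note (illustrative sketch, not part of the diff): once `get_runner_type` is annotated as returning `Optional[Type[BaseRunner]]`, callers have to handle `None` before instantiating, which is what the added `if cls is None` guard does. The classes, the `RUNNER_MAP` contents and the use of `RuntimeError` below are stand-ins; the diff raises `DbtInternalError`.

```python
from typing import Dict, Optional, Type


class BaseRunner:
    def __init__(self, node_name: str) -> None:
        self.node_name = node_name


class ModelRunner(BaseRunner):
    pass


# Hypothetical lookup table in the spirit of BuildTask.RUNNER_MAP
RUNNER_MAP: Dict[str, Type[BaseRunner]] = {"model": ModelRunner}


def get_runner_type(resource_type: str) -> Optional[Type[BaseRunner]]:
    # dict.get returns None for unknown resource types
    return RUNNER_MAP.get(resource_type)


def get_runner(resource_type: str, node_name: str) -> BaseRunner:
    cls = get_runner_type(resource_type)
    if cls is None:
        # Fail loudly instead of calling None(...), which would be a TypeError
        raise RuntimeError("Could not find runner type for node.")
    return cls(node_name)
```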
@@ -334,7 +338,7 @@ class GraphRunnableTask(ConfiguredTask):
args = [runner]
self._submit(pool, args, callback)
def _handle_result(self, result: RunResult):
def _handle_result(self, result: RunResult) -> None:
"""Mark the result as completed, insert the `CompileResultNode` into
the manifest, and mark any descendants (potentially with a 'cause' if
the result was an ephemeral model) as skipped.
@@ -479,7 +483,7 @@ class GraphRunnableTask(ConfiguredTask):
self.defer_to_manifest()
self.populate_adapter_cache(adapter)
def after_run(self, adapter, results):
def after_run(self, adapter, results) -> None:
pass
def print_results_line(self, node_results, elapsed):
@@ -659,7 +663,7 @@ class GraphRunnableTask(ConfiguredTask):
args=dbt.utils.args_to_dict(self.args),
)
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
print_run_end_messages(results)
def _get_previous_state(self) -> Optional[Manifest]:

@@ -1,9 +1,12 @@
import random
from typing import Optional, Type
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogSeedResult, LogStartLine, SeedHeader
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_common.events.types import Formatting
@@ -14,10 +17,10 @@ from .run import ModelRunner, RunTask
class SeedRunner(ModelRunner):
def describe_node(self):
def describe_node(self) -> str:
return "seed file {}".format(self.get_node_representation())
def before_execute(self):
def before_execute(self) -> None:
fire_event(
LogStartLine(
description=self.describe_node(),
@@ -33,7 +36,7 @@ class SeedRunner(ModelRunner):
result.agate_table = agate_result.table
return result
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.node
def print_result_line(self, result):
@@ -55,7 +58,7 @@ class SeedRunner(ModelRunner):
class SeedTask(RunTask):
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self):
@@ -68,10 +71,10 @@ class SeedTask(RunTask):
resource_types=[NodeType.Seed],
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return SeedRunner
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
if self.args.show:
self.show_tables(results)

@@ -67,7 +67,7 @@ class ShowTask(CompileTask):
else:
return ShowRunner
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
is_inline = bool(getattr(self.args, "inline", None))
if is_inline:
@@ -108,7 +108,7 @@ class ShowTask(CompileTask):
)
)
def _handle_result(self, result):
def _handle_result(self, result) -> None:
super()._handle_result(result)
if (

@@ -1,7 +1,10 @@
from typing import Optional, Type
from dbt.artifacts.schemas.results import NodeStatus
from dbt.events.types import LogSnapshotResult
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtInternalError
@@ -11,7 +14,7 @@ from .run import ModelRunner, RunTask
class SnapshotRunner(ModelRunner):
def describe_node(self):
def describe_node(self) -> str:
return "snapshot {}".format(self.get_node_representation())
def print_result_line(self, result):
@@ -34,7 +37,7 @@ class SnapshotRunner(ModelRunner):
class SnapshotTask(RunTask):
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self):
@@ -47,5 +50,5 @@ class SnapshotTask(RunTask):
resource_types=[NodeType.Snapshot],
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return SnapshotRunner

@@ -5,6 +5,7 @@ from typing import Generic, TypeVar
import dbt.exceptions
import dbt_common.exceptions.base
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.sql import (
RemoteCompileResult,
RemoteCompileResultMixin,
@@ -28,18 +29,19 @@ class GenericSqlRunner(CompileRunner, Generic[SQLResult]):
exc=str(e), exc_info=traceback.format_exc(), node_info=self.node.node_info
)
)
# REVIEW: This code is invalid and will always throw.
if isinstance(e, dbt.exceptions.Exception):
if isinstance(e, dbt_common.exceptions.DbtRuntimeError):
e.add_node(ctx.node)
return e
def before_execute(self):
def before_execute(self) -> None:
pass
def after_execute(self, result):
def after_execute(self, result) -> None:
pass
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.compiler.compile_node(self.node, manifest, {}, write=False)
@abstractmethod

@@ -3,7 +3,7 @@ import json
import re
import threading
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
import daff
@@ -27,6 +27,7 @@ from dbt.flags import get_flags
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.parser.unit_tests import UnitTestManifestLoader
from dbt.task.base import BaseRunner
from dbt.utils import _coerce_decimal, strtobool
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.format import pluralize
@@ -84,14 +85,14 @@ class TestRunner(CompileRunner):
_ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
_LOG_TEST_RESULT_EVENTS = LogTestResult
def describe_node_name(self):
def describe_node_name(self) -> str:
if self.node.resource_type == NodeType.Unit:
name = f"{self.node.model}::{self.node.versioned_name}"
return name
else:
return self.node.name
def describe_node(self):
def describe_node(self) -> str:
return f"{self.node.resource_type} {self.describe_node_name()}"
def print_result_line(self, result):
@@ -120,7 +121,7 @@ class TestRunner(CompileRunner):
)
)
def before_execute(self):
def before_execute(self) -> None:
self.print_start_line()
def execute_data_test(self, data_test: TestNode, manifest: Manifest) -> TestResultData:
@@ -334,7 +335,7 @@ class TestRunner(CompileRunner):
failures=failures,
)
def after_execute(self, result):
def after_execute(self, result) -> None:
self.print_result_line(result)
def _get_unit_test_agate_table(self, result_table, actual_or_expected: str):
@@ -393,7 +394,7 @@ class TestTask(RunTask):
__test__ = False
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self) -> TestSelector:
@@ -405,7 +406,7 @@ class TestTask(RunTask):
previous_state=self.previous_state,
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return TestRunner
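
Note on the annotation added in these hunks: `Optional[Type[BaseRunner]]` says the method returns a runner class (not an instance), or `None`. A minimal, self-contained sketch of that pattern follows. The classes below (`DemoBaseRunner`, `DemoTestRunner`, `DemoTask`) are stand-ins invented for illustration and are not dbt's actual `BaseRunner`, `TestRunner`, or `TestTask`.

```python
from typing import Optional, Type


class DemoBaseRunner:
    """Hypothetical stand-in for a base runner class."""

    def describe_node(self) -> str:
        return "base"


class DemoTestRunner(DemoBaseRunner):
    """Hypothetical stand-in for the runner a test task would use."""

    def describe_node(self) -> str:
        return "test"


class DemoTask:
    """Hypothetical task that picks a runner class for a node."""

    def get_runner_type(self, _) -> Optional[Type[DemoBaseRunner]]:
        # Return the class itself; the caller decides when to instantiate it.
        return DemoTestRunner


runner_cls = DemoTask().get_runner_type(None)
if runner_cls is not None:  # the Optional return makes callers handle None
    print(runner_cls().describe_node())
```

With the return type spelled out, a type checker can flag callers that instantiate the result without first ruling out `None`.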

View File

@@ -91,6 +91,7 @@ def run_dbt(
if profiles_dir and "--profiles-dir" not in args:
args.extend(["--profiles-dir", profiles_dir])
dbt = dbtRunner()
res = dbt.invoke(args)
# the exception is immediately raised to be caught in tests
@@ -148,7 +149,7 @@ def get_manifest(project_root) -> Optional[Manifest]:
if os.path.exists(path):
with open(path, "rb") as fp:
manifest_mp = fp.read()
manifest: Manifest = Manifest.from_msgpack(manifest_mp)
manifest: Manifest = Manifest.from_msgpack(manifest_mp) # type: ignore[attr-defined]
return manifest
else:
return None

View File

@@ -49,7 +49,10 @@ def get_latest_version(
return semver.VersionSpecifier.from_version_string(version_string)
def _get_core_msg_lines(installed, latest) -> Tuple[List[List[str]], str]:
def _get_core_msg_lines(
installed: semver.VersionSpecifier,
latest: Optional[semver.VersionSpecifier],
) -> Tuple[List[List[str]], str]:
installed_s = installed.to_version_string(skip_matcher=True)
installed_line = ["installed", installed_s, ""]
update_info = ""
@@ -208,7 +211,7 @@ def _get_dbt_plugins_info() -> Iterator[Tuple[str, str]]:
except ImportError:
# not an adapter
continue
yield plugin_name, mod.version # type: ignore
yield plugin_name, mod.version
def _get_adapter_plugin_names() -> Iterator[str]:

View File

@@ -59,6 +59,7 @@ setup(
"networkx>=2.3,<4.0",
"protobuf>=4.0.0,<5",
"requests<3.0.0", # should match dbt-common
"snowplow-tracker>=1.0.2,<2.0",
# ----
# These packages are major-version-0. Keep upper bounds on upcoming minor versions (which could have breaking changes)
# and check compatibility / bump in each new minor version of dbt-core.
@@ -68,11 +69,10 @@ setup(
# These are major-version-0 packages also maintained by dbt-labs.
# Accept patches but avoid automatically updating past a set minor version range.
"dbt-extractor>=0.5.0,<=0.6",
"minimal-snowplow-tracker>=0.0.2,<0.1",
"dbt-semantic-interfaces>=0.6.8,<0.7",
"dbt-semantic-interfaces>=0.7.0,<0.8",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.3.0,<2.0",
"dbt-adapters>=1.1.1,<2.0",
"dbt-common>=1.6.0,<2.0",
"dbt-adapters>=1.3.0,<2.0",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
"packaging>20.9",

View File

@@ -8,7 +8,7 @@ RUN apt-get update \
build-essential=12.9 \
ca-certificates=20210119 \
git=1:2.30.2-1+deb11u2 \
libpq-dev=13.14-0+deb11u1 \
libpq-dev=13.16-0+deb11u1 \
make=4.3-4.1 \
openssh-client=1:8.4p1-5+deb11u3 \
software-properties-common=0.96.20.2-2.1 \

View File

@@ -5689,4 +5689,4 @@
},
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://schemas.getdbt.com/dbt/manifest/v10.json"
}
}

View File

@@ -7060,4 +7060,4 @@
}
},
"$id": "https://schemas.getdbt.com/dbt/manifest/v11.json"
}
}

File diff suppressed because it is too large

View File

@@ -5981,4 +5981,4 @@
},
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://schemas.getdbt.com/dbt/manifest/v5.json"
}
}

View File

@@ -6206,4 +6206,4 @@
},
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://schemas.getdbt.com/dbt/manifest/v6.json"
}
}

View File

@@ -6572,4 +6572,4 @@
},
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://schemas.getdbt.com/dbt/manifest/v7.json"
}
}

View File

@@ -4431,4 +4431,4 @@
},
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://schemas.getdbt.com/dbt/manifest/v8.json"
}
}

View File

@@ -4962,4 +4962,4 @@
},
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://schemas.getdbt.com/dbt/manifest/v9.json"
}
}

View File

@@ -5,6 +5,8 @@ env | grep '^PG'
# If you want to run this script for your own postgresql (run with
# docker-compose) it will look like this:
# PGHOST=127.0.0.1 PGUSER=root PGPASSWORD=password PGDATABASE=postgres \
PG_CONNECTION_URI="${CONNECTION_URI}"
export PG_CONNECTION_URI
PGUSER="${PGUSER:-postgres}"
export PGUSER
PGPORT="${PGPORT:-5432}"
@@ -15,11 +17,11 @@ function connect_circle() {
# try to handle circleci/docker oddness
let rc=1
while [[ $rc -eq 1 ]]; do
nc -z ${PGHOST} ${PGPORT}
nc -z ${PG_CONNECTION_URI}
let rc=$?
done
if [[ $rc -ne 0 ]]; then
echo "Fatal: Could not connect to $PGHOST"
echo "Fatal: Could not connect to $PG_CONNECTION_URI"
exit 1
fi
}
@@ -30,7 +32,7 @@ if [[ -n $CIRCLECI ]]; then
fi
for i in {1..10}; do
if pg_isready -h "${PGHOST}" -p "${PGPORT}" -U "${PGUSER}" ; then
if pg_isready --d "${PG_CONNECTION_URI}"; then
break
fi
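
Note on the hunk above: `nc -z` takes a host and a port as separate arguments, and the documented `pg_isready` flags for a database name or connection string are `-d` and `--dbname`. If the connectivity probe needs a host and port rather than a whole URI, they can be pulled out first. A hedged sketch in Python, assuming the value is a `postgresql://user:password@host:port/dbname` style URI; the function name, the default port, and the example URI are assumptions and not part of this change:

```python
from typing import Tuple
from urllib.parse import urlsplit


def host_and_port(connection_uri: str, default_port: int = 5432) -> Tuple[str, int]:
    """Extract (host, port) from a postgres-style connection URI."""
    parts = urlsplit(connection_uri)
    if not parts.hostname:
        raise ValueError(f"no host found in connection URI: {connection_uri!r}")
    # Fall back to the default port when the URI omits one.
    return parts.hostname, parts.port or default_port


# The URI below is made up for illustration.
host, port = host_and_port("postgresql://postgres:postgres@localhost:5432/postgres")
print(host, port)
```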

View File

@@ -27,9 +27,9 @@ select 1 as col
"""
macros__before_and_after = """
{% macro custom_run_hook(state, target, run_started_at, invocation_id) %}
{% macro custom_run_hook(state, target, run_started_at, invocation_id, table_name="on_run_hook") %}
insert into {{ target.schema }}.on_run_hook (
insert into {{ target.schema }}.{{ table_name }} (
test_state,
target_dbname,
target_host,
@@ -355,6 +355,26 @@ snapshots:
- not_null
"""
properties__model_hooks = """
version: 2
models:
- name: hooks
config:
pre_hook: "{{ custom_run_hook('start', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
post_hook: "{{ custom_run_hook('end', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
"""
properties__model_hooks_list = """
version: 2
models:
- name: hooks
config:
pre_hook:
- "{{ custom_run_hook('start', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
post_hook:
- "{{ custom_run_hook('end', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
"""
seeds__example_seed_csv = """a,b,c
1,2,3
4,5,6
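
Note on the two new fixtures: `properties__model_hooks` sets `pre_hook` and `post_hook` to a single string, while `properties__model_hooks_list` sets each to a one-item list, and the tests in this change run the same checks against both. The idea of treating the two shapes alike can be sketched as below. This is an illustration only, not dbt's implementation; `normalize_hooks` is a hypothetical helper.

```python
from typing import List, Optional, Union


def normalize_hooks(value: Optional[Union[str, List[str]]]) -> List[str]:
    """Return hooks as a list, whether configured as one string or a list."""
    if value is None:
        return []
    if isinstance(value, str):
        # A single hook written as a bare string becomes a one-item list.
        return [value]
    return list(value)


single = normalize_hooks("select 1")
as_list = normalize_hooks(["select 1"])
print(single == as_list)
```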

View File

@@ -6,6 +6,7 @@ from dbt.exceptions import ParsingError
from dbt.tests.util import run_dbt, write_file
from dbt_common.exceptions import CompilationError
from tests.functional.adapter.hooks.fixtures import (
macros__before_and_after,
models__hooked,
models__hooks,
models__hooks_configured,
@@ -13,6 +14,8 @@ from tests.functional.adapter.hooks.fixtures import (
models__hooks_kwargs,
models__post,
models__pre,
properties__model_hooks,
properties__model_hooks_list,
properties__seed_models,
properties__test_snapshot_models,
seeds__example_seed_csv,
@@ -257,6 +260,27 @@ class TestPrePostModelHooksOnSeeds(object):
assert len(res) == 1, "Expected exactly one item"
class TestPrePostModelHooksWithMacros(BaseTestPrePost):
@pytest.fixture(scope="class")
def macros(self):
return {"before-and-after.sql": macros__before_and_after}
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": properties__model_hooks, "hooks.sql": models__hooks}
def test_pre_and_post_run_hooks(self, project, dbt_profile_target):
run_dbt()
self.check_hooks("start", project, dbt_profile_target.get("host", None))
self.check_hooks("end", project, dbt_profile_target.get("host", None))
class TestPrePostModelHooksListWithMacros(TestPrePostModelHooksWithMacros):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": properties__model_hooks_list, "hooks.sql": models__hooks}
class TestHooksRefsOnSeeds:
"""
This should not succeed, and raise an explicit error

View File

@@ -292,6 +292,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"first_name": {
"name": "first_name",
@@ -301,6 +302,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"email": {
"name": "email",
@@ -310,6 +312,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"ip_address": {
"name": "ip_address",
@@ -319,6 +322,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"updated_at": {
"name": "updated_at",
@@ -328,6 +332,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
},
"contract": {"checksum": None, "enforced": False, "alias_types": True},
@@ -343,6 +348,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"access": "protected",
"version": None,
"latest_version": None,
"time_spine": None,
},
"model.test.second_model": {
"compiled_path": os.path.join(compiled_model_path, "second_model.sql"),
@@ -385,6 +391,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"first_name": {
"name": "first_name",
@@ -394,6 +401,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"email": {
"name": "email",
@@ -403,6 +411,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"ip_address": {
"name": "ip_address",
@@ -412,6 +421,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"updated_at": {
"name": "updated_at",
@@ -421,6 +431,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
},
"contract": {"checksum": None, "enforced": False, "alias_types": True},
@@ -436,6 +447,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"access": "protected",
"version": None,
"latest_version": None,
"time_spine": None,
},
"seed.test.seed": {
"build_path": None,
@@ -468,6 +480,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"first_name": {
"name": "first_name",
@@ -477,6 +490,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"email": {
"name": "email",
@@ -486,6 +500,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"ip_address": {
"name": "ip_address",
@@ -495,6 +510,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"updated_at": {
"name": "updated_at",
@@ -504,6 +520,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
},
"docs": {"node_color": None, "show": True},
@@ -730,6 +747,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
}
},
"config": {
@@ -957,6 +975,7 @@ def expected_references_manifest(project):
"version": None,
"latest_version": None,
"constraints": [],
"time_spine": None,
},
"model.test.ephemeral_summary": {
"alias": "ephemeral_summary",
@@ -972,6 +991,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"ct": {
"description": "The number of instances of the first name",
@@ -981,6 +1001,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
},
"config": get_rendered_model_config(materialized="table", group="test_group"),
@@ -1026,6 +1047,7 @@ def expected_references_manifest(project):
"version": None,
"latest_version": None,
"constraints": [],
"time_spine": None,
},
"model.test.view_summary": {
"alias": "view_summary",
@@ -1041,6 +1063,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"ct": {
"description": "The number of instances of the first name",
@@ -1050,6 +1073,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
},
"config": get_rendered_model_config(),
@@ -1091,6 +1115,7 @@ def expected_references_manifest(project):
"version": None,
"latest_version": None,
"constraints": [],
"time_spine": None,
},
"seed.test.seed": {
"alias": "seed",
@@ -1105,6 +1130,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"first_name": {
"name": "first_name",
@@ -1114,6 +1140,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"email": {
"name": "email",
@@ -1123,6 +1150,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"ip_address": {
"name": "ip_address",
@@ -1132,6 +1160,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"updated_at": {
"name": "updated_at",
@@ -1141,6 +1170,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
},
"config": get_rendered_seed_config(),
@@ -1219,6 +1249,7 @@ def expected_references_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
}
},
"config": {
@@ -1487,6 +1518,7 @@ def expected_versions_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"ct": {
"description": "The number of instances of the first name",
@@ -1496,6 +1528,7 @@ def expected_versions_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
},
"config": get_rendered_model_config(
@@ -1544,6 +1577,7 @@ def expected_versions_manifest(project):
"access": "protected",
"version": 1,
"latest_version": 2,
"time_spine": None,
},
"model.test.versioned_model.v2": {
"alias": "versioned_model_v2",
@@ -1559,6 +1593,7 @@ def expected_versions_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
"extra": {
"description": "",
@@ -1568,6 +1603,7 @@ def expected_versions_manifest(project):
"quote": None,
"tags": [],
"constraints": [],
"granularity": None,
},
},
"config": get_rendered_model_config(
@@ -1612,6 +1648,7 @@ def expected_versions_manifest(project):
"access": "protected",
"version": 2,
"latest_version": 2,
"time_spine": None,
},
"model.test.ref_versioned_model": {
"alias": "ref_versioned_model",
@@ -1669,6 +1706,7 @@ def expected_versions_manifest(project):
"access": "protected",
"version": None,
"latest_version": None,
"time_spine": None,
},
"test.test.unique_versioned_model_v1_first_name.6138195dec": {
"alias": "unique_versioned_model_v1_first_name",

View File

@@ -42,6 +42,15 @@ with recursive t(n) as (
select sum(n) from t;
"""
first_ephemeral_model_with_alias_sql = """
{{ config(materialized = 'ephemeral', alias = 'first_alias') }}
select 1 as fun
"""
second_ephemeral_model_with_alias_sql = """
select * from {{ ref('first_ephemeral_model_with_alias') }}
"""
schema_yml = """
version: 2

Some files were not shown because too many files have changed in this diff