Compare commits

...

58 Commits

Author SHA1 Message Date
Michelle Ark
b9f893a291 postgres as service 2024-09-10 15:09:06 -04:00
Mike Alfare
1d3d315249 Add flags from dbt_project.yml to the Project and RuntimeConfig objects (#10644)
* add flags from dbt_project.yml to the Project and RuntimeConfig objects
2024-09-06 15:42:29 -04:00
Gerda Shank
b35ad46e3f Remove deprecation warning to change "tests:" config to "data_tests:" (#10670) 2024-09-05 20:35:28 -04:00
Gerda Shank
c28cb92af5 Warn if timestamp updated_at field uses incompatible timestamp (#10352)
Co-authored-by: Michelle Ark <michelle.ark@dbtlabs.com>
2024-09-04 14:42:14 -04:00
RyoAriyama
b56d96df5e Fix/changes current working dir when using a dbt project dir (#9596)
* made class changing directory a context manager.

* add change log

* fix conflict

* made base as a context manager

* add assertion

* Remove index.html

* add it test to testDbtRunner

* fix deps args order

* fix test

---------

Co-authored-by: Doug Beatty <doug.beatty@dbtlabs.com>
Co-authored-by: Chenyu Li <chenyu.li@dbtlabs.com>
2024-09-03 13:41:53 -07:00
Jeremy Cohen
37d382c8e7 Filter out empty nodes after graph selection (#10580)
* Add unit test

* Filter out empty nodes after graph selection

* Add changie

* Add --indirect-selection empty check to unit test
2024-09-03 18:48:03 +02:00
Gerda Shank
9b7f4ff842 use full manifest in adapter instead of macro_manifest (#10609)
* use full manifest in adapter instead of macro_manifest

* Add test case

* Add changelog entry

* Remove commented code.

---------

Co-authored-by: Peter Allen Webb <peter.webb@dbtlabs.com>
2024-08-29 11:32:30 -04:00
Emily Rockman
555ff8091f update dep for psycopg (#10633) 2024-08-29 09:44:49 -05:00
Emily Rockman
98fddcf54f rework test to ignore utils version (#10625) 2024-08-28 15:25:09 -05:00
Emily Rockman
d652359c61 add typing (#10619) 2024-08-28 13:26:18 -05:00
Peter Webb
f7d21e012e Add More Typing to the dbt.task Module (#10622)
* Add typing to task module.

* More typing in the task module

* Still more types for task module
2024-08-28 11:18:01 -04:00
Quigley Malcolm
e1fa461186 [TIDY FIRST] Fix typing issues in dbt/core/tasks/clean.py (#10617) 2024-08-27 17:37:16 -05:00
Quigley Malcolm
1153597970 Fix typing errors in core/dbt/contracts/sql.py (#10615) 2024-08-27 17:37:00 -05:00
Quigley Malcolm
09f9febc25 [TIDY FIRST] Fix core/dbt/version.py type hinting (#10613) 2024-08-27 17:36:31 -05:00
Doug Beatty
22181409f6 Enable calling a macro in a pre- or post-hook config in properties.yml (#10603)
* Tests for calling a macro in a pre- or post-hook config in properties.yml

* Late render pre- and post-hooks configs in properties / schema YAML files

* Changelog entry
2024-08-27 11:08:56 -06:00
William Deng
f25a474f75 updated saved query tests and fixtures (#10610) 2024-08-26 17:39:35 -04:00
aliceliu
3c55806203 Fix state:modified check for exports (#10565) 2024-08-23 15:22:38 -04:00
Gerda Shank
bba020fcc0 Add test for source names with quotes (#10588) 2024-08-21 11:57:34 -04:00
Courtney Holcomb
84eb0ff672 Bump DSI version (#10585)
* Bump DSI version

* Changelog
2024-08-20 16:37:52 -04:00
Kshitij Aranke
3695698e22 [CORE-364] Add group info to RunResultError, RunResultFailure, RunResultWarning log lines (#10535) 2024-08-19 11:26:00 -07:00
Courtney Holcomb
9ca1bc5b4c Remove unneeded TODO (#10568) 2024-08-14 14:49:47 -07:00
Gerda Shank
5f66678f6d Incremental models with a contract don't need their columns modified (#10371) 2024-08-14 08:15:25 -07:00
Jean Cochrane
63262e93cb Use model alias for the CTE identifier generated during ephemeral materialization (#10290)
* Use alias instead of name when adding ephemeral model prefixes

* Adjust TestCustomSchemaWithCustomMacroFromModelName to test ephemeral models

* Add changelog entry for ephemeral model CTE identifier fix

* Reference model.identifier and model.name where appropriate to resolve typing errors

* Move test for ephemeral model with alias to dedicated test in test_compile.py
2024-08-09 15:00:55 -07:00
Tobie Tusing
374412af53 Improve tree traversal of select_children (#10526)
* update children search

* update search to include children in original selector

* add changie

* remove unused function

* fix wrong function call

* fix depth
2024-08-09 17:38:15 -04:00
Kshitij Aranke
47848b8ea8 Fix add_ephemeral_prefix to identifier instead of name (#10550) 2024-08-09 13:58:37 -05:00
Michelle Ark
3d09872a56 reset deprecations prior to usage in unit tests (#10545) 2024-08-08 12:25:22 -04:00
Colin Rogers
dfa7d06526 Revert "Remove Undocumented Property" (#10544)
* Revert "Remove undocumented property which does not pass mypy checks after an…"

This reverts commit 21a46332f1.

* add code comment
2024-08-07 20:01:55 -07:00
aliceliu
7f57dd5a30 Support using the measure label when using create_metric option (#10536) 2024-08-07 15:06:46 -05:00
Peter Webb
56bfbeaedd Depend on snowplow-tracker rather than our old fork, minimal-snowplow-tracker. (#10530) 2024-08-07 14:55:42 -04:00
Michelle Ark
1dd26e79af deprecations.buffer: respect --quiet and --warn-error-options for deprecations (#10534) 2024-08-07 11:16:40 -04:00
Quigley Malcolm
86223609dd Parameterized testing examples utilizing happy path fixture (#10480)
* sketch

* Bring back the happy path fixture snapshot file

The commit c783a86 removed the snapshot file from the happy path fixture.
This was done because the snapshot was breaking the tests we were adding,
`test_run_commands`. However, this broke `test_ls` in `test_list.py`. In order
to move forward, we need everything to be working. Maybe the idea was to delete
the `test_list.py` file, but that is not noted anywhere and was not done.
Thus this commit ensures that neither that test nor our new tests are broken.

* Create conftest for `functional` tests so that happy path fixtures are accessible

* Format `test_commands.py` and update imports to appease pre-commit hooks

* Parametrize `test_run_command` to make it easier to see which command is failing (if any)

* Update the setup for `TestRunCommands.test_run_command` to be more formulaic

* Add test to ensure resource types are selectable

* Fix docstring formatting in TestRunCommands

* Fixup documentation for test_commands.py

---------

Co-authored-by: Chenyu Li <chenyu.li@dbtlabs.com>
2024-08-07 10:05:25 -05:00
Peter Webb
21a46332f1 Remove undocumented property which does not pass mypy checks after annotations in dbt-common. (#10529) 2024-08-06 11:04:54 -04:00
Michelle Ark
ff2726c3b5 more defensive node.all_constraints access (#10508) 2024-07-31 20:02:27 -04:00
Courtney Holcomb
014444dc18 Bump DSI version to release new time spine validations (#10507) 2024-07-30 15:52:40 -07:00
Kshitij Aranke
25c2042dc9 Bump dbt-adapters to 1.3.0 (#10499) 2024-07-29 18:57:24 +01:00
Courtney Holcomb
0a160fc27a Support time spine configs for sub-daily granularity (#10483) 2024-07-29 13:39:39 -04:00
Michelle Ark
c598741262 Bump dbt common 1.6 (#10489) 2024-07-26 13:51:34 -04:00
Courtney Holcomb
f9c2b9398f Remove newlines from JSON schema files (#10486) 2024-07-26 13:36:13 -04:00
Michelle Ark
cab6dabbc7 parse + compile constraint.to and constraint.to_columns on foreign key constraints (#10414) 2024-07-25 10:56:17 -04:00
nakamichi
e1621ebc54 Fix typing for artifact schemas (#10443) 2024-07-24 18:22:21 -04:00
Michelle Ark
cd90d4493c add predicate to EventCatcher test util (#10482) 2024-07-23 17:55:30 -04:00
Kshitij Aranke
560d151dcd [Tidy First] Update PR template punctuation (#10479) 2024-07-23 19:32:52 +01:00
Doug Beatty
229c537748 Update pytest examples for contributors (#10478) 2024-07-23 11:42:31 -06:00
Mila Page
79ad0a3243 Support for redshift 821 (#10448)
* Add breakpoint.

* Move breakpoint.

* Add fix

* Add changelog.

* Avoid sorting for the string case.

* Add unit test.

* Fix test.

* add good unit tests for coverage of sort method.

* add sql format coverage.

* Modify behavior to log a warning and proceed.

* code review comments.

---------

Co-authored-by: Mila Page <versusfacit@users.noreply.github.com>
2024-07-22 13:54:46 -07:00
Gerda Shank
c668846404 Modify snapshot config to allow using schema/database/alias macros (#10439) 2024-07-18 15:56:51 -04:00
igorvoltaic
c4958de166 test: cover additional listagg cases (#10445)
* test: cover additional listagg cases

* fix embarrassment

---------

Co-authored-by: Doug Beatty <doug.beatty@dbtlabs.com>
2024-07-18 10:42:22 -04:00
Gerda Shank
33161a3035 Fix warn error options being invalid when warn or error set to none (#10453)
* Fix exclusive_primary_alt_value_setting to set warn_error_options correctly

* Add test

* Changie

* Fix unit test

* Replace conversion method

* Refactor normalize_warn_error_options
2024-07-17 14:13:34 -07:00
Gerda Shank
471b816dcd Fix multiple semantic models with generated metrics (#10451) 2024-07-17 16:09:06 -04:00
Michelle Ark
bef2d20c21 bump black in dev-requirements and pre-commit-config (#10407) 2024-07-17 12:01:14 -04:00
Michelle Ark
2a26fabfdf [tidy first] generate protos w protoc version 5.26.1 (#10456) 2024-07-17 10:35:38 -04:00
Courtney Holcomb
4c7d922a6d Add Metric.time_granularity to metric spec (#10378) 2024-07-16 13:35:20 -04:00
Michelle Ark
b03291548a update test__model_contract_true (#10449) 2024-07-16 11:32:00 -04:00
Chenyu Li
a7af3b3831 Revert "Support for redshift 821" (#10446) 2024-07-15 17:21:38 +01:00
Mila Page
6e4564ab05 Support for redshift 821 (#10366)
* Add changelog.

* Avoid sorting for the string case.

* add good unit tests for coverage of sort method.

* add sql format coverage.

---------

Co-authored-by: Mila Page <versusfacit@users.noreply.github.com>
2024-07-12 14:55:34 -07:00
Mirna Wong
1aeff2c58f GitHub issue template for documentation internal to dbt-core (#10404) 2024-07-11 18:38:25 +01:00
Gerda Shank
601fee0d5f Global CLI flag should override env var flag (#10423) 2024-07-10 15:25:19 -04:00
Emily Rockman
88b8b10df1 Update README.md (#10420)
* Update README.md

* update workflow
2024-07-08 20:44:36 -05:00
Gerda Shank
4ea0e1007c [Tidy first] Add some mypy types to parser files (#10380) 2024-07-08 10:19:39 -04:00
199 changed files with 4786 additions and 814 deletions

View File

@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Fix changing the current working directory when using dbt deps, clean and init.
time: 2023-12-06T19:24:42.575372+09:00
custom:
Author: rariyama
Issue: "8997"

View File

@@ -0,0 +1,7 @@
kind: Dependencies
body: Increase supported version range for dbt-semantic-interfaces. Needed to support
custom calendar features.
time: 2024-08-20T13:19:09.015225-07:00
custom:
Author: courtneyholcomb
Issue: "9265"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Warning message for snapshot timestamp data types
time: 2024-06-21T14:16:35.717637-04:00
custom:
Author: gshank
Issue: "10234"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Add time_granularity to metric spec.
time: 2024-06-27T16:29:53.500917-07:00
custom:
Author: courtneyholcomb
Issue: "10376"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Support standard schema/database fields for snapshots
time: 2024-07-12T21:45:46.06011-04:00
custom:
Author: gshank
Issue: "10301"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Support ref and source in foreign key constraint expressions, bump dbt-common minimum to 1.6
time: 2024-07-19T16:18:41.434278-04:00
custom:
Author: michelleark
Issue: "8062"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Support new semantic layer time spine configs to enable sub-daily granularity.
time: 2024-07-22T20:22:38.258249-07:00
custom:
Author: courtneyholcomb
Issue: "10475"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Add support for behavior flags
time: 2024-08-29T13:53:20.16122-04:00
custom:
Author: mikealfare
Issue: "10618"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Use model alias for the CTE identifier generated during ephemeral materialization
time: 2024-06-10T20:05:22.510814008Z
custom:
Author: jeancochrane
Issue: "5273"

View File

@@ -0,0 +1,7 @@
kind: Fixes
body: Attempt to provide test fixture tables with all values to set types correctly
for comparison with source tables
time: 2024-06-25T17:17:37.514619-07:00
custom:
Author: versusfacit
Issue: "10365"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: CLI flags should take precedence over env var flags
time: 2024-07-09T17:24:40.918977-04:00
custom:
Author: gshank
Issue: "10304"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix typing for artifact schemas
time: 2024-07-14T10:02:54.452099+09:00
custom:
Author: nakamichiworks
Issue: "10442"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix over deletion of generated_metrics in partial parsing
time: 2024-07-16T13:37:03.49651-04:00
custom:
Author: gshank
Issue: "10450"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix error constructing warn_error_options
time: 2024-07-16T17:14:27.837171-04:00
custom:
Author: gshank
Issue: "10452"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Do not update varchar column definitions if a contract exists
time: 2024-07-28T22:14:21.67712-04:00
custom:
Author: gshank
Issue: "10362"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: fix all_constraints access, disabled node parsing of non-uniquely named resources
time: 2024-07-31T09:51:52.751135-04:00
custom:
Author: michelleark gshank
Issue: "10509"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Propagate measure label when using create_metrics
time: 2024-08-06T17:21:10.265494-07:00
custom:
Author: aliceliu
Issue: "10536"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: respect --quiet and --warn-error-options for flag deprecations
time: 2024-08-06T19:48:43.399453-04:00
custom:
Author: michelleark
Issue: "10105"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix state:modified check for exports
time: 2024-08-13T15:42:35.471685-07:00
custom:
Author: aliceliu
Issue: "10138"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Filter out empty nodes after graph selection to support consistent selection of nodes that depend on upstream public models
time: 2024-08-16T14:08:07.426235-07:00
custom:
Author: jtcohen6
Issue: "8987"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Late render pre- and post-hooks configs in properties / schema YAML files
time: 2024-08-24T21:09:03.252733-06:00
custom:
Author: dbeatty10
Issue: "10603"

View File

@@ -0,0 +1,7 @@
kind: Fixes
body: Allow the use of env_var function in certain macros in which it was previously
unavailable.
time: 2024-08-29T10:57:01.160613-04:00
custom:
Author: peterallenwebb
Issue: "10609"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: 'Remove deprecation for tests: to data_tests: change'
time: 2024-09-05T18:02:48.086421-04:00
custom:
Author: gshank
Issue: "10564"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: bump black to 24.3.0
time: 2024-07-16T18:48:59.651834-04:00
custom:
Author: michelleark
Issue: "10454"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: generate protos with protoc version 5.26.1
time: 2024-07-16T20:57:03.332448-04:00
custom:
Author: michelleark
Issue: "10457"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Move from minimal-snowplow-tracker fork back to snowplow-tracker
time: 2024-08-06T15:54:06.422444-04:00
custom:
Author: peterallenwebb
Issue: "8409"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Add group info to RunResultError, RunResultFailure, RunResultWarning log lines
time: 2024-08-07T15:56:52.171199-05:00
custom:
Author: aranke
Issue: ""
JiraID: "364"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Improve speed of tree traversal when finding children, increasing build speed for some selectors
time: 2024-08-09T13:02:34.759905-07:00
custom:
Author: ttusing
Issue: "10434"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add test for sources tables with quotes
time: 2024-08-21T09:55:16.038101-04:00
custom:
Author: gshank
Issue: "10582"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Additional type hints for `core/dbt/version.py`
time: 2024-08-27T10:50:14.047859-05:00
custom:
Author: QMalcolm
Issue: "10612"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fix typing issues in core/dbt/contracts/sql.py
time: 2024-08-27T11:31:23.749912-05:00
custom:
Author: QMalcolm
Issue: "10614"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fix type errors in `dbt/core/task/clean.py`
time: 2024-08-27T11:48:10.438173-05:00
custom:
Author: QMalcolm
Issue: "10616"

View File

@@ -7,6 +7,7 @@ ignore =
W503 # makes Flake8 work like black
W504
E203 # makes Flake8 work like black
E704 # makes Flake8 work like black
E741
E501 # long line checking is done in black
exclude = test/

.github/ISSUE_TEMPLATE/code-docs.yml (new file)
View File

@@ -0,0 +1,18 @@
name: 📄 Code docs
description: Report an issue for markdown files within this repo, such as README, ARCHITECTURE, etc.
title: "[Code docs] <title>"
labels: ["triage"]
body:
- type: markdown
attributes:
value: |
Thanks for taking the time to fill out this code docs issue!
- type: textarea
attributes:
label: Please describe the issue and your proposals.
description: |
Links? References? Anything that will give us more context about the issue you are encountering!
Tip: You can attach images by clicking this area to highlight it and then dragging files in.
validations:
required: false

View File

@@ -2,7 +2,7 @@ blank_issues_enabled: false
contact_links:
- name: Documentation
url: https://github.com/dbt-labs/docs.getdbt.com/issues/new/choose
about: Problems and issues with dbt documentation
about: Problems and issues with dbt product documentation hosted on docs.getdbt.com. Issues for markdown files within this repo, such as README, should be opened using the "Code docs" template.
- name: Ask the community for help
url: https://github.com/dbt-labs/docs.getdbt.com/discussions
about: Need help troubleshooting? Check out our guide on how to ask

View File

@@ -1,7 +1,7 @@
resolves #
Resolves #
<!---
Include the number of the issue addressed by this PR above if applicable.
Include the number of the issue addressed by this PR above, if applicable.
PRs for code changes without an associated issue *will not be merged*.
See CONTRIBUTING.md for more information.
@@ -26,8 +26,8 @@ resolves #
### Checklist
- [ ] I have read [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md) and understand what's expected of me
- [ ] I have run this code in development and it appears to resolve the stated issue
- [ ] This PR includes tests, or tests are not required/relevant for this PR
- [ ] This PR has no interface changes (e.g. macros, cli, logs, json artifacts, config files, adapter interface, etc) or this PR has already received feedback and approval from Product or DX
- [ ] This PR includes [type annotations](https://docs.python.org/3/library/typing.html) for new and modified functions
- [ ] I have read [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md) and understand what's expected of me.
- [ ] I have run this code in development, and it appears to resolve the stated issue.
- [ ] This PR includes tests, or tests are not required or relevant for this PR.
- [ ] This PR has no interface changes (e.g., macros, CLI, logs, JSON artifacts, config files, adapter interface, etc.) or this PR has already received feedback and approval from Product or DX.
- [ ] This PR includes [type annotations](https://docs.python.org/3/library/typing.html) for new and modified functions.

View File

@@ -32,7 +32,7 @@ jobs:
run: |
echo "CI failure: Artifact changes checked in core/dbt/artifacts directory."
echo "Files changed: ${{ steps.check_artifact_changes.outputs.artifacts_changed_files }}"
echo "To bypass this check, confirm that the change is not breaking (https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/artifacts/README.md#breaking-changes) and add the 'artifact_minor_upgrade' label to the PR."
echo "To bypass this check, confirm that the change is not breaking (https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/artifacts/README.md#breaking-changes) and add the 'artifact_minor_upgrade' label to the PR. Modifications and additions to all fields require updates to https://github.com/dbt-labs/dbt-jsonschema."
exit 1
- name: CI check passed

View File

@@ -165,6 +165,18 @@ jobs:
os: [ubuntu-20.04]
split-group: ${{ fromJson(needs.integration-metadata.outputs.split-groups) }}
include: ${{ fromJson(needs.integration-metadata.outputs.include) }}
services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
env:
TOXENV: integration
DBT_INVOCATION_ENV: github-actions
@@ -189,14 +201,32 @@ jobs:
- name: Set up postgres (linux)
if: runner.os == 'Linux'
uses: ./.github/actions/setup-postgres-linux
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Set up postgres (macos)
if: runner.os == 'macOS'
uses: ./.github/actions/setup-postgres-macos
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Set up postgres (windows)
if: runner.os == 'Windows'
uses: ./.github/actions/setup-postgres-windows
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Install python tools
run: |

View File

@@ -15,6 +15,7 @@ repos:
args: [--unsafe]
- id: check-json
- id: end-of-file-fixer
exclude: schemas/dbt/manifest/
- id: trailing-whitespace
exclude_types:
- "markdown"
@@ -26,7 +27,7 @@ repos:
- id: isort
- repo: https://github.com/psf/black
# rev must match what's in dev-requirements.txt
rev: 22.3.0
rev: 24.3.0
hooks:
- id: black
- id: black

View File

@@ -170,9 +170,9 @@ Finally, you can also run a specific test or group of tests using [`pytest`](htt
```sh
# run all unit tests in a file
python3 -m pytest tests/unit/test_base_column.py
python3 -m pytest tests/unit/test_invocation_id.py
# run a specific unit test
python3 -m pytest tests/unit/test_base_column.py::TestNumericType::test__numeric_type
python3 -m pytest tests/unit/test_invocation_id.py::TestInvocationId::test_invocation_id
# run specific Postgres functional tests
python3 -m pytest tests/functional/sources
```

View File

@@ -144,3 +144,7 @@ help: ## Show this help message.
@echo
@echo 'options:'
@echo 'use USE_DOCKER=true to run target in a docker container'
.PHONY: json_schema
json_schema: ## Update generated JSON schema using code changes.
scripts/collect-artifact-schema.py --path schemas

View File

@@ -29,6 +29,10 @@ All existing resources are defined under `dbt/artifacts/resources/v1`.
## Making changes to dbt/artifacts
### All changes
All changes to any fields will require a manual update to [dbt-jsonschema](https://github.com/dbt-labs/dbt-jsonschema) to ensure live checking continues to work.
### Non-breaking changes
Freely make incremental, non-breaking changes in-place to the latest major version of any artifact (minor or patch bumps). The only changes that are fully forward and backward compatible are:

View File

@@ -46,7 +46,7 @@ from dbt.artifacts.resources.v1.metric import (
MetricTimeWindow,
MetricTypeParams,
)
from dbt.artifacts.resources.v1.model import Model, ModelConfig
from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine
from dbt.artifacts.resources.v1.owner import Owner
from dbt.artifacts.resources.v1.saved_query import (
Export,

View File

@@ -10,6 +10,7 @@ from dbt_common.contracts.config.properties import AdditionalPropertiesMixin
from dbt_common.contracts.constraints import ColumnLevelConstraint
from dbt_common.contracts.util import Mergeable
from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, dbtClassMixin
from dbt_semantic_interfaces.type_enums import TimeGranularity
NodeVersion = Union[str, float]
@@ -66,6 +67,7 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
_extra: Dict[str, Any] = field(default_factory=dict)
granularity: Optional[TimeGranularity] = None
@dataclass

View File

@@ -121,6 +121,7 @@ class Metric(GraphResource):
type_params: MetricTypeParams
filter: Optional[WhereFilterIntersection] = None
metadata: Optional[SourceFileMetadata] = None
time_granularity: Optional[TimeGranularity] = None
resource_type: Literal[NodeType.Metric]
meta: Dict[str, Any] = field(default_factory=dict, metadata=MergeBehavior.Update.meta())
tags: List[str] = field(default_factory=list)

View File

@@ -11,6 +11,7 @@ from dbt.artifacts.resources.v1.components import (
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt_common.contracts.config.base import MergeBehavior
from dbt_common.contracts.constraints import ModelLevelConstraint
from dbt_common.dataclass_schema import dbtClassMixin
@dataclass
@@ -21,6 +22,11 @@ class ModelConfig(NodeConfig):
)
@dataclass
class TimeSpine(dbtClassMixin):
standard_granularity_column: str
@dataclass
class Model(CompiledResource):
resource_type: Literal[NodeType.Model]
@@ -32,6 +38,7 @@ class Model(CompiledResource):
deprecation_date: Optional[datetime] = None
defer_relation: Optional[DeferRelation] = None
primary_key: List[str] = field(default_factory=list)
time_spine: Optional[TimeSpine] = None
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)

View File

@@ -34,6 +34,7 @@ class Export(dbtClassMixin):
name: str
config: ExportConfig
unrendered_config: Dict[str, str] = field(default_factory=dict)
@dataclass

View File

@@ -19,10 +19,9 @@ class SnapshotConfig(NodeConfig):
check_cols: Union[str, List[str], None] = None
def final_validate(self):
if not self.strategy or not self.unique_key or not self.target_schema:
if not self.strategy or not self.unique_key:
raise ValidationError(
"Snapshots must be configured with a 'strategy', 'unique_key', "
"and 'target_schema'."
"Snapshots must be configured with a 'strategy' and 'unique_key'."
)
if self.strategy == "check":
if not self.check_cols:

View File

@@ -77,8 +77,11 @@ class BaseArtifactMetadata(dbtClassMixin):
# remote-compile-result
# remote-execution-result
# remote-run-result
S = TypeVar("S", bound="VersionedSchema")
def schema_version(name: str, version: int):
def inner(cls: Type[VersionedSchema]):
def inner(cls: Type[S]):
cls.dbt_schema_version = SchemaVersion(
name=name,
version=version,

View File

@@ -158,7 +158,8 @@ class RunResultsArtifact(ExecutionResult, ArtifactMixin):
@classmethod
def upgrade_schema_version(cls, data):
"""This overrides the "upgrade_schema_version" call in VersionedSchema (via
ArtifactMixin) to modify the dictionary passed in from earlier versions of the run_results."""
ArtifactMixin) to modify the dictionary passed in from earlier versions of the run_results.
"""
run_results_schema_version = get_artifact_schema_version(data)
# If less than the current version (v5), preprocess contents to match latest schema version
if run_results_schema_version <= 5:

View File

@@ -1,7 +1,10 @@
from typing import IO, Optional
from typing import IO, List, Optional, Union
from click.exceptions import ClickException
from dbt.artifacts.schemas.catalog import CatalogArtifact
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import RunExecutionResult
from dbt.utils import ExitCodes
@@ -23,7 +26,7 @@ class CliException(ClickException):
# the typing of _file is to satisfy the signature of ClickException.show
# overriding this method prevents click from printing any exceptions to stdout
def show(self, _file: Optional[IO] = None) -> None:
def show(self, _file: Optional[IO] = None) -> None: # type: ignore[type-arg]
pass
@@ -31,7 +34,17 @@ class ResultExit(CliException):
"""This class wraps any exception that contains results while invoking dbt, or the
results of an invocation that did not succeed but did not throw any exceptions."""
def __init__(self, result) -> None:
def __init__(
self,
result: Union[
bool, # debug
CatalogArtifact, # docs generate
List[str], # list/ls
Manifest, # parse
None, # clean, deps, init, source
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None,
) -> None:
super().__init__(ExitCodes.ModelError)
self.result = result

View File

@@ -15,7 +15,7 @@ from dbt.cli.resolvers import default_log_path, default_project_dir
from dbt.cli.types import Command as CliCommand
from dbt.config.project import read_project_flags
from dbt.contracts.project import ProjectFlags
from dbt.deprecations import renamed_env_var
from dbt.deprecations import fire_buffered_deprecations, renamed_env_var
from dbt.events import ALL_EVENT_NAMES
from dbt_common import ui
from dbt_common.clients import jinja
@@ -92,6 +92,8 @@ class Flags:
# Set the default flags.
for key, value in FLAGS_DEFAULTS.items():
object.__setattr__(self, key, value)
# Use to handle duplicate params in _assign_params
flags_defaults_list = list(FLAGS_DEFAULTS.keys())
if ctx is None:
ctx = get_current_context()
@@ -173,13 +175,29 @@ class Flags:
old_name=dep_param.envvar,
new_name=new_param.envvar,
)
# end deprecated_params
# Set the flag value.
is_duplicate = hasattr(self, param_name.upper())
is_duplicate = (
hasattr(self, param_name.upper())
and param_name.upper() not in flags_defaults_list
)
# First time through, set as though FLAGS_DEFAULTS hasn't been set, so not a duplicate.
# Subsequent pass (to process "parent" params) should be treated as duplicates.
if param_name.upper() in flags_defaults_list:
flags_defaults_list.remove(param_name.upper())
# Note: the following determines whether parameter came from click default,
# not from FLAGS_DEFAULTS in __init__.
is_default = ctx.get_parameter_source(param_name) == ParameterSource.DEFAULT
is_envvar = ctx.get_parameter_source(param_name) == ParameterSource.ENVIRONMENT
flag_name = (new_name or param_name).upper()
if (is_duplicate and not is_default) or not is_duplicate:
# envvar flags are assigned in either parent or child context if there
# isn't an overriding cli command flag.
# If the flag has been encountered as a child cli flag, we don't
# want to overwrite with parent envvar, since the commandline flag takes precedence.
if (is_duplicate and not (is_default or is_envvar)) or not is_duplicate:
object.__setattr__(self, flag_name, param_value)
# Track default assigned params.
@@ -337,6 +355,8 @@ class Flags:
# not get pickled when written to disk as json.
object.__delattr__(self, "deprecated_env_var_warnings")
fire_buffered_deprecations()
@classmethod
def from_dict(cls, command: CliCommand, args_dict: Dict[str, Any]) -> "Flags":
command_arg_list = command_params(command, args_dict)
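A minimal hand-rolled sketch (not dbt internals) of the precedence the is_envvar handling above enforces: an explicit command-line flag wins over an environment variable, which wins over the click default.

```python
def resolve_flag(cli_value, env_value, default):
    # CLI flag takes precedence over env var, which takes precedence over the default.
    if cli_value is not None:
        return cli_value
    if env_value is not None:
        return env_value
    return default

assert resolve_flag("warn", "debug", "info") == "warn"   # CLI beats env var
assert resolve_flag(None, "debug", "info") == "debug"    # env var beats default
assert resolve_flag(None, None, "info") == "info"
```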

View File

@@ -218,10 +218,9 @@ def clean(ctx, **kwargs):
"""Delete all folders in the clean-targets list (usually the dbt_packages and target directories.)"""
from dbt.task.clean import CleanTask
task = CleanTask(ctx.obj["flags"], ctx.obj["project"])
results = task.run()
success = task.interpret_results(results)
with CleanTask(ctx.obj["flags"], ctx.obj["project"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success
@@ -437,9 +436,9 @@ def deps(ctx, **kwargs):
message=f"Version is required in --add-package when a package when source is {flags.SOURCE}",
option_name="--add-package",
)
task = DepsTask(flags, ctx.obj["project"])
results = task.run()
success = task.interpret_results(results)
with DepsTask(flags, ctx.obj["project"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success
@@ -459,10 +458,9 @@ def init(ctx, **kwargs):
"""Initialize a new dbt project."""
from dbt.task.init import InitTask
task = InitTask(ctx.obj["flags"])
results = task.run()
success = task.interpret_results(results)
with InitTask(ctx.obj["flags"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success

View File

@@ -1,6 +1,6 @@
from click import Choice, ParamType
from dbt.config.utils import exclusive_primary_alt_value_setting, parse_cli_yaml_string
from dbt.config.utils import normalize_warn_error_options, parse_cli_yaml_string
from dbt.events import ALL_EVENT_NAMES
from dbt.exceptions import OptionNotYamlDictError, ValidationError
from dbt_common.exceptions import DbtValidationError
@@ -51,12 +51,7 @@ class WarnErrorOptionsType(YAML):
def convert(self, value, param, ctx):
# this function is being used by param in click
include_exclude = super().convert(value, param, ctx)
exclusive_primary_alt_value_setting(
include_exclude, "include", "error", "warn_error_options"
)
exclusive_primary_alt_value_setting(
include_exclude, "exclude", "warn", "warn_error_options"
)
normalize_warn_error_options(include_exclude)
return WarnErrorOptions(
include=include_exclude.get("include", []),

View File

@@ -179,9 +179,11 @@ def postflight(func):
process_in_blocks=rusage.ru_inblock,
process_out_blocks=rusage.ru_oublock,
),
EventLevel.INFO
if "flags" in ctx.obj and ctx.obj["flags"].SHOW_RESOURCE_REPORT
else None,
(
EventLevel.INFO
if "flags" in ctx.obj and ctx.obj["flags"].SHOW_RESOURCE_REPORT
else None
),
)
fire_event(

View File

@@ -1,11 +1,13 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional, Union
import jinja2
from dbt.exceptions import MacroNamespaceNotStringError
from dbt.artifacts.resources import RefArgs
from dbt.exceptions import MacroNamespaceNotStringError, ParsingError
from dbt_common.clients.jinja import get_environment
from dbt_common.exceptions.macros import MacroNameNotStringError
from dbt_common.tests import test_caching_enabled
from dbt_extractor import ExtractionError, py_extract_from_source # type: ignore
_TESTING_MACRO_CACHE: Optional[Dict[str, Any]] = {}
@@ -153,3 +155,39 @@ def statically_parse_adapter_dispatch(func_call, ctx, db_wrapper):
possible_macro_calls.append(f"{package_name}.{func_name}")
return possible_macro_calls
def statically_parse_ref_or_source(expression: str) -> Union[RefArgs, List[str]]:
"""
Returns a RefArgs or List[str] object, corresponding to ref or source respectively, given an input jinja expression.
input: str representing how input node is referenced in tested model sql
* examples:
- "ref('my_model_a')"
- "ref('my_model_a', version=3)"
- "ref('package', 'my_model_a', version=3)"
- "source('my_source_schema', 'my_source_name')"
If input is not a well-formed jinja ref or source expression, a ParsingError is raised.
"""
ref_or_source: Union[RefArgs, List[str]]
try:
statically_parsed = py_extract_from_source(f"{{{{ {expression} }}}}")
except ExtractionError:
raise ParsingError(f"Invalid jinja expression: {expression}")
if statically_parsed.get("refs"):
raw_ref = list(statically_parsed["refs"])[0]
ref_or_source = RefArgs(
package=raw_ref.get("package"),
name=raw_ref.get("name"),
version=raw_ref.get("version"),
)
elif statically_parsed.get("sources"):
source_name, source_table_name = list(statically_parsed["sources"])[0]
ref_or_source = [source_name, source_table_name]
else:
raise ParsingError(f"Invalid ref or source expression: {expression}")
return ref_or_source
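Illustrative usage of the helper added above, assuming dbt-core is importable; the return shapes follow its docstring (a RefArgs for ref expressions, a two-element list for source expressions).

```python
from dbt.clients.jinja_static import statically_parse_ref_or_source

parsed_ref = statically_parse_ref_or_source("ref('my_model_a', version=3)")
# RefArgs with name='my_model_a', package=None, version=3

parsed_source = statically_parse_ref_or_source("source('my_source_schema', 'my_source_name')")
# ['my_source_schema', 'my_source_name']

# Anything that is not a well-formed ref/source expression raises ParsingError.
```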

View File

@@ -21,6 +21,7 @@ from dbt.contracts.graph.nodes import (
InjectedCTE,
ManifestNode,
ManifestSQLNode,
ModelNode,
SeedNode,
UnitTestDefinition,
UnitTestNode,
@@ -29,12 +30,15 @@ from dbt.events.types import FoundStats, WritingInjectedSQLForNode
from dbt.exceptions import (
DbtInternalError,
DbtRuntimeError,
ForeignKeyConstraintToSyntaxError,
GraphDependencyNotFoundError,
ParsingError,
)
from dbt.flags import get_flags
from dbt.graph import Graph
from dbt.node_types import ModelLanguage, NodeType
from dbt_common.clients.system import make_directory
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.format import pluralize
from dbt_common.events.functions import fire_event
@@ -371,7 +375,7 @@ class Compiler:
_extend_prepended_ctes(prepended_ctes, new_prepended_ctes)
new_cte_name = self.add_ephemeral_prefix(cte_model.name)
new_cte_name = self.add_ephemeral_prefix(cte_model.identifier)
rendered_sql = cte_model._pre_injected_sql or cte_model.compiled_code
sql = f" {new_cte_name} as (\n{rendered_sql}\n)"
@@ -437,8 +441,31 @@ class Compiler:
relation_name = str(relation_cls.create_from(self.config, node))
node.relation_name = relation_name
# Compile 'ref' and 'source' expressions in foreign key constraints
if isinstance(node, ModelNode):
for constraint in node.all_constraints:
if constraint.type == ConstraintType.foreign_key and constraint.to:
constraint.to = self._compile_relation_for_foreign_key_constraint_to(
manifest, node, constraint.to
)
return node
def _compile_relation_for_foreign_key_constraint_to(
self, manifest: Manifest, node: ManifestSQLNode, to_expression: str
) -> str:
try:
foreign_key_node = manifest.find_node_from_ref_or_source(to_expression)
except ParsingError:
raise ForeignKeyConstraintToSyntaxError(node, to_expression)
if not foreign_key_node:
raise GraphDependencyNotFoundError(node, to_expression)
adapter = get_adapter(self.config)
relation_name = str(adapter.Relation.create_from(self.config, foreign_key_node))
return relation_name
# This method doesn't actually "compile" any of the nodes. That is done by the
# "compile_node" method. This creates a Linker and builds the networkx graph,
# writes out the graph.gpickle file, and prints the stats, returning a Graph object.
@@ -520,6 +547,8 @@ class Compiler:
the node's raw_code into compiled_code, and then calls the
recursive method to "prepend" the ctes.
"""
# REVIEW: UnitTestDefinition shouldn't be possible here because of the
# type of node, and it is likewise an invalid return type.
if isinstance(node, UnitTestDefinition):
return node
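A hypothetical before/after for the new foreign key compile step above: a constraint whose `to` is written as a ref() expression is resolved through the manifest and replaced with the target's relation name (exact quoting depends on the adapter and project configuration; names below are made up).

```python
# Hypothetical values for illustration only.
constraint_to_raw = "ref('dim_customers')"
constraint_to_compiled = '"analytics"."prod"."dim_customers"'  # adapter-rendered relation name
```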

View File

@@ -10,7 +10,7 @@ from dbt import deprecations
from dbt.adapters.contracts.connection import QueryComment
from dbt.clients.yaml_helper import load_yaml_text
from dbt.config.selectors import SelectorDict
from dbt.config.utils import exclusive_primary_alt_value_setting
from dbt.config.utils import normalize_warn_error_options
from dbt.constants import (
DBT_PROJECT_FILE_NAME,
DEPENDENCIES_FILE_NAME,
@@ -480,6 +480,7 @@ class PartialProject(RenderComponents):
rendered.selectors_dict["selectors"]
)
dbt_cloud = cfg.dbt_cloud
flags: Dict[str, Any] = cfg.flags
project = Project(
project_name=name,
@@ -524,6 +525,7 @@ class PartialProject(RenderComponents):
project_env_vars=project_env_vars,
restrict_access=cfg.restrict_access,
dbt_cloud=dbt_cloud,
flags=flags,
)
# sanity check - this means an internal issue
project.validate()
@@ -568,11 +570,6 @@ class PartialProject(RenderComponents):
) = package_and_project_data_from_root(project_root)
selectors_dict = selector_data_from_root(project_root)
if "flags" in project_dict:
# We don't want to include "flags" in the Project,
# it goes in ProjectFlags
project_dict.pop("flags")
return cls.from_dicts(
project_root=project_root,
project_dict=project_dict,
@@ -645,6 +642,7 @@ class Project:
project_env_vars: Dict[str, Any]
restrict_access: bool
dbt_cloud: Dict[str, Any]
flags: Dict[str, Any]
@property
def all_source_paths(self) -> List[str]:
@@ -724,6 +722,7 @@ class Project:
"require-dbt-version": [v.to_version_string() for v in self.dbt_version],
"restrict-access": self.restrict_access,
"dbt-cloud": self.dbt_cloud,
"flags": self.flags,
}
)
if self.query_comment:
@@ -821,20 +820,15 @@ def read_project_flags(project_dir: str, profiles_dir: str) -> ProjectFlags:
if profile_project_flags:
# This can't use WARN_ERROR or WARN_ERROR_OPTIONS because they're in
# the config that we're loading. Uses special "warn" method.
deprecations.warn("project-flags-moved")
# the config that we're loading. Uses special "buffer" method and fired after flags are initialized in preflight.
deprecations.buffer("project-flags-moved")
project_flags = profile_project_flags
if project_flags is not None:
# handle collapsing `include` and `error` as well as collapsing `exclude` and `warn`
# for warn_error_options
warn_error_options = project_flags.get("warn_error_options")
exclusive_primary_alt_value_setting(
warn_error_options, "include", "error", "warn_error_options"
)
exclusive_primary_alt_value_setting(
warn_error_options, "exclude", "warn", "warn_error_options"
)
warn_error_options = project_flags.get("warn_error_options", {})
normalize_warn_error_options(warn_error_options)
ProjectFlags.validate(project_flags)
return ProjectFlags.from_dict(project_flags)

View File

@@ -193,6 +193,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
log_cache_events=log_cache_events,
dependencies=dependencies,
dbt_cloud=project.dbt_cloud,
flags=project.flags,
)
# Called by 'load_projects' in this class
@@ -290,9 +291,9 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
project_name=self.project_name,
project_id=self.hashed_name(),
user_id=tracking.active_user.id if tracking.active_user else None,
send_anonymous_usage_stats=get_flags().SEND_ANONYMOUS_USAGE_STATS
if tracking.active_user
else None,
send_anonymous_usage_stats=(
get_flags().SEND_ANONYMOUS_USAGE_STATS if tracking.active_user else None
),
adapter_type=self.credentials.type,
)

View File

@@ -49,5 +49,18 @@ def exclusive_primary_alt_value_setting(
f"Only `{alt}` or `{primary}` can be specified{where}, not both"
)
if alt_options:
dictionary[primary] = alt_options
if alt in dictionary:
alt_value = dictionary.pop(alt)
dictionary[primary] = alt_value
def normalize_warn_error_options(warn_error_options: Dict[str, Any]) -> None:
exclusive_primary_alt_value_setting(
warn_error_options, "include", "error", "warn_error_options"
)
exclusive_primary_alt_value_setting(
warn_error_options, "exclude", "warn", "warn_error_options"
)
for key in ("include", "exclude", "silence"):
if key in warn_error_options and warn_error_options[key] is None:
warn_error_options[key] = []
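A rough sketch of what normalize_warn_error_options does to a warn_error_options dict, based on the calls above: the alternate keys error/warn are folded into include/exclude, and None values for include, exclude, and silence become empty lists.

```python
from dbt.config.utils import normalize_warn_error_options

opts = {"error": ["Deprecations"], "exclude": None}
normalize_warn_error_options(opts)
# opts is now {"include": ["Deprecations"], "exclude": []}
```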

View File

@@ -1,3 +1,5 @@
from dbt_semantic_interfaces.type_enums import TimeGranularity
DEFAULT_ENV_PLACEHOLDER = "DBT_DEFAULT_PLACEHOLDER"
SECRET_PLACEHOLDER = "$$$DBT_SECRET_START$$${}$$$DBT_SECRET_END$$$"
@@ -15,6 +17,8 @@ DEPENDENCIES_FILE_NAME = "dependencies.yml"
PACKAGE_LOCK_FILE_NAME = "package-lock.yml"
MANIFEST_FILE_NAME = "manifest.json"
SEMANTIC_MANIFEST_FILE_NAME = "semantic_manifest.json"
TIME_SPINE_MODEL_NAME = "metricflow_time_spine"
LEGACY_TIME_SPINE_MODEL_NAME = "metricflow_time_spine"
LEGACY_TIME_SPINE_GRANULARITY = TimeGranularity.DAY
MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY = TimeGranularity.DAY
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
PACKAGE_LOCK_HASH_KEY = "sha1_hash"

View File

@@ -27,8 +27,7 @@ class ConfigSource:
def __init__(self, project):
self.project = project
def get_config_dict(self, resource_type: NodeType):
...
def get_config_dict(self, resource_type: NodeType): ...
class UnrenderedConfig(ConfigSource):
@@ -130,12 +129,12 @@ class BaseContextConfigGenerator(Generic[T]):
return self._project_configs(self._active_project, fqn, resource_type)
@abstractmethod
def _update_from_config(self, result: T, partial: Dict[str, Any], validate: bool = False) -> T:
...
def _update_from_config(
self, result: T, partial: Dict[str, Any], validate: bool = False
) -> T: ...
@abstractmethod
def initial_result(self, resource_type: NodeType, base: bool) -> T:
...
def initial_result(self, resource_type: NodeType, base: bool) -> T: ...
def calculate_node_config(
self,
@@ -181,8 +180,7 @@ class BaseContextConfigGenerator(Generic[T]):
project_name: str,
base: bool,
patch_config_dict: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
...
) -> Dict[str, Any]: ...
class ContextConfigGenerator(BaseContextConfigGenerator[C]):

View File

@@ -8,7 +8,7 @@ from dbt.adapters.exceptions import (
RelationWrongTypeError,
)
from dbt.adapters.exceptions.cache import CacheInconsistencyError
from dbt.events.types import JinjaLogWarning
from dbt.events.types import JinjaLogWarning, SnapshotTimestampWarning
from dbt.exceptions import (
AmbiguousAliasError,
AmbiguousCatalogMatchError,
@@ -116,6 +116,17 @@ def raise_fail_fast_error(msg, node=None) -> NoReturn:
raise FailFastError(msg, node=node)
def warn_snapshot_timestamp_data_types(
snapshot_time_data_type: str, updated_at_data_type: str
) -> None:
warn_or_error(
SnapshotTimestampWarning(
snapshot_time_data_type=snapshot_time_data_type,
updated_at_data_type=updated_at_data_type,
)
)
# Update this when a new function should be added to the
# dbt context's `exceptions` key!
CONTEXT_EXPORTS = {
@@ -141,6 +152,7 @@ CONTEXT_EXPORTS = {
raise_contract_error,
column_type_missing,
raise_fail_fast_error,
warn_snapshot_timestamp_data_types,
]
}

View File

@@ -239,8 +239,7 @@ class BaseRefResolver(BaseResolver):
@abc.abstractmethod
def resolve(
self, name: str, package: Optional[str] = None, version: Optional[NodeVersion] = None
) -> RelationProxy:
...
) -> RelationProxy: ...
def _repack_args(
self, name: str, package: Optional[str], version: Optional[NodeVersion]
@@ -306,8 +305,7 @@ class BaseSourceResolver(BaseResolver):
class BaseMetricResolver(BaseResolver):
@abc.abstractmethod
def resolve(self, name: str, package: Optional[str] = None) -> MetricReference:
...
def resolve(self, name: str, package: Optional[str] = None) -> MetricReference: ...
def _repack_args(self, name: str, package: Optional[str]) -> List[str]:
if package is None:
@@ -341,8 +339,7 @@ class BaseMetricResolver(BaseResolver):
class Config(Protocol):
def __init__(self, model, context_config: Optional[ContextConfig]):
...
def __init__(self, model, context_config: Optional[ContextConfig]): ...
# Implementation of "config(..)" calls in models
@@ -977,7 +974,8 @@ class ProviderContext(ManifestContext):
table = agate_helper.from_csv(path, text_columns=column_types, delimiter=delimiter)
except ValueError as e:
raise LoadAgateTableValueError(e, node=self.model)
table.original_abspath = os.path.abspath(path)
# this is used by some adapters
table.original_abspath = os.path.abspath(path) # type: ignore
return table
@contextproperty()

View File

@@ -192,8 +192,13 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
# metrics generated from semantic_model measures
# The following field will no longer be used. Leaving
# here to avoid breaking existing projects. To be removed
# later if possible.
generated_metrics: List[str] = field(default_factory=list)
# metrics generated from semantic_model measures. The key is
# the name of the semantic_model, so that we can find it later.
metrics_from_measures: Dict[str, Any] = field(default_factory=dict)
groups: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list)
@@ -259,6 +264,40 @@ class SchemaSourceFile(BaseSourceFile):
return self.data_tests[yaml_key][name]
return []
def add_metrics_from_measures(self, semantic_model_name: str, metric_unique_id: str):
if self.generated_metrics:
# Probably not needed, but for safety sake, convert the
# old generated_metrics to metrics_from_measures.
self.fix_metrics_from_measures()
if semantic_model_name not in self.metrics_from_measures:
self.metrics_from_measures[semantic_model_name] = []
self.metrics_from_measures[semantic_model_name].append(metric_unique_id)
def fix_metrics_from_measures(self):
# Temporary method to fix up existing projects with a partial parse file.
# This should only be called if SchemaSourceFile in a msgpack
# pack manifest has an existing "generated_metrics" list, to turn it
# into a "metrics_from_measures" dictionary, so that we can
# correctly partially parse.
# This code can be removed when "generated_metrics" is removed.
generated_metrics = self.generated_metrics
self.generated_metrics = [] # Should never be needed again
# For each metric_unique_id we loop through the semantic models
# looking for the name of the "measure" which generated the metric.
# When it's found, add it to "metrics_from_measures", with a key
# of the semantic_model name, and a list of metrics.
for metric_unique_id in generated_metrics:
parts = metric_unique_id.split(".")
# get the metric_name
metric_name = parts[-1]
if "semantic_models" in self.dict_from_yaml:
for sem_model in self.dict_from_yaml["semantic_models"]:
if "measures" in sem_model:
for measure in sem_model["measures"]:
if measure["name"] == metric_name:
self.add_metrics_from_measures(sem_model["name"], metric_unique_id)
break
def get_key_and_name_for_test(self, test_unique_id):
yaml_key = None
block_name = None
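For the generated_metrics to metrics_from_measures change above, a purely illustrative (hypothetical ids) before/after of the data shape stored on a SchemaSourceFile:

```python
# Old field: a flat list of generated metric unique_ids.
generated_metrics = ["metric.my_project.total_revenue"]

# New field: keyed by the semantic model whose measure generated the metric,
# so partial parsing can locate it later.
metrics_from_measures = {"orders": ["metric.my_project.total_revenue"]}
```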

View File

@@ -32,9 +32,10 @@ from dbt.adapters.exceptions import (
from dbt.adapters.factory import get_adapter_package_names
# to preserve import paths
from dbt.artifacts.resources import BaseResource, DeferRelation, NodeVersion
from dbt.artifacts.resources import BaseResource, DeferRelation, NodeVersion, RefArgs
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt.artifacts.schemas.manifest import ManifestMetadata, UniqueID, WritableManifest
from dbt.clients.jinja_static import statically_parse_ref_or_source
from dbt.contracts.files import (
AnySourceFile,
FileHash,
@@ -54,7 +55,6 @@ from dbt.contracts.graph.nodes import (
ManifestNode,
Metric,
ModelNode,
ResultNode,
SavedQuery,
SeedNode,
SemanticModel,
@@ -413,11 +413,11 @@ class DisabledLookup(dbtClassMixin):
self.storage: Dict[str, Dict[PackageName, List[Any]]] = {}
self.populate(manifest)
def populate(self, manifest):
def populate(self, manifest: "Manifest"):
for node in list(chain.from_iterable(manifest.disabled.values())):
self.add_node(node)
def add_node(self, node):
def add_node(self, node: GraphMemberNode) -> None:
if node.search_name not in self.storage:
self.storage[node.search_name] = {}
if node.package_name not in self.storage[node.search_name]:
@@ -427,8 +427,12 @@ class DisabledLookup(dbtClassMixin):
# This should return a list of disabled nodes. It's different from
# the other Lookup functions in that it returns full nodes, not just unique_ids
def find(
self, search_name, package: Optional[PackageName], version: Optional[NodeVersion] = None
):
self,
search_name,
package: Optional[PackageName],
version: Optional[NodeVersion] = None,
resource_types: Optional[List[NodeType]] = None,
) -> Optional[List[Any]]:
if version:
search_name = f"{search_name}.v{version}"
@@ -437,16 +441,29 @@ class DisabledLookup(dbtClassMixin):
pkg_dct: Mapping[PackageName, List[Any]] = self.storage[search_name]
nodes = []
if package is None:
if not pkg_dct:
return None
else:
return next(iter(pkg_dct.values()))
nodes = next(iter(pkg_dct.values()))
elif package in pkg_dct:
return pkg_dct[package]
nodes = pkg_dct[package]
else:
return None
if resource_types is None:
return nodes
else:
new_nodes = []
for node in nodes:
if node.resource_type in resource_types:
new_nodes.append(node)
if not new_nodes:
return None
else:
return new_nodes
class AnalysisLookup(RefableLookup):
_lookup_types: ClassVar[set] = set([NodeType.Analysis])
@@ -1295,7 +1312,12 @@ class Manifest(MacroMethods, dbtClassMixin):
# it's possible that the node is disabled
if disabled is None:
disabled = self.disabled_lookup.find(target_model_name, pkg, target_model_version)
disabled = self.disabled_lookup.find(
target_model_name,
pkg,
version=target_model_version,
resource_types=REFABLE_NODE_TYPES,
)
if disabled:
return Disabled(disabled[0])
@@ -1566,13 +1588,15 @@ class Manifest(MacroMethods, dbtClassMixin):
self.exposures[exposure.unique_id] = exposure
source_file.exposures.append(exposure.unique_id)
def add_metric(self, source_file: SchemaSourceFile, metric: Metric, generated: bool = False):
def add_metric(
self, source_file: SchemaSourceFile, metric: Metric, generated_from: Optional[str] = None
):
_check_duplicates(metric, self.metrics)
self.metrics[metric.unique_id] = metric
if not generated:
if not generated_from:
source_file.metrics.append(metric.unique_id)
else:
source_file.generated_metrics.append(metric.unique_id)
source_file.add_metrics_from_measures(generated_from, metric.unique_id)
def add_group(self, source_file: SchemaSourceFile, group: Group):
_check_duplicates(group, self.groups)
@@ -1586,7 +1610,7 @@ class Manifest(MacroMethods, dbtClassMixin):
else:
self.disabled[node.unique_id] = [node]
def add_disabled(self, source_file: AnySourceFile, node: ResultNode, test_from=None):
def add_disabled(self, source_file: AnySourceFile, node: GraphMemberNode, test_from=None):
self.add_disabled_nofile(node)
if isinstance(source_file, SchemaSourceFile):
if isinstance(node, GenericTestNode):
@@ -1634,6 +1658,22 @@ class Manifest(MacroMethods, dbtClassMixin):
# end of methods formerly in ParseResult
def find_node_from_ref_or_source(
self, expression: str
) -> Optional[Union[ModelNode, SourceDefinition]]:
ref_or_source = statically_parse_ref_or_source(expression)
node = None
if isinstance(ref_or_source, RefArgs):
node = self.ref_lookup.find(
ref_or_source.name, ref_or_source.package, ref_or_source.version, self
)
else:
source_name, source_table_name = ref_or_source[0], ref_or_source[1]
node = self.source_lookup.find(f"{source_name}.{source_table_name}", None, self)
return node
# Provide support for copy.deepcopy() - we just need to avoid the lock!
# pickle and deepcopy use this. It returns a callable object used to
# create the initial version of the object and a tuple of arguments
@@ -1677,9 +1717,9 @@ class MacroManifest(MacroMethods):
self.macros = macros
self.metadata = ManifestMetadata(
user_id=tracking.active_user.id if tracking.active_user else None,
send_anonymous_usage_stats=get_flags().SEND_ANONYMOUS_USAGE_STATS
if tracking.active_user
else None,
send_anonymous_usage_stats=(
get_flags().SEND_ANONYMOUS_USAGE_STATS if tracking.active_user else None
),
)
# This is returned by the 'graph' context property
# in the ProviderContext class.

View File

@@ -18,7 +18,6 @@ from typing import (
from mashumaro.types import SerializableType
from dbt import deprecations
from dbt.adapters.base import ConstraintSupport
from dbt.adapters.factory import get_adapter_constraint_support
from dbt.artifacts.resources import Analysis as AnalysisResource
@@ -58,6 +57,7 @@ from dbt.artifacts.resources import SingularTest as SingularTestResource
from dbt.artifacts.resources import Snapshot as SnapshotResource
from dbt.artifacts.resources import SourceDefinition as SourceDefinitionResource
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
@@ -85,7 +85,11 @@ from dbt.node_types import (
NodeType,
)
from dbt_common.clients.system import write_file
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.contracts.constraints import (
ColumnLevelConstraint,
ConstraintType,
ModelLevelConstraint,
)
from dbt_common.events.contextvars import set_log_contextvars
from dbt_common.events.functions import warn_or_error
@@ -489,6 +493,18 @@ class ModelNode(ModelResource, CompiledNode):
def materialization_enforces_constraints(self) -> bool:
return self.config.materialized in ["table", "incremental"]
@property
def all_constraints(self) -> List[Union[ModelLevelConstraint, ColumnLevelConstraint]]:
constraints: List[Union[ModelLevelConstraint, ColumnLevelConstraint]] = []
for model_level_constraint in self.constraints:
constraints.append(model_level_constraint)
for column in self.columns.values():
for column_level_constraint in column.constraints:
constraints.append(column_level_constraint)
return constraints
def infer_primary_key(self, data_tests: List["GenericTestNode"]) -> List[str]:
"""
Infers the columns that can be used as primary key of a model in the following order:
@@ -636,9 +652,9 @@ class ModelNode(ModelResource, CompiledNode):
contract_enforced_disabled: bool = False
columns_removed: List[str] = []
column_type_changes: List[Dict[str, str]] = []
enforced_column_constraint_removed: List[
Dict[str, str]
] = [] # column_name, constraint_type
enforced_column_constraint_removed: List[Dict[str, str]] = (
[]
) # column_name, constraint_type
enforced_model_constraint_removed: List[Dict[str, Any]] = [] # constraint_type, columns
materialization_changed: List[str] = []
@@ -1131,12 +1147,6 @@ class UnpatchedSourceDefinition(BaseNode):
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if self.tests:
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
self.data_tests.extend(self.tests)
self.tests.clear()
@@ -1147,12 +1157,6 @@ class UnpatchedSourceDefinition(BaseNode):
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if column.tests:
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
column.data_tests.extend(column.tests)
column.tests.clear()
@@ -1450,6 +1454,13 @@ class Group(GroupResource, BaseNode):
def resource_class(cls) -> Type[GroupResource]:
return GroupResource
def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
}
# ====================================
# SemanticModel node
@@ -1554,14 +1565,13 @@ class SavedQuery(NodeInfoMixin, GraphNode, SavedQueryResource):
return False
# exports should be in the same order, so we zip them for easy iteration
for (old_export, new_export) in zip(old.exports, self.exports):
if not (
old_export.name == new_export.name
and old_export.config.export_as == new_export.config.export_as
and old_export.config.schema_name == new_export.config.schema_name
and old_export.config.alias == new_export.config.alias
):
for old_export, new_export in zip(old.exports, self.exports):
if not (old_export.name == new_export.name):
return False
keys = ["export_as", "schema", "alias"]
for key in keys:
if old_export.unrendered_config.get(key) != new_export.unrendered_config.get(key):
return False
return True
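The reworked comparison checks export names first, then compares the unrendered values of a fixed set of config keys, so state:modified is not tripped by rendering differences alone. A rough illustration of that key-by-key comparison, with plain dicts standing in for Export objects (the length check is an added assumption for the sketch):

from typing import Any, Dict, List

COMPARED_KEYS = ["export_as", "schema", "alias"]

def same_exports(old: List[Dict[str, Any]], new: List[Dict[str, Any]]) -> bool:
    # exports are assumed to be in the same order, mirroring the zip() above
    if len(old) != len(new):
        return False
    for old_export, new_export in zip(old, new):
        if old_export["name"] != new_export["name"]:
            return False
        old_cfg = old_export.get("unrendered_config", {})
        new_cfg = new_export.get("unrendered_config", {})
        if any(old_cfg.get(key) != new_cfg.get(key) for key in COMPARED_KEYS):
            return False
    return True

old = [{"name": "orders", "unrendered_config": {"export_as": "table", "alias": "o"}}]
new = [{"name": "orders", "unrendered_config": {"export_as": "table", "alias": "o2"}}]
assert same_exports(old, old) and not same_exports(old, new)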
@@ -1609,6 +1619,7 @@ class ParsedNodePatch(ParsedPatch):
latest_version: Optional[NodeVersion]
constraints: List[Dict[str, Any]]
deprecation_date: Optional[datetime]
time_spine: Optional[TimeSpine] = None
@dataclass

View File

@@ -1,10 +1,19 @@
from dbt.constants import TIME_SPINE_MODEL_NAME
from typing import List, Optional
from dbt.constants import (
LEGACY_TIME_SPINE_GRANULARITY,
LEGACY_TIME_SPINE_MODEL_NAME,
MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY,
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ModelNode
from dbt.events.types import SemanticValidationFailure
from dbt.exceptions import ParsingError
from dbt_common.clients.system import write_file
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_semantic_interfaces.implementations.metric import PydanticMetric
from dbt_semantic_interfaces.implementations.node_relation import PydanticNodeRelation
from dbt_semantic_interfaces.implementations.project_configuration import (
PydanticProjectConfiguration,
)
@@ -13,8 +22,12 @@ from dbt_semantic_interfaces.implementations.semantic_manifest import (
PydanticSemanticManifest,
)
from dbt_semantic_interfaces.implementations.semantic_model import PydanticSemanticModel
from dbt_semantic_interfaces.implementations.time_spine import (
PydanticTimeSpine,
PydanticTimeSpinePrimaryColumn,
)
from dbt_semantic_interfaces.implementations.time_spine_table_configuration import (
PydanticTimeSpineTableConfiguration,
PydanticTimeSpineTableConfiguration as LegacyTimeSpine,
)
from dbt_semantic_interfaces.type_enums import TimeGranularity
from dbt_semantic_interfaces.validations.semantic_manifest_validator import (
@@ -23,7 +36,7 @@ from dbt_semantic_interfaces.validations.semantic_manifest_validator import (
class SemanticManifest:
def __init__(self, manifest) -> None:
def __init__(self, manifest: Manifest) -> None:
self.manifest = manifest
def validate(self) -> bool:
@@ -59,8 +72,50 @@ class SemanticManifest:
write_file(file_path, json)
def _get_pydantic_semantic_manifest(self) -> PydanticSemanticManifest:
pydantic_time_spines: List[PydanticTimeSpine] = []
minimum_time_spine_granularity: Optional[TimeGranularity] = None
for node in self.manifest.nodes.values():
if not (isinstance(node, ModelNode) and node.time_spine):
continue
time_spine = node.time_spine
standard_granularity_column = None
for column in node.columns.values():
if column.name == time_spine.standard_granularity_column:
standard_granularity_column = column
break
# Assertions needed for type checking
if not standard_granularity_column:
raise ParsingError(
"Expected to find time spine standard granularity column in model columns, but did not. "
"This should have been caught in YAML parsing."
)
if not standard_granularity_column.granularity:
raise ParsingError(
"Expected to find granularity set for time spine standard granularity column, but did not. "
"This should have been caught in YAML parsing."
)
pydantic_time_spine = PydanticTimeSpine(
node_relation=PydanticNodeRelation(
alias=node.alias,
schema_name=node.schema,
database=node.database,
relation_name=node.relation_name,
),
primary_column=PydanticTimeSpinePrimaryColumn(
name=time_spine.standard_granularity_column,
time_granularity=standard_granularity_column.granularity,
),
)
pydantic_time_spines.append(pydantic_time_spine)
if (
not minimum_time_spine_granularity
or standard_granularity_column.granularity.to_int()
< minimum_time_spine_granularity.to_int()
):
minimum_time_spine_granularity = standard_granularity_column.granularity
project_config = PydanticProjectConfiguration(
time_spine_table_configurations=[],
time_spine_table_configurations=[], time_spines=pydantic_time_spines
)
pydantic_semantic_manifest = PydanticSemanticManifest(
metrics=[], semantic_models=[], project_configuration=project_config
@@ -79,24 +134,39 @@ class SemanticManifest:
PydanticSavedQuery.parse_obj(saved_query.to_dict())
)
# Look for time-spine table model and create time spine table configuration
if self.manifest.semantic_models:
# Get model for time_spine_table
model = self.manifest.ref_lookup.find(TIME_SPINE_MODEL_NAME, None, None, self.manifest)
if not model:
raise ParsingError(
"The semantic layer requires a 'metricflow_time_spine' model in the project, but none was found. "
"Guidance on creating this model can be found on our docs site ("
"https://docs.getdbt.com/docs/build/metricflow-time-spine) "
)
# Create time_spine_table_config, set it in project_config, and add to semantic manifest
time_spine_table_config = PydanticTimeSpineTableConfiguration(
location=model.relation_name,
column_name="date_day",
grain=TimeGranularity.DAY,
legacy_time_spine_model = self.manifest.ref_lookup.find(
LEGACY_TIME_SPINE_MODEL_NAME, None, None, self.manifest
)
pydantic_semantic_manifest.project_configuration.time_spine_table_configurations = [
time_spine_table_config
]
if legacy_time_spine_model:
if (
not minimum_time_spine_granularity
or LEGACY_TIME_SPINE_GRANULARITY.to_int()
< minimum_time_spine_granularity.to_int()
):
minimum_time_spine_granularity = LEGACY_TIME_SPINE_GRANULARITY
# If no time spines have been configured at DAY or smaller AND legacy time spine model does not exist, error.
if (
not minimum_time_spine_granularity
or minimum_time_spine_granularity.to_int()
> MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY.to_int()
):
raise ParsingError(
"The semantic layer requires a time spine model with granularity DAY or smaller in the project, "
"but none was found. Guidance on creating this model can be found on our docs site "
"(https://docs.getdbt.com/docs/build/metricflow-time-spine)."
)
# For backward compatibility: if legacy time spine exists, include it in the manifest.
if legacy_time_spine_model:
legacy_time_spine = LegacyTimeSpine(
location=legacy_time_spine_model.relation_name,
column_name="date_day",
grain=LEGACY_TIME_SPINE_GRANULARITY,
)
pydantic_semantic_manifest.project_configuration.time_spine_table_configurations = [
legacy_time_spine
]
return pydantic_semantic_manifest
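The method above builds one PydanticTimeSpine per model that declares a time_spine, tracks the smallest granularity seen, folds in the legacy time spine model (looked up by LEGACY_TIME_SPINE_MODEL_NAME) at its legacy granularity if it exists, and raises when nothing at DAY or finer is available. A reduced sketch of just the granularity bookkeeping, using integer ranks instead of the TimeGranularity enum (the rank values are assumptions for illustration):

from typing import List, Optional

# smaller rank == finer granularity (assumption for this sketch)
GRANULARITY_RANK = {"hour": 1, "day": 2, "month": 3}
LEGACY_GRANULARITY = "day"
MINIMUM_REQUIRED = "day"

def resolve_minimum_granularity(configured: List[str], has_legacy_time_spine: bool) -> str:
    minimum: Optional[str] = None
    for granularity in configured:  # one entry per configured time spine model
        if minimum is None or GRANULARITY_RANK[granularity] < GRANULARITY_RANK[minimum]:
            minimum = granularity
    if has_legacy_time_spine:
        if minimum is None or GRANULARITY_RANK[LEGACY_GRANULARITY] < GRANULARITY_RANK[minimum]:
            minimum = LEGACY_GRANULARITY
    # error if nothing at DAY or finer was found
    if minimum is None or GRANULARITY_RANK[minimum] > GRANULARITY_RANK[MINIMUM_REQUIRED]:
        raise ValueError("A time spine model with granularity DAY or smaller is required.")
    return minimum

assert resolve_minimum_granularity(["month"], has_legacy_time_spine=True) == "day"
assert resolve_minimum_granularity(["hour"], has_legacy_time_spine=False) == "hour"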

View File

@@ -116,6 +116,7 @@ class HasColumnAndTestProps(HasColumnProps):
class UnparsedColumn(HasColumnAndTestProps):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
granularity: Optional[str] = None # str is really a TimeGranularity Enum
@dataclass
@@ -206,6 +207,11 @@ class UnparsedNodeUpdate(HasConfig, HasColumnTests, HasColumnAndTestProps, HasYa
access: Optional[str] = None
@dataclass
class UnparsedTimeSpine(dbtClassMixin):
standard_granularity_column: str
@dataclass
class UnparsedModelUpdate(UnparsedNodeUpdate):
quote_columns: Optional[bool] = None
@@ -213,6 +219,7 @@ class UnparsedModelUpdate(UnparsedNodeUpdate):
latest_version: Optional[NodeVersion] = None
versions: Sequence[UnparsedVersion] = field(default_factory=list)
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[UnparsedTimeSpine] = None
def __post_init__(self) -> None:
if self.latest_version:
@@ -234,6 +241,26 @@ class UnparsedModelUpdate(UnparsedNodeUpdate):
self.deprecation_date = normalize_date(self.deprecation_date)
if self.time_spine:
columns = (
self.get_columns_for_version(self.latest_version)
if self.latest_version
else self.columns
)
column_names_to_columns = {column.name: column for column in columns}
if self.time_spine.standard_granularity_column not in column_names_to_columns:
raise ParsingError(
f"Time spine standard granularity column must be defined on the model. Got invalid "
f"column name '{self.time_spine.standard_granularity_column}' for model '{self.name}'. Valid names"
f"{' for latest version' if self.latest_version else ''}: {list(column_names_to_columns.keys())}."
)
column = column_names_to_columns[self.time_spine.standard_granularity_column]
if not column.granularity:
raise ParsingError(
f"Time spine standard granularity column must have a granularity defined. Please add one for "
f"column '{self.time_spine.standard_granularity_column}' in model '{self.name}'."
)
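The __post_init__ check above requires that the configured standard_granularity_column exists on the (possibly versioned) model and carries a granularity. A standalone sketch of the same two checks, with a plain dict in place of the unparsed column objects:

from typing import Dict, Optional

def validate_time_spine_column(
    model_name: str,
    standard_granularity_column: str,
    columns: Dict[str, Optional[str]],  # column name -> granularity (or None)
) -> None:
    if standard_granularity_column not in columns:
        raise ValueError(
            f"Time spine standard granularity column must be defined on the model. "
            f"Got '{standard_granularity_column}' for model '{model_name}'; "
            f"valid names: {list(columns)}."
        )
    if columns[standard_granularity_column] is None:
        raise ValueError(
            f"Column '{standard_granularity_column}' in model '{model_name}' "
            "must have a granularity defined."
        )

# passes: the column exists and declares a granularity
validate_time_spine_column("metricflow_time_spine", "date_day", {"date_day": "day"})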
def get_columns_for_version(self, version: NodeVersion) -> List[UnparsedColumn]:
if version not in self._version_map:
raise DbtInternalError(
@@ -563,6 +590,7 @@ class UnparsedMetric(dbtClassMixin):
description: str = ""
# Note: `Union` must be the outermost part of the type annotation for serialization to work properly.
filter: Union[str, List[str], None] = None
time_granularity: Optional[str] = None
# metadata: Optional[UnparsedMetadata] = None # TODO

meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)

View File

@@ -5,7 +5,6 @@ from mashumaro.jsonschema.annotations import Pattern
from mashumaro.types import SerializableType
from typing_extensions import Annotated
from dbt import deprecations
from dbt.adapters.contracts.connection import QueryComment
from dbt.contracts.util import Identifier, list_str
from dbt_common.contracts.util import Mergeable
@@ -259,6 +258,7 @@ class Project(dbtClassMixin):
query_comment: Optional[Union[QueryComment, NoValue, str]] = field(default_factory=NoValue)
restrict_access: bool = False
dbt_cloud: Optional[Dict[str, Any]] = None
flags: Dict[str, Any] = field(default_factory=dict)
class Config(dbtMashConfig):
# These tell mashumaro to use aliases for jsonschema and for "from_dict"
@@ -312,10 +312,6 @@ class Project(dbtClassMixin):
raise ValidationError(
"Invalid project config: cannot have both 'tests' and 'data_tests' defined"
)
if "tests" in data:
deprecations.warn(
"project-test-config", deprecated_path="tests", exp_path="data_tests"
)
@dataclass

View File

@@ -29,7 +29,8 @@ class RemoteCompileResult(RemoteCompileResultMixin):
generated_at: datetime = field(default_factory=datetime.utcnow)
@property
def error(self):
def error(self) -> None:
# TODO: Can we delete this? It's never set anywhere else and never accessed
return None
@@ -40,7 +41,7 @@ class RemoteExecutionResult(ExecutionResult):
args: Dict[str, Any] = field(default_factory=dict)
generated_at: datetime = field(default_factory=datetime.utcnow)
def write(self, path: str):
def write(self, path: str) -> None:
writable = RunResultsArtifact.from_execution_results(
generated_at=self.generated_at,
results=self.results,

View File

@@ -1,9 +1,9 @@
import abc
from typing import ClassVar, Dict, List, Optional, Set
from typing import Callable, ClassVar, Dict, List, Optional, Set
import dbt.tracking
from dbt.events import types as core_types
from dbt_common.events.functions import fire_event, warn_or_error
from dbt_common.events.functions import warn_or_error
class DBTDeprecation:
@@ -98,24 +98,10 @@ class CollectFreshnessReturnSignature(DBTDeprecation):
_event = "CollectFreshnessReturnSignature"
class TestsConfigDeprecation(DBTDeprecation):
_name = "project-test-config"
_event = "TestsConfigDeprecation"
class ProjectFlagsMovedDeprecation(DBTDeprecation):
_name = "project-flags-moved"
_event = "ProjectFlagsMovedDeprecation"
def show(self, *args, **kwargs) -> None:
if self.name not in active_deprecations:
event = self.event(**kwargs)
# We can't do warn_or_error because the ProjectFlags
# is where that is set up and we're just reading it.
fire_event(event)
self.track_deprecation_warn()
active_deprecations.add(self.name)
class PackageMaterializationOverrideDeprecation(DBTDeprecation):
_name = "package-materialization-override"
@@ -147,7 +133,7 @@ def renamed_env_var(old_name: str, new_name: str):
return cb
def warn(name, *args, **kwargs):
def warn(name: str, *args, **kwargs) -> None:
if name not in deprecations:
# this should (hopefully) never happen
raise RuntimeError("Error showing deprecation warning: {}".format(name))
@@ -155,6 +141,13 @@ def warn(name, *args, **kwargs):
deprecations[name].show(*args, **kwargs)
def buffer(name: str, *args, **kwargs):
def show_callback():
deprecations[name].show(*args, **kwargs)
buffered_deprecations.append(show_callback)
# these are globally available
# since modules are only imported once, active_deprecations is a singleton
@@ -169,7 +162,6 @@ deprecations_list: List[DBTDeprecation] = [
ConfigLogPathDeprecation(),
ConfigTargetPathDeprecation(),
CollectFreshnessReturnSignature(),
TestsConfigDeprecation(),
ProjectFlagsMovedDeprecation(),
PackageMaterializationOverrideDeprecation(),
ResourceNamesWithSpacesDeprecation(),
@@ -178,6 +170,13 @@ deprecations_list: List[DBTDeprecation] = [
deprecations: Dict[str, DBTDeprecation] = {d.name: d for d in deprecations_list}
buffered_deprecations: List[Callable] = []
def reset_deprecations():
active_deprecations.clear()
def fire_buffered_deprecations():
[dep_fn() for dep_fn in buffered_deprecations]
buffered_deprecations.clear()
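buffer() now stores a closure that replays the deprecation later, and fire_buffered_deprecations() drains the list. A small sketch of that buffer-then-flush pattern in isolation; the warn function here just prints, standing in for the real event machinery:

from typing import Callable, List

buffered: List[Callable[[], None]] = []

def warn(name: str, **kwargs) -> None:
    print(f"deprecation warning: {name} {kwargs}")

def buffer(name: str, **kwargs) -> None:
    # capture the arguments now, emit the warning later
    def show_callback() -> None:
        warn(name, **kwargs)
    buffered.append(show_callback)

def fire_buffered_deprecations() -> None:
    for callback in buffered:
        callback()
    buffered.clear()

buffer("project-flags-moved", deprecated_path="dbt_project.yml")
fire_buffered_deprecations()   # prints the buffered warning exactly once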

View File

@@ -1610,6 +1610,17 @@ message CompiledNodeMsg {
CompiledNode data = 2;
}
// Q043
message SnapshotTimestampWarning {
string snapshot_time_data_type = 1;
string updated_at_data_type = 2;
}
message SnapshotTimestampWarningMsg {
CoreEventInfo info = 1;
SnapshotTimestampWarning data = 2;
}
// W - Node testing
// Skipped W001
@@ -1809,12 +1820,19 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}
message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}
// Z021
message RunResultWarning {
string resource_type = 1;
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}
message RunResultWarningMsg {
@@ -1828,6 +1846,7 @@ message RunResultFailure {
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}
message RunResultFailureMsg {
@@ -1849,6 +1868,7 @@ message StatsLineMsg {
message RunResultError {
string msg = 1;
NodeInfo node_info = 2;
Group group = 3;
}
message RunResultErrorMsg {

File diff suppressed because one or more lines are too long

View File

@@ -74,9 +74,7 @@ def setup_event_logger(flags, callbacks: List[Callable[[EventMsg], None]] = [])
log_level = (
EventLevel.ERROR
if flags.QUIET
else EventLevel.DEBUG
if flags.DEBUG
else EventLevel(flags.LOG_LEVEL)
else EventLevel.DEBUG if flags.DEBUG else EventLevel(flags.LOG_LEVEL)
)
console_config = get_stdout_config(
line_format,

View File

@@ -388,6 +388,9 @@ class ConfigTargetPathDeprecation(WarnLevel):
return line_wrap_message(warning_tag(f"Deprecated functionality\n\n{description}"))
# Note: this deprecation has been removed, but we are leaving
# the event class here, because users may have specified it in
# warn_error_options.
class TestsConfigDeprecation(WarnLevel):
def code(self) -> str:
return "D012"
@@ -1614,6 +1617,18 @@ class CompiledNode(InfoLevel):
return f"Compiled node '{self.node_name}' is:\n{self.compiled}"
class SnapshotTimestampWarning(WarnLevel):
def code(self) -> str:
return "Q043"
def message(self) -> str:
return (
f"Data type of snapshot table timestamp columns ({self.snapshot_time_data_type}) "
f"doesn't match derived column 'updated_at' ({self.updated_at_data_type}). "
"Please update snapshot config 'updated_at'."
)
# =======================================================
# W - Node testing
# =======================================================

View File

@@ -136,6 +136,18 @@ class GraphDependencyNotFoundError(CompilationError):
return msg
class ForeignKeyConstraintToSyntaxError(CompilationError):
def __init__(self, node, expression: str) -> None:
self.expression = expression
self.node = node
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = f"'{self.node.unique_id}' defines a foreign key constraint 'to' expression which is not valid 'ref' or 'source' syntax: {self.expression}."
return msg
# client level exceptions

View File

@@ -59,18 +59,40 @@ class Graph:
def select_children(
self, selected: Set[UniqueId], max_depth: Optional[int] = None
) -> Set[UniqueId]:
descendants: Set[UniqueId] = set()
for node in selected:
descendants.update(self.descendants(node, max_depth))
return descendants
"""Returns all nodes which are descendants of the 'selected' set.
Nodes in the 'selected' set are counted as children only if
they are descendants of other nodes in the 'selected' set."""
children: Set[UniqueId] = set()
i = 0
while len(selected) > 0 and (max_depth is None or i < max_depth):
next_layer: Set[UniqueId] = set()
for node in selected:
next_layer.update(self.descendants(node, 1))
next_layer = next_layer - children # Avoid re-searching
children.update(next_layer)
selected = next_layer
i += 1
return children
def select_parents(
self, selected: Set[UniqueId], max_depth: Optional[int] = None
) -> Set[UniqueId]:
ancestors: Set[UniqueId] = set()
for node in selected:
ancestors.update(self.ancestors(node, max_depth))
return ancestors
"""Returns all nodes which are ancestors of the 'selected' set.
Nodes in the 'selected' set are counted as parents only if
they are ancestors of other nodes in the 'selected' set."""
parents: Set[UniqueId] = set()
i = 0
while len(selected) > 0 and (max_depth is None or i < max_depth):
next_layer: Set[UniqueId] = set()
for node in selected:
next_layer.update(self.ancestors(node, 1))
next_layer = next_layer - parents # Avoid re-searching
parents.update(next_layer)
selected = next_layer
i += 1
return parents
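Both rewritten methods replace a single descendants()/ancestors() call with an explicit layer-by-layer walk: each iteration expands one hop from the previous layer, skips anything already collected, and stops once max_depth layers have been explored. A minimal sketch of the same traversal over a plain adjacency dict:

from typing import Dict, Optional, Set

def select_children(
    graph: Dict[str, Set[str]], selected: Set[str], max_depth: Optional[int] = None
) -> Set[str]:
    children: Set[str] = set()
    frontier = set(selected)
    depth = 0
    while frontier and (max_depth is None or depth < max_depth):
        next_layer: Set[str] = set()
        for node in frontier:
            next_layer.update(graph.get(node, set()))  # direct descendants only
        next_layer -= children          # avoid re-searching nodes already found
        children.update(next_layer)
        frontier = next_layer
        depth += 1
    return children

graph = {"a": {"b"}, "b": {"c"}, "c": set()}
assert select_children(graph, {"a"}, max_depth=1) == {"b"}
assert select_children(graph, {"a"}) == {"b", "c"}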
def select_successors(self, selected: Set[UniqueId]) -> Set[UniqueId]:
successors: Set[UniqueId] = set()

View File

@@ -87,12 +87,15 @@ class NodeSelector(MethodManager):
)
return set(), set()
neighbors = self.collect_specified_neighbors(spec, collected)
selected = collected | neighbors
# if --indirect-selection EMPTY, do not expand to adjacent tests
if spec.indirect_selection == IndirectSelection.Empty:
return collected, set()
return selected, set()
else:
neighbors = self.collect_specified_neighbors(spec, collected)
direct_nodes, indirect_nodes = self.expand_selection(
selected=(collected | neighbors), indirect_selection=spec.indirect_selection
selected=selected, indirect_selection=spec.indirect_selection
)
return direct_nodes, indirect_nodes
@@ -177,10 +180,14 @@ class NodeSelector(MethodManager):
node = self.manifest.nodes[unique_id]
if self.include_empty_nodes:
return node.config.enabled
return node.config.enabled
def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
return node.empty
else:
return not node.empty and node.config.enabled
return False
def node_is_match(self, node: GraphMemberNode) -> bool:
"""Determine if a node is a match for the selector. Non-match nodes
@@ -212,7 +219,12 @@ class NodeSelector(MethodManager):
"""Return the subset of selected nodes that is a match for this
selector.
"""
return {unique_id for unique_id in selected if self._is_match(unique_id)}
return {
unique_id
for unique_id in selected
if self._is_match(unique_id)
and (self.include_empty_nodes or not self._is_empty_node(unique_id))
}
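With this change, nodes whose empty flag is set are dropped in this final filtering pass over the selected set, unless include_empty_nodes asks to keep them. A condensed sketch of that filter, using plain dicts in place of manifest nodes (the node fields and ids are illustrative only):

from typing import Dict, Set

nodes: Dict[str, Dict[str, bool]] = {
    "model.proj.orders": {"enabled": True, "empty": False},
    "model.proj.stub": {"enabled": True, "empty": True},
}

def filter_selected(selected: Set[str], include_empty_nodes: bool) -> Set[str]:
    def is_match(unique_id: str) -> bool:
        return nodes[unique_id]["enabled"]

    def is_empty(unique_id: str) -> bool:
        return nodes.get(unique_id, {}).get("empty", False)

    return {
        unique_id
        for unique_id in selected
        if is_match(unique_id) and (include_empty_nodes or not is_empty(unique_id))
    }

assert filter_selected(set(nodes), include_empty_nodes=False) == {"model.proj.orders"}
assert filter_selected(set(nodes), include_empty_nodes=True) == set(nodes)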
def expand_selection(
self,

View File

@@ -7,7 +7,7 @@ from dbt import hooks, utils
from dbt.adapters.factory import get_adapter # noqa: F401
from dbt.artifacts.resources import Contract
from dbt.clients.jinja import MacroGenerator, get_rendered
from dbt.config import Project, RuntimeConfig
from dbt.config import RuntimeConfig
from dbt.context.context_config import ContextConfig
from dbt.context.providers import (
generate_generate_name_macro_context,
@@ -39,9 +39,9 @@ ConfiguredBlockType = TypeVar("ConfiguredBlockType", bound=FileBlock)
class BaseParser(Generic[FinalValue]):
def __init__(self, project: Project, manifest: Manifest) -> None:
self.project = project
self.manifest = manifest
def __init__(self, project: RuntimeConfig, manifest: Manifest) -> None:
self.project: RuntimeConfig = project
self.manifest: Manifest = manifest
@abc.abstractmethod
def parse_file(self, block: FileBlock) -> None:
@@ -63,7 +63,7 @@ class BaseParser(Generic[FinalValue]):
class Parser(BaseParser[FinalValue], Generic[FinalValue]):
def __init__(
self,
project: Project,
project: RuntimeConfig,
manifest: Manifest,
root_project: RuntimeConfig,
) -> None:
@@ -72,6 +72,7 @@ class Parser(BaseParser[FinalValue], Generic[FinalValue]):
class RelationUpdate:
# "component" is database, schema or alias
def __init__(self, config: RuntimeConfig, manifest: Manifest, component: str) -> None:
default_macro = manifest.find_generate_macro_by_name(
component=component,
@@ -121,12 +122,13 @@ class ConfiguredParser(
):
def __init__(
self,
project: Project,
project: RuntimeConfig,
manifest: Manifest,
root_project: RuntimeConfig,
) -> None:
super().__init__(project, manifest, root_project)
# this sets callables from RelationUpdate
self._update_node_database = RelationUpdate(
manifest=manifest, config=root_project, component="database"
)
@@ -288,7 +290,10 @@ class ConfiguredParser(
self._update_node_schema(parsed_node, config_dict.get("schema"))
self._update_node_alias(parsed_node, config_dict.get("alias"))
# Snapshot nodes use special "target_database" and "target_schema" fields for some reason
# Snapshot nodes use special "target_database" and "target_schema" fields
# for backward compatibility
# We have to do getattr here because the saved_query parser calls this method with
# an Export object instead of a node.
if getattr(parsed_node, "resource_type", None) == NodeType.Snapshot:
if "target_database" in config_dict and config_dict["target_database"]:
parsed_node.database = config_dict["target_database"]
@@ -443,9 +448,8 @@ class ConfiguredParser(
fqn=fqn,
)
self.render_update(node, config)
result = self.transform(node)
self.add_result_node(block, result)
return result
self.add_result_node(block, node)
return node
def _update_node_relation_name(self, node: ManifestNode):
# Seed and Snapshot nodes and Models that are not ephemeral,
@@ -464,17 +468,12 @@ class ConfiguredParser(
def parse_file(self, file_block: FileBlock) -> None:
pass
@abc.abstractmethod
def transform(self, node: FinalNode) -> FinalNode:
pass
class SimpleParser(
ConfiguredParser[ConfiguredBlockType, FinalNode],
Generic[ConfiguredBlockType, FinalNode],
):
def transform(self, node):
return node
pass
class SQLParser(ConfiguredParser[FileBlock, FinalNode], Generic[FinalNode]):
@@ -483,5 +482,4 @@ class SQLParser(ConfiguredParser[FileBlock, FinalNode], Generic[FinalNode]):
class SimpleSQLParser(SQLParser[FinalNode]):
def transform(self, node):
return node
pass

View File

@@ -18,6 +18,7 @@ from dbt.exceptions import ParsingError
from dbt.parser.search import FileBlock
from dbt_common.contracts.constraints import ColumnLevelConstraint, ConstraintType
from dbt_common.exceptions import DbtInternalError
from dbt_semantic_interfaces.type_enums import TimeGranularity
def trimmed(inp: str) -> str:
@@ -185,13 +186,12 @@ class ParserRef:
self.column_info: Dict[str, ColumnInfo] = {}
def _add(self, column: HasColumnProps) -> None:
tags: List[str] = []
tags.extend(getattr(column, "tags", ()))
quote: Optional[bool]
tags: List[str] = getattr(column, "tags", [])
quote: Optional[bool] = None
granularity: Optional[TimeGranularity] = None
if isinstance(column, UnparsedColumn):
quote = column.quote
else:
quote = None
granularity = TimeGranularity(column.granularity) if column.granularity else None
if any(
c
@@ -209,6 +209,7 @@ class ParserRef:
tags=tags,
quote=quote,
_extra=column.extra,
granularity=granularity,
)
@classmethod

View File

@@ -66,8 +66,6 @@ class HookSearcher(Iterable[HookBlock]):
class HookParser(SimpleParser[HookBlock, HookNode]):
def transform(self, node):
return node
# Hooks are only in the dbt_project.yml file for the project
def get_path(self) -> FilePath:

View File

@@ -222,12 +222,12 @@ class ManifestLoader:
def __init__(
self,
root_project: RuntimeConfig,
all_projects: Mapping[str, Project],
all_projects: Mapping[str, RuntimeConfig],
macro_hook: Optional[Callable[[Manifest], Any]] = None,
file_diff: Optional[FileDiff] = None,
) -> None:
self.root_project: RuntimeConfig = root_project
self.all_projects: Mapping[str, Project] = all_projects
self.all_projects: Mapping[str, RuntimeConfig] = all_projects
self.file_diff = file_diff
self.manifest: Manifest = Manifest()
self.new_manifest = self.manifest
@@ -669,7 +669,7 @@ class ManifestLoader:
# 'parser_types'
def parse_project(
self,
project: Project,
project: RuntimeConfig,
parser_files,
parser_types: List[Type[Parser]],
) -> None:
@@ -1028,12 +1028,11 @@ class ManifestLoader:
return state_check
def save_macros_to_adapter(self, adapter):
macro_manifest = MacroManifest(self.manifest.macros)
adapter.set_macro_resolver(macro_manifest)
adapter.set_macro_resolver(self.manifest)
# This executes the callable macro_hook and sets the
# query headers
# This executes the callable macro_hook and sets the query headers
query_header_context = generate_query_header_context(adapter.config, macro_manifest)
query_header_context = generate_query_header_context(adapter.config, self.manifest)
self.macro_hook(query_header_context)
# This creates a MacroManifest which contains the macros in

View File

@@ -204,7 +204,7 @@ class ModelParser(SimpleSQLParser[ModelNode]):
dbt_parser = PythonParseVisitor(node)
dbt_parser.visit(tree)
for (func, args, kwargs) in dbt_parser.dbt_function_calls:
for func, args, kwargs in dbt_parser.dbt_function_calls:
if func == "get":
num_args = len(args)
if num_args == 0:

View File

@@ -968,13 +968,17 @@ class PartialParsing:
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
metrics = schema_file.generated_metrics.copy()
for unique_id in metrics:
if unique_id in self.saved_manifest.metrics:
self.saved_manifest.metrics.pop(unique_id)
schema_file.generated_metrics.remove(unique_id)
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
if schema_file.generated_metrics:
# If this partial parse file has an old "generated_metrics" list,
# call code to fix it up before processing.
schema_file.fix_metrics_from_measures()
if semantic_model_name in schema_file.metrics_from_measures:
for unique_id in schema_file.metrics_from_measures[semantic_model_name]:
if unique_id in self.saved_manifest.metrics:
self.saved_manifest.metrics.pop(unique_id)
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
del schema_file.metrics_from_measures[semantic_model_name]
def delete_schema_unit_test(self, schema_file, unit_test_dict):
unit_test_name = unit_test_dict["name"]

View File

@@ -11,6 +11,7 @@ from dbt.config.renderer import BaseRenderer, Keypath
# keyword args are rendered to capture refs in render_test_update.
# Keyword args are finally rendered at compilation time.
# Descriptions are not rendered until 'process_docs'.
# Pre- and post-hooks in configs are late-rendered.
class SchemaYamlRenderer(BaseRenderer):
def __init__(self, context: Dict[str, Any], key: str) -> None:
super().__init__(context)
@@ -43,6 +44,14 @@ class SchemaYamlRenderer(BaseRenderer):
if len(keypath) == 2 and keypath[1] in ("tests", "data_tests", "description"):
return True
# pre- and post-hooks
if (
len(keypath) >= 2
and keypath[0] == "config"
and keypath[1] in ("pre_hook", "post_hook")
):
return True
# versions
if len(keypath) == 5 and keypath[4] == "description":
return True
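The renderer decides per keypath whether a value should be left unrendered at parse time; this hunk adds config.pre_hook / config.post_hook to that skip list so hook macros defined in properties.yml are late-rendered like hooks from dbt_project.yml. A toy version of the keypath test (the function name and exact semantics are simplified assumptions, not dbt's method):

from typing import Sequence

def is_late_rendered(keypath: Sequence[str]) -> bool:
    """Toy check: True means leave the value unrendered at parse time."""
    # test definitions and descriptions keep their Jinja until later
    if len(keypath) >= 2 and keypath[1] in ("tests", "data_tests", "description"):
        return True
    # pre- and post-hooks configured under config: are late-rendered too
    if len(keypath) >= 2 and keypath[0] == "config" and keypath[1] in ("pre_hook", "post_hook"):
        return True
    return False

assert is_late_rendered(("config", "pre_hook"))
assert not is_late_rendered(("config", "materialized"))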

View File

@@ -31,6 +31,7 @@ from dbt.context.providers import (
generate_parse_exposure,
generate_parse_semantic_models,
)
from dbt.contracts.files import SchemaSourceFile
from dbt.contracts.graph.nodes import Exposure, Group, Metric, SavedQuery, SemanticModel
from dbt.contracts.graph.unparsed import (
UnparsedConversionTypeParams,
@@ -85,7 +86,7 @@ class ExposureParser(YamlReader):
self.schema_parser = schema_parser
self.yaml = yaml
def parse_exposure(self, unparsed: UnparsedExposure):
def parse_exposure(self, unparsed: UnparsedExposure) -> None:
package_name = self.project.project_name
unique_id = f"{NodeType.Exposure}.{package_name}.{unparsed.name}"
path = self.yaml.path.relative_path
@@ -143,6 +144,7 @@ class ExposureParser(YamlReader):
get_rendered(depends_on_jinja, ctx, parsed, capture_macros=True)
# parsed now has a populated refs/sources/metrics
assert isinstance(self.yaml.file, SchemaSourceFile)
if parsed.config.enabled:
self.manifest.add_exposure(self.yaml.file, parsed)
else:
@@ -171,7 +173,7 @@ class ExposureParser(YamlReader):
patch_config_dict=precedence_configs,
)
def parse(self):
def parse(self) -> None:
for data in self.get_key_dicts():
try:
UnparsedExposure.validate(data)
@@ -387,7 +389,7 @@ class MetricParser(YamlReader):
# input_measures=?,
)
def parse_metric(self, unparsed: UnparsedMetric, generated: bool = False):
def parse_metric(self, unparsed: UnparsedMetric, generated_from: Optional[str] = None) -> None:
package_name = self.project.project_name
unique_id = f"{NodeType.Metric}.{package_name}.{unparsed.name}"
path = self.yaml.path.relative_path
@@ -433,6 +435,9 @@ class MetricParser(YamlReader):
label=unparsed.label,
type=MetricType(unparsed.type),
type_params=self._get_metric_type_params(unparsed),
time_granularity=(
TimeGranularity(unparsed.time_granularity) if unparsed.time_granularity else None
),
filter=parse_where_filter(unparsed.filter),
meta=unparsed.meta,
tags=unparsed.tags,
@@ -442,8 +447,9 @@ class MetricParser(YamlReader):
)
# if the metric is disabled we do not want it included in the manifest, only in the disabled dict
assert isinstance(self.yaml.file, SchemaSourceFile)
if parsed.config.enabled:
self.manifest.add_metric(self.yaml.file, parsed, generated)
self.manifest.add_metric(self.yaml.file, parsed, generated_from)
else:
self.manifest.add_disabled(self.yaml.file, parsed)
@@ -471,7 +477,7 @@ class MetricParser(YamlReader):
)
return config
def parse(self):
def parse(self) -> None:
for data in self.get_key_dicts():
try:
UnparsedMetric.validate(data)
@@ -488,7 +494,7 @@ class GroupParser(YamlReader):
self.schema_parser = schema_parser
self.yaml = yaml
def parse_group(self, unparsed: UnparsedGroup):
def parse_group(self, unparsed: UnparsedGroup) -> None:
package_name = self.project.project_name
unique_id = f"{NodeType.Group}.{package_name}.{unparsed.name}"
path = self.yaml.path.relative_path
@@ -503,6 +509,7 @@ class GroupParser(YamlReader):
owner=unparsed.owner,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
self.manifest.add_group(self.yaml.file, parsed)
def parse(self):
@@ -597,10 +604,15 @@ class SemanticModelParser(YamlReader):
)
return measures
def _create_metric(self, measure: UnparsedMeasure, enabled: bool) -> None:
def _create_metric(
self,
measure: UnparsedMeasure,
enabled: bool,
semantic_model_name: str,
) -> None:
unparsed_metric = UnparsedMetric(
name=measure.name,
label=measure.name,
label=measure.label or measure.name,
type="simple",
type_params=UnparsedMetricTypeParams(measure=measure.name, expr=measure.name),
description=measure.description or f"Metric created from measure {measure.name}",
@@ -608,7 +620,7 @@ class SemanticModelParser(YamlReader):
)
parser = MetricParser(self.schema_parser, yaml=self.yaml)
parser.parse_metric(unparsed=unparsed_metric, generated=True)
parser.parse_metric(unparsed=unparsed_metric, generated_from=semantic_model_name)
def _generate_semantic_model_config(
self, target: UnparsedSemanticModel, fqn: List[str], package_name: str, rendered: bool
@@ -635,7 +647,7 @@ class SemanticModelParser(YamlReader):
return config
def parse_semantic_model(self, unparsed: UnparsedSemanticModel):
def parse_semantic_model(self, unparsed: UnparsedSemanticModel) -> None:
package_name = self.project.project_name
unique_id = f"{NodeType.SemanticModel}.{package_name}.{unparsed.name}"
path = self.yaml.path.relative_path
@@ -695,6 +707,7 @@ class SemanticModelParser(YamlReader):
# if the semantic model is disabled we do not want it included in the manifest,
# only in the disabled dict
assert isinstance(self.yaml.file, SchemaSourceFile)
if parsed.config.enabled:
self.manifest.add_semantic_model(self.yaml.file, parsed)
else:
@@ -703,9 +716,11 @@ class SemanticModelParser(YamlReader):
# Create a metric for each measure with `create_metric = True`
for measure in unparsed.measures:
if measure.create_metric is True:
self._create_metric(measure=measure, enabled=parsed.config.enabled)
self._create_metric(
measure=measure, enabled=parsed.config.enabled, semantic_model_name=parsed.name
)
def parse(self):
def parse(self) -> None:
for data in self.get_key_dicts():
try:
UnparsedSemanticModel.validate(data)
@@ -763,7 +778,9 @@ class SavedQueryParser(YamlReader):
self, unparsed: UnparsedExport, saved_query_config: SavedQueryConfig
) -> Export:
return Export(
name=unparsed.name, config=self._get_export_config(unparsed.config, saved_query_config)
name=unparsed.name,
config=self._get_export_config(unparsed.config, saved_query_config),
unrendered_config=unparsed.config,
)
def _get_query_params(self, unparsed: UnparsedQueryParams) -> QueryParams:
@@ -831,6 +848,7 @@ class SavedQueryParser(YamlReader):
delattr(export, "relation_name")
# Only add the saved query if it's enabled; otherwise we track it with the other disabled nodes
assert isinstance(self.yaml.file, SchemaSourceFile)
if parsed.config.enabled:
self.manifest.add_saved_query(self.yaml.file, parsed)
else:

View File

@@ -4,11 +4,15 @@ from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar
from dbt import deprecations
from dbt.artifacts.resources import RefArgs
from dbt.artifacts.resources.v1.model import TimeSpine
from dbt.clients.jinja_static import statically_parse_ref_or_source
from dbt.clients.yaml_helper import load_yaml_text
from dbt.config import RuntimeConfig
from dbt.context.configured import SchemaYamlVars, generate_schema_yml_context
from dbt.context.context_config import ContextConfig
from dbt.contracts.files import SchemaSourceFile
from dbt.contracts.files import SchemaSourceFile, SourceFile
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import (
ModelNode,
ParsedMacroPatch,
@@ -64,18 +68,20 @@ from dbt_common.events.functions import warn_or_error
from dbt_common.exceptions import DbtValidationError
from dbt_common.utils import deep_merge
schema_file_keys = (
"models",
"seeds",
"snapshots",
"sources",
"macros",
"analyses",
"exposures",
"metrics",
"semantic_models",
"saved_queries",
)
schema_file_keys_to_resource_types = {
"models": NodeType.Model,
"seeds": NodeType.Seed,
"snapshots": NodeType.Snapshot,
"sources": NodeType.Source,
"macros": NodeType.Macro,
"analyses": NodeType.Analysis,
"exposures": NodeType.Exposure,
"metrics": NodeType.Metric,
"semantic_models": NodeType.SemanticModel,
"saved_queries": NodeType.SavedQuery,
}
schema_file_keys = list(schema_file_keys_to_resource_types.keys())
# ===============================================================================
@@ -142,9 +148,9 @@ def yaml_from_file(source_file: SchemaSourceFile) -> Optional[Dict[str, Any]]:
class SchemaParser(SimpleParser[YamlBlock, ModelNode]):
def __init__(
self,
project,
manifest,
root_project,
project: RuntimeConfig,
manifest: Manifest,
root_project: RuntimeConfig,
) -> None:
super().__init__(project, manifest, root_project)
@@ -282,33 +288,33 @@ class ParseResult:
# PatchParser, SemanticModelParser, SavedQueryParser, UnitTestParser
class YamlReader(metaclass=ABCMeta):
def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock, key: str) -> None:
self.schema_parser = schema_parser
self.schema_parser: SchemaParser = schema_parser
# key: models, seeds, snapshots, sources, macros,
# analyses, exposures, unit_tests
self.key = key
self.yaml = yaml
self.schema_yaml_vars = SchemaYamlVars()
self.key: str = key
self.yaml: YamlBlock = yaml
self.schema_yaml_vars: SchemaYamlVars = SchemaYamlVars()
self.render_ctx = generate_schema_yml_context(
self.schema_parser.root_project,
self.schema_parser.project.project_name,
self.schema_yaml_vars,
)
self.renderer = SchemaYamlRenderer(self.render_ctx, self.key)
self.renderer: SchemaYamlRenderer = SchemaYamlRenderer(self.render_ctx, self.key)
@property
def manifest(self):
def manifest(self) -> Manifest:
return self.schema_parser.manifest
@property
def project(self):
def project(self) -> RuntimeConfig:
return self.schema_parser.project
@property
def default_database(self):
def default_database(self) -> str:
return self.schema_parser.default_database
@property
def root_project(self):
def root_project(self) -> RuntimeConfig:
return self.schema_parser.root_project
# for the different schema subparsers ('models', 'source', etc)
@@ -360,7 +366,7 @@ class YamlReader(metaclass=ABCMeta):
return dct
@abstractmethod
def parse(self) -> ParseResult:
def parse(self) -> Optional[ParseResult]:
raise NotImplementedError("parse is abstract")
@@ -425,7 +431,9 @@ class SourceParser(YamlReader):
fqn=fqn,
name=f"{source.name}_{table.name}",
)
self.manifest.add_source(self.yaml.file, source_def)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
self.manifest.add_source(source_file, source_def)
# This class has two subclasses: NodePatchParser and MacroPatchParser
@@ -515,7 +523,7 @@ class PatchParser(YamlReader, Generic[NonSourceTarget, Parsed]):
# We want to raise an error if some attributes are in two places, and move them
# from toplevel to config if necessary
def normalize_attribute(self, data, path, attribute):
def normalize_attribute(self, data, path, attribute) -> None:
if attribute in data:
if "config" in data and attribute in data["config"]:
raise ParsingError(
@@ -529,42 +537,36 @@ class PatchParser(YamlReader, Generic[NonSourceTarget, Parsed]):
data["config"] = {}
data["config"][attribute] = data.pop(attribute)
def normalize_meta_attribute(self, data, path):
def normalize_meta_attribute(self, data, path) -> None:
return self.normalize_attribute(data, path, "meta")
def normalize_docs_attribute(self, data, path):
def normalize_docs_attribute(self, data, path) -> None:
return self.normalize_attribute(data, path, "docs")
def normalize_group_attribute(self, data, path):
def normalize_group_attribute(self, data, path) -> None:
return self.normalize_attribute(data, path, "group")
def normalize_contract_attribute(self, data, path):
def normalize_contract_attribute(self, data, path) -> None:
return self.normalize_attribute(data, path, "contract")
def normalize_access_attribute(self, data, path):
def normalize_access_attribute(self, data, path) -> None:
return self.normalize_attribute(data, path, "access")
@property
def is_root_project(self):
def is_root_project(self) -> bool:
if self.root_project.project_name == self.project.project_name:
return True
return False
def validate_data_tests(self, data):
def validate_data_tests(self, data) -> None:
# Rename 'tests' -> 'data_tests' at both model-level and column-level
# Raise a validation error if the user has defined both names
def validate_and_rename(data, is_root_project: bool):
def validate_and_rename(data, is_root_project: bool) -> None:
if data.get("tests"):
if "tests" in data and "data_tests" in data:
raise ValidationError(
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
data["data_tests"] = data.pop("tests")
# model-level tests
@@ -583,7 +585,7 @@ class PatchParser(YamlReader, Generic[NonSourceTarget, Parsed]):
for column in version["columns"]:
validate_and_rename(column, self.is_root_project)
def patch_node_config(self, node, patch):
def patch_node_config(self, node, patch) -> None:
if "access" in patch.config:
if AccessType.is_valid(patch.config["access"]):
patch.config["access"] = AccessType(patch.config["access"])
@@ -613,9 +615,16 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
# could possibly skip creating one. Leaving here for now for
# code consistency.
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
if isinstance(block.target, UnparsedModelUpdate):
deprecation_date = block.target.deprecation_date
time_spine = (
TimeSpine(
standard_granularity_column=block.target.time_spine.standard_granularity_column
)
if block.target.time_spine
else None
)
patch = ParsedNodePatch(
name=block.target.name,
original_file_path=block.target.original_file_path,
@@ -631,6 +640,7 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
latest_version=None,
constraints=block.target.constraints,
deprecation_date=deprecation_date,
time_spine=time_spine,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
@@ -663,7 +673,10 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
# handle disabled nodes
if unique_id is None:
# Node might be disabled. Following call returns list of matching disabled nodes
found_nodes = self.manifest.disabled_lookup.find(patch.name, patch.package_name)
resource_type = schema_file_keys_to_resource_types[patch.yaml_key]
found_nodes = self.manifest.disabled_lookup.find(
patch.name, patch.package_name, resource_types=[resource_type]
)
if found_nodes:
if len(found_nodes) > 1 and patch.config.get("enabled"):
# There are multiple disabled nodes for this model and the schema file wants to enable one.
@@ -713,7 +726,7 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
self.patch_node_properties(node, patch)
def patch_node_properties(self, node, patch: "ParsedNodePatch"):
def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None:
"""Given a ParsedNodePatch, add the new information to the node."""
# explicitly pick out the parts to update so we don't inadvertently
# step on the model name or anything
@@ -784,7 +797,7 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
versioned_model_name, target.package_name, None
)
versioned_model_node = None
versioned_model_node: Optional[ModelNode] = None
add_node_nofile_fn: Callable
# If this is the latest version, it's allowed to define itself in a model file name that doesn't have a suffix
@@ -795,7 +808,9 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
if versioned_model_unique_id is None:
# Node might be disabled. Following call returns list of matching disabled nodes
found_nodes = self.manifest.disabled_lookup.find(versioned_model_name, None)
found_nodes = self.manifest.disabled_lookup.find(
versioned_model_name, None, resource_types=[NodeType.Model]
)
if found_nodes:
if len(found_nodes) > 1 and target.config.get("enabled"):
# There are multiple disabled nodes for this model and the schema file wants to enable one.
@@ -808,12 +823,17 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
"in `dbt_project.yml` or in the sql files."
)
raise ParsingError(msg)
versioned_model_node = self.manifest.disabled.pop(
found_nodes[0].unique_id
)[0]
# We know that there's only one node in the disabled list because
# otherwise we would have raised the error above
found_node = found_nodes[0]
self.manifest.disabled.pop(found_node.unique_id)
assert isinstance(found_node, ModelNode)
versioned_model_node = found_node
add_node_nofile_fn = self.manifest.add_disabled_nofile
else:
versioned_model_node = self.manifest.nodes.pop(versioned_model_unique_id)
found_node = self.manifest.nodes.pop(versioned_model_unique_id)
assert isinstance(found_node, ModelNode)
versioned_model_node = found_node
add_node_nofile_fn = self.manifest.add_node_nofile
if versioned_model_node is None:
@@ -832,12 +852,12 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
f"model.{target.package_name}.{target.name}.{unparsed_version.formatted_v}"
)
# update source file.nodes with new unique_id
self.manifest.files[versioned_model_node.file_id].nodes.remove(
versioned_model_node_unique_id_old
)
self.manifest.files[versioned_model_node.file_id].nodes.append(
versioned_model_node.unique_id
)
model_source_file = self.manifest.files[versioned_model_node.file_id]
assert isinstance(model_source_file, SourceFile)
# because of incomplete test setup, check before removing
if versioned_model_node_unique_id_old in model_source_file.nodes:
model_source_file.nodes.remove(versioned_model_node_unique_id_old)
model_source_file.nodes.append(versioned_model_node.unique_id)
# update versioned node fqn
versioned_model_node.fqn[-1] = target.name
@@ -889,8 +909,13 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
def _target_type(self) -> Type[UnparsedModelUpdate]:
return UnparsedModelUpdate
def patch_node_properties(self, node, patch: "ParsedNodePatch"):
def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None:
super().patch_node_properties(node, patch)
# Remaining patch properties are only relevant to ModelNode objects
if not isinstance(node, ModelNode):
return
node.version = patch.version
node.latest_version = patch.latest_version
node.deprecation_date = patch.deprecation_date
@@ -904,9 +929,10 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
)
# These two will have to be reapplied after config is built for versioned models
self.patch_constraints(node, patch.constraints)
self.patch_time_spine(node, patch.time_spine)
node.build_contract_checksum()
def patch_constraints(self, node, constraints):
def patch_constraints(self, node: ModelNode, constraints: List[Dict[str, Any]]) -> None:
contract_config = node.config.get("contract")
if contract_config.enforced is True:
self._validate_constraint_prerequisites(node)
@@ -921,8 +947,33 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
self._validate_pk_constraints(node, constraints)
node.constraints = [ModelLevelConstraint.from_dict(c) for c in constraints]
self._process_constraints_refs_and_sources(node)
def _validate_pk_constraints(self, model_node: ModelNode, constraints: List[Dict[str, Any]]):
def _process_constraints_refs_and_sources(self, model_node: ModelNode) -> None:
"""
Populate model_node.refs and model_node.sources based on foreign-key constraint references,
whether defined at the model-level or column-level.
"""
for constraint in model_node.all_constraints:
if constraint.type == ConstraintType.foreign_key and constraint.to:
try:
ref_or_source = statically_parse_ref_or_source(constraint.to)
except ParsingError:
raise ParsingError(
f"Invalid 'ref' or 'source' syntax on foreign key constraint 'to' on model {model_node.name}: {constraint.to}."
)
if isinstance(ref_or_source, RefArgs):
model_node.refs.append(ref_or_source)
else:
model_node.sources.append(ref_or_source)
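For each foreign-key constraint with a 'to' expression, the parser statically parses the expression and records it as either a ref or a source dependency on the model. A simplified standalone sketch of that dispatch; the regex-based parse below is only an illustration, not dbt's statically_parse_ref_or_source helper:

import re
from typing import List, Tuple

def parse_ref_or_source(expression: str) -> Tuple[str, List[str]]:
    """Return ("ref", [args]) or ("source", [args]); raise on anything else."""
    match = re.fullmatch(r"\s*(ref|source)\(([^)]*)\)\s*", expression)
    if not match:
        raise ValueError(f"not valid 'ref' or 'source' syntax: {expression}")
    kind, raw_args = match.groups()
    args = [arg.strip().strip("'\"") for arg in raw_args.split(",") if arg.strip()]
    return kind, args

refs: List[List[str]] = []
sources: List[List[str]] = []
for to_expression in ["ref('dim_customers')", "source('jaffle', 'customers')"]:
    kind, args = parse_ref_or_source(to_expression)
    (refs if kind == "ref" else sources).append(args)

assert refs == [["dim_customers"]] and sources == [["jaffle", "customers"]]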
def patch_time_spine(self, node: ModelNode, time_spine: Optional[TimeSpine]) -> None:
node.time_spine = time_spine
def _validate_pk_constraints(
self, model_node: ModelNode, constraints: List[Dict[str, Any]]
) -> None:
errors = []
# check for primary key constraints defined at the column level
pk_col: List[str] = []
@@ -955,7 +1006,7 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
+ "\n".join(errors)
)
def _validate_constraint_prerequisites(self, model_node: ModelNode):
def _validate_constraint_prerequisites(self, model_node: ModelNode) -> None:
column_warn_unsupported = [
constraint.warn_unsupported
for column in model_node.columns.values()

View File

@@ -2,12 +2,10 @@ import os
from typing import List
from dbt.contracts.graph.nodes import SnapshotNode
from dbt.exceptions import SnapshopConfigError
from dbt.node_types import NodeType
from dbt.parser.base import SQLParser
from dbt.parser.search import BlockContents, BlockSearcher, FileBlock
from dbt.utils import split_path
from dbt_common.dataclass_schema import ValidationError
class SnapshotParser(SQLParser[SnapshotNode]):
@@ -24,24 +22,6 @@ class SnapshotParser(SQLParser[SnapshotNode]):
def get_compiled_path(cls, block: FileBlock):
return block.path.relative_path
def set_snapshot_attributes(self, node):
# use the target_database setting if we got it, otherwise the
# `database` value of the node (ultimately sourced from the `database`
# config value), and if that is not set, use the database defined in
# the adapter's credentials.
if node.config.target_database:
node.database = node.config.target_database
elif not node.database:
node.database = self.root_project.credentials.database
# the target schema must be set if we got here, so overwrite the node's
# schema
node.schema = node.config.target_schema
# We need to set relation_name again, since database/schema might have changed
self._update_node_relation_name(node)
return node
def get_fqn(self, path: str, name: str) -> List[str]:
"""Get the FQN for the node. This impacts node selection and config
application.
@@ -54,13 +34,6 @@ class SnapshotParser(SQLParser[SnapshotNode]):
fqn.append(name)
return fqn
def transform(self, node: SnapshotNode) -> SnapshotNode:
try:
self.set_snapshot_attributes(node)
return node
except ValidationError as exc:
raise SnapshopConfigError(exc, node)
def parse_file(self, file_block: FileBlock) -> None:
blocks = BlockSearcher(
source=[file_block],

View File

@@ -35,6 +35,8 @@ from dbt.parser.schemas import (
YamlReader,
)
from dbt.utils import get_pseudo_test_path
from dbt_common.events.functions import fire_event
from dbt_common.events.types import SystemStdErr
from dbt_extractor import ExtractionError, py_extract_from_source # type: ignore
@@ -292,6 +294,7 @@ class UnitTestParser(YamlReader):
# for calculating state:modified
unit_test_definition.build_unit_test_checksum()
assert isinstance(self.yaml.file, SchemaSourceFile)
self.manifest.add_unit_test(self.yaml.file, unit_test_definition)
return ParseResult()
@@ -388,6 +391,44 @@ class UnitTestParser(YamlReader):
ut_fixture.fixture, self.project.project_name, unit_test_definition.unique_id
)
# sanitize order of input
if ut_fixture.rows and (
ut_fixture.format == UnitTestFormat.Dict or ut_fixture.format == UnitTestFormat.CSV
):
self._promote_first_non_none_row(ut_fixture)
def _promote_first_non_none_row(self, ut_fixture):
"""
Promote the first row with no None values to the top of the ut_fixture.rows list.
This function modifies the ut_fixture object in place.
Needed for databases like Redshift, which use the first value in a column to determine
the column type. If the first value is None, the type is assumed to be VARCHAR(1).
This leads to obscure type mismatch errors centered on a unit test fixture's `expect`.
See https://github.com/dbt-labs/dbt-redshift/issues/821 for more info.
"""
non_none_row_index = None
# Iterate through each row and its index
for index, row in enumerate(ut_fixture.rows):
# Check if all values in the row are not None
if all(value is not None for value in row.values()):
non_none_row_index = index
break
if non_none_row_index is None:
fire_event(
SystemStdErr(
bmsg="Unit Test fixtures benefit from having at least one row free of Null values to ensure consistent column types. Failure to meet this recommendation can result in type mismatch errors between unit test source models and `expected` fixtures."
)
)
else:
ut_fixture.rows[0], ut_fixture.rows[non_none_row_index] = (
ut_fixture.rows[non_none_row_index],
ut_fixture.rows[0],
)
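The fixture rows are reordered so that the first row contains no None values, which helps warehouses such as Redshift that infer column types from the first row. A quick sketch of the same swap on a list of dict rows:

from typing import Dict, List, Optional

def promote_first_non_none_row(rows: List[Dict[str, Optional[object]]]) -> None:
    """Swap the first fully-populated row to index 0, in place."""
    for index, row in enumerate(rows):
        if all(value is not None for value in row.values()):
            rows[0], rows[index] = rows[index], rows[0]
            return
    # no fully-populated row found: leave the order as-is (the real code warns here)

rows = [{"id": None, "name": "a"}, {"id": 2, "name": "b"}]
promote_first_non_none_row(rows)
assert rows[0] == {"id": 2, "name": "b"}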
def get_fixture_file_rows(self, fixture_name, project_name, utdef_unique_id):
# find fixture file object and store unit_test_definition unique_id
fixture = self._get_fixture(fixture_name, project_name)

View File

@@ -17,5 +17,5 @@ class PluginNodes:
def add_model(self, model_args: ModelNodeArgs) -> None:
self.models[model_args.unique_id] = model_args
def update(self, other: "PluginNodes"):
def update(self, other: "PluginNodes") -> None:
self.models.update(other.models)

View File

@@ -44,15 +44,10 @@ from dbt.graph import Graph
from dbt.task.printer import print_run_result_error
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import (
CompilationError,
DbtInternalError,
DbtRuntimeError,
NotImplementedError,
)
from dbt_common.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
def read_profiles(profiles_dir=None):
def read_profiles(profiles_dir: Optional[str] = None) -> Dict[str, Any]:
"""This is only used for some error handling"""
if profiles_dir is None:
profiles_dir = get_flags().PROFILES_DIR
@@ -71,6 +66,13 @@ class BaseTask(metaclass=ABCMeta):
def __init__(self, args: Flags) -> None:
self.args = args
def __enter__(self):
self.orig_dir = os.getcwd()
return self
def __exit__(self, exc_type, exc_value, traceback):
os.chdir(self.orig_dir)
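Making BaseTask a context manager records the working directory on entry and restores it on exit, so a task that chdirs into the project directory cannot leak that change to the caller. A self-contained sketch of the pattern and how it would be used (the class name here is a hypothetical stand-in):

import os
import tempfile

class RestoresCwd:
    """Toy task base: remember cwd on entry, restore it on exit."""

    def __enter__(self) -> "RestoresCwd":
        self.orig_dir = os.getcwd()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        os.chdir(self.orig_dir)

before = os.getcwd()
with RestoresCwd():
    os.chdir(tempfile.gettempdir())   # simulate chdir-ing into a project dir
assert os.getcwd() == before          # original working directory is restored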
@abstractmethod
def run(self):
raise dbt_common.exceptions.base.NotImplementedError("Not Implemented")
@@ -123,7 +125,7 @@ class ConfiguredTask(BaseTask):
self.manifest = manifest
self.compiler = Compiler(self.config)
def compile_manifest(self):
def compile_manifest(self) -> None:
if self.manifest is None:
raise DbtInternalError("compile_manifest called before manifest was loaded")
@@ -165,7 +167,7 @@ class ExecutionContext:
class BaseRunner(metaclass=ABCMeta):
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
def __init__(self, config, adapter, node, node_index: int, num_nodes: int) -> None:
self.config = config
self.compiler = Compiler(config)
self.adapter = adapter
@@ -272,11 +274,13 @@ class BaseRunner(metaclass=ABCMeta):
failures=result.failures,
)
def compile_and_execute(self, manifest, ctx):
def compile_and_execute(self, manifest: Manifest, ctx: ExecutionContext):
result = None
with self.adapter.connection_named(
self.node.unique_id, self.node
) if get_flags().INTROSPECT else nullcontext():
with (
self.adapter.connection_named(self.node.unique_id, self.node)
if get_flags().INTROSPECT
else nullcontext()
):
ctx.node.update_event_status(node_status=RunningStatus.Compiling)
fire_event(
NodeCompiling(
@@ -303,7 +307,7 @@ class BaseRunner(metaclass=ABCMeta):
return result
def _handle_catchable_exception(self, e, ctx):
def _handle_catchable_exception(self, e: DbtRuntimeError, ctx: ExecutionContext) -> str:
if e.node is None:
e.add_node(ctx.node)
@@ -314,7 +318,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_internal_exception(self, e, ctx):
def _handle_internal_exception(self, e: DbtInternalError, ctx: ExecutionContext) -> str:
fire_event(
InternalErrorOnRun(
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
@@ -322,7 +326,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_generic_exception(self, e, ctx):
def _handle_generic_exception(self, e: Exception, ctx: ExecutionContext) -> str:
fire_event(
GenericExceptionOnRun(
build_path=self._node_build_path(),
@@ -335,9 +339,8 @@ class BaseRunner(metaclass=ABCMeta):
return str(e)
def handle_exception(self, e, ctx):
catchable_errors = (CompilationError, DbtRuntimeError)
if isinstance(e, catchable_errors):
def handle_exception(self, e: Exception, ctx: ExecutionContext) -> str:
if isinstance(e, DbtRuntimeError):
error = self._handle_catchable_exception(e, ctx)
elif isinstance(e, DbtInternalError):
error = self._handle_internal_exception(e, ctx)
@@ -345,7 +348,7 @@ class BaseRunner(metaclass=ABCMeta):
error = self._handle_generic_exception(e, ctx)
return error
def safe_run(self, manifest):
def safe_run(self, manifest: Manifest):
started = time.time()
ctx = ExecutionContext(self.node)
error = None
@@ -392,19 +395,19 @@ class BaseRunner(metaclass=ABCMeta):
return None
def before_execute(self):
raise NotImplementedError()
def before_execute(self) -> None:
raise NotImplementedError("before_execute is not implemented")
def execute(self, compiled_node, manifest):
raise NotImplementedError()
raise NotImplementedError("execute is not implemented")
def run(self, compiled_node, manifest):
return self.execute(compiled_node, manifest)
def after_execute(self, result):
raise NotImplementedError()
def after_execute(self, result) -> None:
raise NotImplementedError("after_execute is not implemented")
def _skip_caused_by_ephemeral_failure(self):
def _skip_caused_by_ephemeral_failure(self) -> bool:
if self.skip_cause is None or self.skip_cause.node is None:
return False
return self.skip_cause.node.is_ephemeral_model
@@ -459,7 +462,7 @@ class BaseRunner(metaclass=ABCMeta):
node_result = RunResult.from_node(self.node, RunStatus.Skipped, error_message)
return node_result
def do_skip(self, cause=None):
def do_skip(self, cause=None) -> None:
self.skip = True
self.skip_cause = cause
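
Note on the handle_exception hunk above: the explicit catchable_errors tuple goes away because a single isinstance check against DbtRuntimeError already covers CompilationError, assuming (as the otherwise-unchanged behavior implies) that CompilationError subclasses DbtRuntimeError in dbt_common. A minimal, self-contained sketch of that dispatch pattern, using placeholder exception classes rather than the real dbt_common ones:

class RuntimeErrorStub(Exception):
    """Stand-in for DbtRuntimeError."""

class CompilationErrorStub(RuntimeErrorStub):
    """Stand-in for CompilationError, assumed to subclass the runtime error."""

class InternalErrorStub(Exception):
    """Stand-in for DbtInternalError."""

def handle_exception(e: Exception) -> str:
    # Checking the base class also catches its subclasses, so a separate
    # (CompilationError, DbtRuntimeError) tuple is redundant.
    if isinstance(e, RuntimeErrorStub):
        return f"catchable: {e}"
    elif isinstance(e, InternalErrorStub):
        return f"internal: {e}"
    return f"generic: {e}"

print(handle_exception(CompilationErrorStub("bad jinja")))  # catchable: bad jinja
print(handle_exception(ValueError("unexpected")))           # generic: unexpected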

View File

@@ -1,5 +1,5 @@
import threading
from typing import Dict, List, Set
from typing import Dict, List, Optional, Set, Type
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.run import RunResult
@@ -24,16 +24,16 @@ from .test import TestRunner as test_runner
class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self):
def description(self) -> str:
return f"saved query {self.node.name}"
def before_execute(self):
def before_execute(self) -> None:
pass
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.node
def after_execute(self, result):
def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
@@ -83,7 +83,7 @@ class BuildTask(RunTask):
self.selected_unit_tests: Set = set()
self.model_to_unit_test_map: Dict[str, List] = {}
def resource_types(self, no_unit_tests=False):
def resource_types(self, no_unit_tests: bool = False) -> List[NodeType]:
resource_types = resource_types_from_args(
self.args, set(self.ALL_RESOURCE_VALUES), set(self.ALL_RESOURCE_VALUES)
)
@@ -210,7 +210,7 @@ class BuildTask(RunTask):
resource_types=resource_types,
)
def get_runner_type(self, node):
def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
return self.RUNNER_MAP.get(node.resource_type)
# Special build compile_manifest method to pass add_test_edges to the compiler
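
The get_runner_type annotation above, Optional[Type[BaseRunner]], matches a dict lookup that can miss for unmapped resource types. A small illustrative sketch of that registry pattern; the class names and mapping below are placeholders, not the real RUNNER_MAP:

from typing import Dict, Optional, Type

class BaseRunner:
    pass

class ModelRunner(BaseRunner):
    pass

class TestRunner(BaseRunner):
    pass

# Illustrative mapping from a resource-type key to a runner class.
RUNNER_MAP: Dict[str, Type[BaseRunner]] = {
    "model": ModelRunner,
    "test": TestRunner,
}

def get_runner_type(resource_type: str) -> Optional[Type[BaseRunner]]:
    # .get() returns None for unmapped resource types, hence the Optional.
    return RUNNER_MAP.get(resource_type)

assert get_runner_type("model") is ModelRunner
assert get_runner_type("saved_query") is None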

View File

@@ -16,7 +16,7 @@ class CleanTask(BaseTask):
self.config = config
self.project = config
def run(self):
def run(self) -> None:
"""
This function takes all the paths in the target file
and cleans the project paths that are not protected.
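
The CleanTask docstring above describes removing project paths that are not protected. A hedged sketch of that kind of filtering; the helper name and the protected-path convention are illustrative, not CleanTask's actual implementation:

from pathlib import Path
from typing import Iterable, List

def paths_to_clean(candidates: Iterable[str], protected: Iterable[str]) -> List[Path]:
    # Keep only candidate paths that do not sit inside a protected directory.
    protected_dirs = [Path(p).resolve() for p in protected]
    kept: List[Path] = []
    for candidate in candidates:
        resolved = Path(candidate).resolve()
        if not any(resolved.is_relative_to(p) for p in protected_dirs):
            kept.append(resolved)
    return kept

print(paths_to_clean(["target", "logs"], protected=["logs"]))  # keeps only the resolved "target" path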

View File

@@ -1,7 +1,8 @@
import threading
from typing import AbstractSet, Any, Iterable, List, Optional, Set
from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type
from dbt.adapters.base import BaseRelation
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.clients.jinja import MacroGenerator
from dbt.context.providers import generate_runtime_model_context
@@ -16,10 +17,10 @@ from dbt_common.exceptions import CompilationError, DbtInternalError
class CloneRunner(BaseRunner):
def before_execute(self):
def before_execute(self) -> None:
pass
def after_execute(self, result):
def after_execute(self, result) -> None:
pass
def _build_run_model_result(self, model, context):
@@ -44,7 +45,7 @@ class CloneRunner(BaseRunner):
failures=None,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
# no-op
return self.node
@@ -91,7 +92,7 @@ class CloneRunner(BaseRunner):
class CloneTask(GraphRunnableTask):
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_run_mode(self) -> GraphRunnableMode:
@@ -133,8 +134,8 @@ class CloneTask(GraphRunnableTask):
self.populate_adapter_cache(adapter, schemas_to_cache)
@property
def resource_types(self):
resource_types = resource_types_from_args(
def resource_types(self) -> List[NodeType]:
resource_types: Collection[NodeType] = resource_types_from_args(
self.args, set(REFABLE_NODE_TYPES), set(REFABLE_NODE_TYPES)
)
@@ -154,5 +155,5 @@ class CloneTask(GraphRunnableTask):
resource_types=resource_types,
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return CloneRunner
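
On the resource_types property above: the local is annotated Collection[NodeType], presumably because the shared resource_types_from_args helper returns a broader collection type, while the property itself still promises a concrete List. A generic sketch of the same narrowing, with strings standing in for NodeType:

from typing import Collection, List

def types_from_args(requested: Collection[str], default: Collection[str]) -> Collection[str]:
    # Stand-in for resource_types_from_args: typed to return a broad Collection.
    return set(requested) or set(default)

def resource_types(requested: Collection[str]) -> List[str]:
    types: Collection[str] = types_from_args(requested, {"model", "seed", "snapshot"})
    # Narrow the broad Collection down to the concrete List the caller expects.
    return sorted(types)

print(resource_types([]))         # ['model', 'seed', 'snapshot']
print(resource_types(["model"]))  # ['model']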

View File

@@ -1,6 +1,8 @@
import threading
from typing import Optional, Type
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import CompiledNode, ParseInlineNodeError
from dbt.graph import ResourceTypeSelector
from dbt.node_types import EXECUTABLE_NODE_TYPES, NodeType
@@ -17,10 +19,10 @@ from dbt_common.exceptions import DbtInternalError
class CompileRunner(BaseRunner):
def before_execute(self):
def before_execute(self) -> None:
pass
def after_execute(self, result):
def after_execute(self, result) -> None:
pass
def execute(self, compiled_node, manifest):
@@ -35,7 +37,7 @@ class CompileRunner(BaseRunner):
failures=None,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.compiler.compile_node(self.node, manifest, {})
@@ -44,7 +46,7 @@ class CompileTask(GraphRunnableTask):
# it should be removed before the task is complete
_inline_node_id = None
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return True
def get_node_selector(self) -> ResourceTypeSelector:
@@ -62,10 +64,10 @@ class CompileTask(GraphRunnableTask):
resource_types=resource_types,
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return CompileRunner
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
is_inline = bool(getattr(self.args, "inline", None))
output_format = getattr(self.args, "output", "text")
@@ -127,14 +129,14 @@ class CompileTask(GraphRunnableTask):
raise DbtException("Error parsing inline query")
super()._runtime_initialize()
def after_run(self, adapter, results):
def after_run(self, adapter, results) -> None:
# remove inline node from manifest
if self._inline_node_id:
self.manifest.nodes.pop(self._inline_node_id)
self._inline_node_id = None
super().after_run(adapter, results)
def _handle_result(self, result):
def _handle_result(self, result) -> None:
super()._handle_result(result)
if (
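
On the after_run hunk above: the temporary inline node is popped from the manifest once the run finishes so it does not leak into later artifacts. A tiny self-contained sketch of that cleanup pattern, with a plain dict standing in for the manifest and an invented node id:

from typing import Dict, Optional

class InlineQuerySketch:
    def __init__(self) -> None:
        # Plain dict standing in for manifest.nodes.
        self.nodes: Dict[str, str] = {"model.demo.orders": "select ..."}
        self._inline_node_id: Optional[str] = None

    def add_inline_node(self, sql: str) -> None:
        # Register the ad-hoc query under a temporary, invented id.
        self._inline_node_id = "sql_operation.demo.inline_query"
        self.nodes[self._inline_node_id] = sql

    def after_run(self) -> None:
        # Remove the temporary node and clear the id, mirroring the diff.
        if self._inline_node_id:
            self.nodes.pop(self._inline_node_id)
            self._inline_node_id = None

task = InlineQuerySketch()
task.add_inline_node("select 1")
task.after_run()
assert "sql_operation.demo.inline_query" not in task.nodes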

View File

@@ -481,7 +481,7 @@ class DebugTask(BaseTask):
return status
@classmethod
def validate_connection(cls, target_dict):
def validate_connection(cls, target_dict) -> None:
"""Validate a connection dictionary. On error, raises a DbtConfigError."""
target_name = "test"
# make a fake profile that we can parse

View File

@@ -96,8 +96,6 @@ class DepsTask(BaseTask):
# See GH-7615
project.project_root = str(Path(project.project_root).resolve())
self.project = project
move_to_nearest_project_dir(project.project_root)
self.cli_vars = args.vars
def track_package_install(
@@ -202,6 +200,7 @@ class DepsTask(BaseTask):
fire_event(DepsLockUpdating(lock_filepath=lock_filepath))
def run(self) -> None:
move_to_nearest_project_dir(self.args.project_dir)
if self.args.add_package:
self.add()
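
The deps hunks above move move_to_nearest_project_dir out of the constructor and into run(), so merely constructing the task no longer changes the process working directory. A sketch of that deferral, with a simplified stand-in for the directory helper:

import os
from pathlib import Path

def move_to_nearest_project_dir(project_dir: str) -> None:
    # Simplified stand-in for dbt's helper: just chdir to the resolved path.
    os.chdir(Path(project_dir).resolve())

class DepsTaskSketch:
    def __init__(self, project_dir: str) -> None:
        # No chdir here: constructing the task leaves the caller's cwd untouched.
        self.project_dir = project_dir

    def run(self) -> None:
        # The directory change happens only when the task actually executes.
        move_to_nearest_project_dir(self.project_dir)
        print(f"installing packages from {os.getcwd()}")

DepsTaskSketch(".").run()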

View File

@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import AbstractSet, Dict, List, Optional
from typing import AbstractSet, Dict, List, Optional, Type
from dbt import deprecations
from dbt.adapters.base.impl import FreshnessResponse
@@ -14,6 +14,7 @@ from dbt.artifacts.schemas.freshness import (
PartialSourceFreshnessResult,
SourceFreshnessResult,
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, SourceDefinition
from dbt.contracts.results import RunStatus
from dbt.events.types import FreshnessCheckComplete, LogFreshnessResult, LogStartLine
@@ -44,7 +45,7 @@ class FreshnessRunner(BaseRunner):
def on_skip(self):
raise DbtRuntimeError("Freshness: nodes cannot be skipped!")
def before_execute(self):
def before_execute(self) -> None:
description = "freshness of {0.source_name}.{0.name}".format(self.node)
fire_event(
LogStartLine(
@@ -55,7 +56,7 @@ class FreshnessRunner(BaseRunner):
)
)
def after_execute(self, result):
def after_execute(self, result) -> None:
if hasattr(result, "node"):
source_name = result.node.source_name
table_name = result.node.name
@@ -162,7 +163,7 @@ class FreshnessRunner(BaseRunner):
**freshness,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
if self.node.resource_type != NodeType.Source:
# should be unreachable...
raise DbtRuntimeError("freshness runner: got a non-Source")
@@ -184,13 +185,13 @@ class FreshnessTask(RunTask):
super().__init__(args, config, manifest)
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}
def result_path(self):
def result_path(self) -> str:
if self.args.output:
return os.path.realpath(self.args.output)
else:
return os.path.join(self.config.project_target_path, RESULT_FILE_NAME)
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self):
@@ -214,7 +215,7 @@ class FreshnessTask(RunTask):
freshness_runner.set_metadata_freshness_cache(self._metadata_freshness_cache)
return freshness_runner
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return FreshnessRunner
def get_result(self, results, elapsed_time, generated_at):
@@ -222,7 +223,7 @@ class FreshnessTask(RunTask):
elapsed_time=elapsed_time, generated_at=generated_at, results=results
)
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
for result in results:
if result.status in (
FreshnessStatus.Error,
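
On the result_path hunk above: the return type is now str, covering both the realpath of an explicit --output argument and the default join under the project target path. A minimal sketch of that selection logic; the RESULT_FILE_NAME value here is assumed for illustration:

import os
from typing import Optional

RESULT_FILE_NAME = "sources.json"  # assumed file name for this sketch

def result_path(output_arg: Optional[str], project_target_path: str) -> str:
    # Prefer an explicit output path; otherwise fall back to the target directory.
    if output_arg:
        return os.path.realpath(output_arg)
    return os.path.join(project_target_path, RESULT_FILE_NAME)

print(result_path(None, "target"))            # target/sources.json (on POSIX)
print(result_path("out/freshness.json", ""))  # absolute path ending in out/freshness.json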

View File

@@ -1,4 +1,5 @@
import json
from typing import Iterator, List
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
@@ -145,7 +146,7 @@ class ListTask(GraphRunnableTask):
}
)
def generate_paths(self):
def generate_paths(self) -> Iterator[str]:
for node in self._iterate_selected_nodes():
yield node.original_file_path
@@ -177,7 +178,7 @@ class ListTask(GraphRunnableTask):
return self.node_results
@property
def resource_types(self):
def resource_types(self) -> List[NodeType]:
if self.args.models:
return [NodeType.Model]
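
generate_paths above is annotated as Iterator[str]: a generator function declares the iterator it produces, not the type of each yielded value. A self-contained illustration with a minimal node stub:

from dataclasses import dataclass
from typing import Iterator, List

@dataclass
class NodeStub:
    # Minimal stand-in for a manifest node; only the field the generator reads.
    original_file_path: str

def generate_paths(nodes: List[NodeStub]) -> Iterator[str]:
    for node in nodes:
        yield node.original_file_path

for path in generate_paths([NodeStub("models/orders.sql"), NodeStub("models/customers.sql")]):
    print(path)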

Some files were not shown because too many files have changed in this diff.