Compare commits


3 Commits

Author         SHA1        Message                      Date
Callum McCann  fb8b161351  adding entity inheritence    2022-12-06 16:20:12 -06:00
Callum McCann  7ecb431278  making dimensions possible!  2022-12-06 13:54:28 -06:00
Callum McCann  792150ff6a  let there be entity          2022-12-05 16:02:14 -06:00
176 changed files with 2419 additions and 1908 deletions

View File

@@ -1,6 +1,7 @@
kind: "Dependencies"
kind: "Dependency"
body: "Update pathspec requirement from ~=0.9.0 to >=0.9,<0.11 in /core"
time: 2022-09-23T00:06:46.00000Z
custom:
Author: dependabot[bot]
PR: "5917"
Issue: 4904
PR: 5917

View File

@@ -1,6 +1,7 @@
kind: "Dependencies"
kind: "Dependency"
body: "Bump black from 22.8.0 to 22.10.0"
time: 2022-10-07T00:08:48.00000Z
custom:
Author: dependabot[bot]
PR: "6019"
Issue: 4904
PR: 6019

View File

@@ -1,6 +1,7 @@
kind: "Dependencies"
kind: "Dependency"
body: "Bump mashumaro[msgpack] from 3.0.4 to 3.1.1 in /core"
time: 2022-10-20T00:07:53.00000Z
custom:
Author: dependabot[bot]
PR: "6108"
Issue: 4904
PR: 6108

View File

@@ -1,6 +1,7 @@
kind: "Dependencies"
kind: "Dependency"
body: "Update colorama requirement from <0.4.6,>=0.3.9 to >=0.3.9,<0.4.7 in /core"
time: 2022-10-26T00:09:10.00000Z
custom:
Author: dependabot[bot]
PR: "6144"
Issue: 4904
PR: 6144

View File

@@ -1,7 +0,0 @@
kind: "Dependency"
body: "Bump mashumaro[msgpack] from 3.1.1 to 3.2 in /core"
time: 2022-12-05T00:21:18.00000Z
custom:
Author: dependabot[bot]
Issue: 4904
PR: 6375

View File

@@ -4,3 +4,4 @@ time: 2022-09-08T15:41:57.689162-04:00
custom:
Author: andy-clapson
Issue: "5791"
PR: "5684"

View File

@@ -4,3 +4,4 @@ time: 2022-10-07T09:06:56.446078-05:00
custom:
Author: stu-k
Issue: "5528"
PR: "6022"

View File

@@ -3,3 +3,4 @@ time: 2022-10-17T17:14:11.715348-05:00
custom:
Author: paulbenschmidt
Issue: "5880"
PR: "324"

View File

@@ -4,3 +4,4 @@ time: 2022-11-16T15:57:43.204201+01:00
custom:
Author: jtcohen6
Issue: "323"
PR: "346"

View File

@@ -1,6 +0,0 @@
kind: Docs
body: Alphabetize `core/dbt/README.md`
time: 2022-12-02T15:05:23.695333-07:00
custom:
Author: dbeatty10
Issue: "6368"

View File

@@ -5,3 +5,4 @@ time: 2022-04-08T16:54:59.696564+01:00
custom:
Author: daniel-murray josephberni
Issue: "2968"
PR: "5859"

View File

@@ -1,6 +1,7 @@
kind: Features
body: Update structured logging. Convert to using protobuf messages. Ensure events are enriched with node_info.
body: Proto logging messages
time: 2022-08-17T15:48:57.225267-04:00
custom:
Author: gshank
Issue: "5610"
PR: "5643"

View File

@@ -4,3 +4,4 @@ time: 2022-09-12T12:59:35.121188+01:00
custom:
Author: jared-rimmer
Issue: "5486"
PR: "5812"

View File

@@ -4,3 +4,4 @@ time: 2022-09-14T09:56:25.97818-07:00
custom:
Author: colin-rogers-dbt
Issue: "5521"
PR: "5838"

View File

@@ -4,3 +4,4 @@ time: 2022-09-25T21:16:51.051239654+02:00
custom:
Author: pgoslatara
Issue: "5929"
PR: "5930"

View File

@@ -4,3 +4,4 @@ time: 2022-10-03T11:07:05.381632-05:00
custom:
Author: dave-connors-3
Issue: "5990"
PR: "5991"

View File

@@ -5,3 +5,4 @@ time: 2022-11-02T15:00:03.000805-05:00
custom:
Author: racheldaniel
Issue: "6201"
PR: "6202"

View File

@@ -1,8 +0,0 @@
kind: Features
body: Adding tarball install method for packages. Allowing package tarball to be specified
via url in the packages.yaml.
time: 2022-11-07T10:50:18.464545-05:00
custom:
Author: timle2
Issue: "4205"
PR: "4689"

View File

@@ -4,3 +4,4 @@ time: 2022-11-14T18:52:07.788593+02:00
custom:
Author: haritamar
Issue: "6246"
PR: "6247"

View File

@@ -4,3 +4,4 @@ time: 2022-11-30T11:29:13.256034-05:00
custom:
Author: michelleark
Issue: "6057"
PR: "6342"

View File

@@ -1,7 +0,0 @@
kind: Features
body: Add support for Python 3.11
time: 2022-12-06T15:07:04.753127+01:00
custom:
Author: joshuataylor MichelleArk jtcohen6
Issue: "6147"
PR: "6326"

View File

@@ -4,3 +4,4 @@ time: 2022-09-16T10:48:54.162273-05:00
custom:
Author: emmyoop
Issue: "3992"
PR: "5868"

View File

@@ -4,3 +4,4 @@ time: 2022-10-10T11:32:18.752322-05:00
custom:
Author: emmyoop
Issue: "6030"
PR: "6038"

View File

@@ -4,3 +4,4 @@ time: 2022-10-11T16:07:15.464093-04:00
custom:
Author: chamini2
Issue: "6041"
PR: "6042"

View File

@@ -5,3 +5,4 @@ time: 2022-10-16T17:37:42.846683-07:00
custom:
Author: versusfacit
Issue: "5436"
PR: "5874"

View File

@@ -4,3 +4,4 @@ time: 2022-11-07T09:53:14.340257-06:00
custom:
Author: ChenyuLInx
Issue: "5625"
PR: "6059"

View File

@@ -4,3 +4,4 @@ time: 2022-11-15T08:10:21.527884-05:00
custom:
Author: justbldwn
Issue: "6245"
PR: "6251"

View File

@@ -1,7 +0,0 @@
kind: Fixes
body: After this, will be possible to use default values for dbt.config.get
time: 2022-11-24T16:34:19.039512764-03:00
custom:
Author: devmessias
Issue: "6309"
PR: "6317"

View File

@@ -1,6 +0,0 @@
kind: Fixes
body: Use full path for writing manifest
time: 2022-12-02T16:48:59.029519-05:00
custom:
Author: gshank
Issue: "6055"

View File

@@ -4,3 +4,4 @@ time: 2022-09-27T19:42:59.241433-07:00
custom:
Author: max-sixty
Issue: "5946"
PR: "5947"

View File

@@ -4,3 +4,4 @@ time: 2022-09-29T13:44:06.275941-04:00
custom:
Author: peterallenwebb
Issue: "5809"
PR: "5975"

View File

@@ -4,3 +4,4 @@ time: 2022-10-05T12:03:10.061263-07:00
custom:
Author: max-sixty
Issue: "5983"
PR: "5983"

View File

@@ -4,3 +4,4 @@ time: 2022-10-07T09:46:27.682872-05:00
custom:
Author: emmyoop
Issue: "6023"
PR: "6024"

View File

@@ -4,3 +4,4 @@ time: 2022-10-07T14:00:44.227644-07:00
custom:
Author: max-sixty
Issue: "6028"
PR: "5978"

View File

@@ -4,3 +4,4 @@ time: 2022-10-13T18:19:12.167548-04:00
custom:
Author: peterallenwebb
Issue: "5229"
PR: "6025"

View File

@@ -4,3 +4,4 @@ time: 2022-10-17T15:15:11.499246-05:00
custom:
Author: luke-bassett
Issue: "1350"
PR: "6086"

View File

@@ -4,3 +4,4 @@ time: 2022-10-17T15:58:44.676549-04:00
custom:
Author: eve-johns
Issue: "6068"
PR: "6082"

View File

@@ -4,3 +4,4 @@ time: 2022-10-28T10:48:37.687886-04:00
custom:
Author: gshank
Issue: "6171"
PR: "6172"

View File

@@ -4,3 +4,4 @@ time: 2022-10-28T11:03:44.887836-04:00
custom:
Author: gshank
Issue: "6173"
PR: "6174"

View File

@@ -4,3 +4,4 @@ time: 2022-11-08T07:45:50.589147-06:00
custom:
Author: stu-k
Issue: "5942"
PR: "6226"

View File

@@ -4,3 +4,4 @@ time: 2022-11-08T11:56:33.743042-06:00
custom:
Author: stu-k
Issue: "5770"
PR: "6228"

View File

@@ -4,3 +4,4 @@ time: 2022-11-08T13:31:04.788547-06:00
custom:
Author: stu-k
Issue: "5771"
PR: "6230"

View File

@@ -4,3 +4,4 @@ time: 2022-11-16T13:00:37.916202-06:00
custom:
Author: stu-k
Issue: "5942"
PR: "6187"

View File

@@ -1,8 +0,0 @@
kind: Under the Hood
body: Functionality-neutral refactor of event logging system to improve encapsulation
and modularity.
time: 2022-11-18T14:57:17.792622-05:00
custom:
Author: peterallenwebb
Issue: "6139"
PR: "6291"

View File

@@ -1,7 +0,0 @@
kind: Under the Hood
body: Consolidate ParsedNode and CompiledNode classes
time: 2022-12-05T16:49:48.563583-05:00
custom:
Author: gshank
Issue: "6383"
PR: "6384"

View File

@@ -1,7 +0,0 @@
kind: Under the Hood
body: Prevent doc gen workflow from running on forks
time: 2022-12-06T09:40:15.301984-06:00
custom:
Author: stu-k
Issue: "6386"
PR: "6390"

View File

@@ -1,7 +0,0 @@
kind: Under the Hood
body: Fix intermittent database connection failure in Windows CI test
time: 2022-12-06T11:30:53.166009-07:00
custom:
Author: MichelleArk dbeatty10
Issue: "6394"
PR: "6395"

View File

@@ -6,67 +6,19 @@ changelogPath: CHANGELOG.md
versionExt: md
versionFormat: '## dbt-core {{.Version}} - {{.Time.Format "January 02, 2006"}}'
kindFormat: '### {{.Kind}}'
changeFormat: |-
{{- $IssueList := list }}
{{- $changes := splitList " " $.Custom.Issue }}
{{- range $issueNbr := $changes }}
{{- $changeLink := "[#nbr](https://github.com/dbt-labs/dbt-core/issues/nbr)" | replace "nbr" $issueNbr }}
{{- $IssueList = append $IssueList $changeLink }}
{{- end -}}
- {{.Body}} ({{ range $index, $element := $IssueList }}{{if $index}}, {{end}}{{$element}}{{end}})
changeFormat: '- {{.Body}} ([#{{.Custom.Issue}}](https://github.com/dbt-labs/dbt-core/issues/{{.Custom.Issue}}), [#{{.Custom.PR}}](https://github.com/dbt-labs/dbt-core/pull/{{.Custom.PR}}))'
kinds:
- label: Breaking Changes
- label: Features
- label: Fixes
- label: Docs
changeFormat: |-
{{- $IssueList := list }}
{{- $changes := splitList " " $.Custom.Issue }}
{{- range $issueNbr := $changes }}
{{- $changeLink := "[dbt-docs/#nbr](https://github.com/dbt-labs/dbt-docs/issues/nbr)" | replace "nbr" $issueNbr }}
{{- $IssueList = append $IssueList $changeLink }}
{{- end -}}
- {{.Body}} ({{ range $index, $element := $IssueList }}{{if $index}}, {{end}}{{$element}}{{end}})
changeFormat: '- {{.Body}} ([dbt-docs/#{{.Custom.Issue}}](https://github.com/dbt-labs/dbt-docs/issues/{{.Custom.Issue}}), [dbt-docs/#{{.Custom.PR}}](https://github.com/dbt-labs/dbt-docs/pull/{{.Custom.PR}}))'
- label: Under the Hood
- label: Dependencies
changeFormat: |-
{{- $PRList := list }}
{{- $changes := splitList " " $.Custom.PR }}
{{- range $pullrequest := $changes }}
{{- $changeLink := "[#nbr](https://github.com/dbt-labs/dbt-core/pull/nbr)" | replace "nbr" $pullrequest }}
{{- $PRList = append $PRList $changeLink }}
{{- end -}}
- {{.Body}} ({{ range $index, $element := $PRList }}{{if $index}}, {{end}}{{$element}}{{end}})
skipGlobalChoices: true
additionalChoices:
- key: Author
label: GitHub Username(s) (separated by a single space if multiple)
type: string
minLength: 3
- key: PR
label: GitHub Pull Request Number (separated by a single space if multiple)
type: string
minLength: 1
changeFormat: '- {{.Body}} ({{if ne .Custom.Issue ""}}[#{{.Custom.Issue}}](https://github.com/dbt-labs/dbt-core/issues/{{.Custom.Issue}}), {{end}}[#{{.Custom.PR}}](https://github.com/dbt-labs/dbt-core/pull/{{.Custom.PR}}))'
- label: Security
changeFormat: |-
{{- $PRList := list }}
{{- $changes := splitList " " $.Custom.PR }}
{{- range $pullrequest := $changes }}
{{- $changeLink := "[#nbr](https://github.com/dbt-labs/dbt-core/pull/nbr)" | replace "nbr" $pullrequest }}
{{- $PRList = append $PRList $changeLink }}
{{- end -}}
- {{.Body}} ({{ range $index, $element := $PRList }}{{if $index}}, {{end}}{{$element}}{{end}})
skipGlobalChoices: true
additionalChoices:
- key: Author
label: GitHub Username(s) (separated by a single space if multiple)
type: string
minLength: 3
- key: PR
label: GitHub Pull Request Number (separated by a single space if multiple)
type: string
minLength: 1
changeFormat: '- {{.Body}} ({{if ne .Custom.Issue ""}}[#{{.Custom.Issue}}](https://github.com/dbt-labs/dbt-core/issues/{{.Custom.Issue}}), {{end}}[#{{.Custom.PR}}](https://github.com/dbt-labs/dbt-core/pull/{{.Custom.PR}}))'
newlines:
afterChangelogHeader: 1
@@ -81,41 +33,42 @@ custom:
type: string
minLength: 3
- key: Issue
label: GitHub Issue Number (separated by a single space if multiple)
type: string
minLength: 1
label: GitHub Issue Number
type: int
minInt: 1
- key: PR
label: GitHub Pull Request Number
type: int
minInt: 1
footerFormat: |
{{- $contributorDict := dict }}
{{- /* any names added to this list should be all lowercase for later matching purposes */}}
{{- $core_team := list "michelleark" "peterallenwebb" "emmyoop" "nathaniel-may" "gshank" "leahwicz" "chenyulinx" "stu-k" "iknox-fa" "versusfacit" "mcknight-42" "jtcohen6" "aranke" "dependabot[bot]" "snyk-bot" "colin-rogers-dbt" }}
{{- $core_team := list "michelleark" "peterallenwebb" "emmyoop" "nathaniel-may" "gshank" "leahwicz" "chenyulinx" "stu-k" "iknox-fa" "versusfacit" "mcknight-42" "jtcohen6" "dependabot[bot]" "snyk-bot" "colin-rogers-dbt" }}
{{- range $change := .Changes }}
{{- $authorList := splitList " " $change.Custom.Author }}
{{- /* loop through all authors for a single changelog */}}
{{- /* loop through all authors for a PR */}}
{{- range $author := $authorList }}
{{- $authorLower := lower $author }}
{{- /* we only want to include non-core team contributors */}}
{{- if not (has $authorLower $core_team)}}
{{- $changeList := splitList " " $change.Custom.Author }}
{{- /* Docs kind link back to dbt-docs instead of dbt-core issues */}}
{{- $changeLink := $change.Kind }}
{{- if or (eq $change.Kind "Dependencies") (eq $change.Kind "Security") }}
{{- $changeLink = "[#nbr](https://github.com/dbt-labs/dbt-core/pull/nbr)" | replace "nbr" $change.Custom.PR }}
{{- else if eq $change.Kind "Docs"}}
{{- $changeLink = "[dbt-docs/#nbr](https://github.com/dbt-labs/dbt-docs/issues/nbr)" | replace "nbr" $change.Custom.Issue }}
{{- else }}
{{- $changeLink = "[#nbr](https://github.com/dbt-labs/dbt-core/issues/nbr)" | replace "nbr" $change.Custom.Issue }}
{{- end }}
{{- /* check if this contributor has other changes associated with them already */}}
{{- if hasKey $contributorDict $author }}
{{- $contributionList := get $contributorDict $author }}
{{- $contributionList = append $contributionList $changeLink }}
{{- $contributorDict := set $contributorDict $author $contributionList }}
{{- else }}
{{- $contributionList := list $changeLink }}
{{- $contributorDict := set $contributorDict $author $contributionList }}
{{- end }}
{{- end}}
{{- /* Docs kind link back to dbt-docs instead of dbt-core PRs */}}
{{- $prLink := $change.Kind }}
{{- if eq $change.Kind "Docs" }}
{{- $prLink = "[dbt-docs/#pr](https://github.com/dbt-labs/dbt-docs/pull/pr)" | replace "pr" $change.Custom.PR }}
{{- else }}
{{- $prLink = "[#pr](https://github.com/dbt-labs/dbt-core/pull/pr)" | replace "pr" $change.Custom.PR }}
{{- end }}
{{- /* check if this contributor has other PRs associated with them already */}}
{{- if hasKey $contributorDict $author }}
{{- $prList := get $contributorDict $author }}
{{- $prList = append $prList $prLink }}
{{- $contributorDict := set $contributorDict $author $prList }}
{{- else }}
{{- $prList := list $prLink }}
{{- $contributorDict := set $contributorDict $author $prList }}
{{- end }}
{{- end}}
{{- end}}
{{- end }}
{{- /* no indentation here for formatting so the final markdown doesn't have unneeded indentations */}}
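For context on the format strings in the hunk above: the single-line generic changeFormat substitutes one issue link and one PR link per changelog entry. A rough Python mock-up of that substitution, purely illustrative (changie itself evaluates these as Go templates; the sample body and numbers are taken from entries elsewhere in this compare):

```python
def render_change(body: str, issue: str, pr: str) -> str:
    # Mirrors the generic changeFormat:
    # '- {{.Body}} ([#{{.Custom.Issue}}](.../issues/{{.Custom.Issue}}), [#{{.Custom.PR}}](.../pull/{{.Custom.PR}}))'
    return (
        f"- {body} "
        f"([#{issue}](https://github.com/dbt-labs/dbt-core/issues/{issue}), "
        f"[#{pr}](https://github.com/dbt-labs/dbt-core/pull/{pr}))"
    )


print(render_change("Add support for Python 3.11", "6147", "6326"))
# - Add support for Python 3.11 ([#6147](https://github.com/dbt-labs/dbt-core/issues/6147), [#6326](https://github.com/dbt-labs/dbt-core/pull/6326))
```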

View File

@@ -40,7 +40,7 @@ jobs:
matrix:
include:
- label: "dependencies"
changie_kind: "Dependencies"
changie_kind: "Dependency"
- label: "snyk"
changie_kind: "Security"
runs-on: ubuntu-latest
@@ -58,4 +58,4 @@ jobs:
commit_message: "Add automated changelog yaml from template for bot PR"
changie_kind: ${{ matrix.changie_kind }}
label: ${{ matrix.label }}
custom_changelog_string: "custom:\n Author: ${{ github.event.pull_request.user.login }}\n PR: ${{ github.event.pull_request.number }}"
custom_changelog_string: "custom:\n Author: ${{ github.event.pull_request.user.login }}\n Issue: 4904\n PR: ${{ github.event.pull_request.number }}"

View File

@@ -34,7 +34,6 @@ jobs:
check_gen:
name: check if generation needed
runs-on: ubuntu-latest
if: ${{ github.event.pull_request.head.repo.fork == false }}
outputs:
cli_dir_changed: ${{ steps.check_cli.outputs.cli_dir_changed }}
docs_dir_changed: ${{ steps.check_docs.outputs.docs_dir_changed }}
@@ -45,6 +44,8 @@ jobs:
echo "env.CLI_DIR: ${{ env.CLI_DIR }}"
echo "env.DOCS_BUILD_DIR: ${{ env.DOCS_BUILD_DIR }}"
echo "env.DOCS_DIR: ${{ env.DOCS_DIR }}"
echo ">>>>> git log"
git log --pretty=oneline | head -5
- name: git checkout
uses: actions/checkout@v3

View File

@@ -73,7 +73,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.7", "3.8", "3.9", "3.10"]
env:
TOXENV: "unit"
@@ -118,7 +118,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.7", "3.8", "3.9", "3.10"]
os: [ubuntu-20.04]
include:
- python-version: 3.8

View File

@@ -56,7 +56,7 @@ There are some tools that will be helpful to you in developing locally. While th
These are the tools used in `dbt-core` development and testing:
- [`tox`](https://tox.readthedocs.io/en/latest/) to manage virtualenvs across python versions. We currently target the latest patch releases for Python 3.7, 3.8, 3.9, 3.10 and 3.11
- [`tox`](https://tox.readthedocs.io/en/latest/) to manage virtualenvs across python versions. We currently target the latest patch releases for Python 3.7, 3.8, 3.9, and 3.10
- [`pytest`](https://docs.pytest.org/en/latest/) to define, discover, and run tests
- [`flake8`](https://flake8.pycqa.org/en/latest/) for code linting
- [`black`](https://github.com/psf/black) for code formatting
@@ -160,7 +160,7 @@ suites.
#### `tox`
[`tox`](https://tox.readthedocs.io/en/latest/) takes care of managing virtualenvs and installing dependencies in order to run tests. You can also run tests in parallel; for example, you can run unit tests for Python 3.7, Python 3.8, Python 3.9, Python 3.10 and Python 3.11 in parallel with `tox -p`. Also, you can run unit tests for specific python versions with `tox -e py37`. The configuration for these tests is located in `tox.ini`.
[`tox`](https://tox.readthedocs.io/en/latest/) takes care of managing virtualenvs and installing dependencies in order to run tests. You can also run tests in parallel; for example, you can run unit tests for Python 3.7, Python 3.8, Python 3.9, and Python 3.10 in parallel with `tox -p`. Also, you can run unit tests for specific python versions with `tox -e py37`. The configuration for these tests is located in `tox.ini`.
#### `pytest`
@@ -201,21 +201,13 @@ Here are some general rules for adding tests:
* Sometimes flake8 complains about lines that are actually fine, in which case you can put a comment on the line such as: # noqa or # noqa: ANNN, where ANNN is the error code that flake8 issues.
* To collect output for `CProfile`, run dbt with the `-r` option and the name of an output file, i.e. `dbt -r dbt.cprof run`. If you just want to profile parsing, you can do: `dbt -r dbt.cprof parse`. `pip` install `snakeviz` to view the output. Run `snakeviz dbt.cprof` and output will be rendered in a browser window.
## Adding or modifying a CHANGELOG Entry
## Adding a CHANGELOG Entry
We use [changie](https://changie.dev) to generate `CHANGELOG` entries. **Note:** Do not edit the `CHANGELOG.md` directly. Your modifications will be lost.
Follow the steps to [install `changie`](https://changie.dev/guide/installation/) for your system.
Once changie is installed and your PR is created for a new feature, simply run the following command and changie will walk you through the process of creating a changelog entry:
```shell
changie new
```
Commit the file that's created and your changelog entry is complete!
If you are contributing to a feature already in progress, you will modify the changie yaml file in dbt/.changes/unreleased/ related to your change. If you need help finding this file, please ask within the discussion for the pull request!
Once changie is installed and your PR is created, simply run `changie new` and changie will walk you through the process of creating a changelog entry. Commit the file that's created and your changelog entry is complete!
You don't need to worry about which `dbt-core` version your change will go into. Just create the changelog entry with `changie`, and open your PR against the `main` branch. All merged changes will be included in the next minor version of `dbt-core`. The Core maintainers _may_ choose to "backport" specific changes in order to patch older minor versions. In that case, a maintainer will take care of that backport after merging your PR, before releasing the new version of `dbt-core`.

View File

@@ -49,9 +49,6 @@ RUN apt-get update \
python3.10 \
python3.10-dev \
python3.10-venv \
python3.11 \
python3.11-dev \
python3.11-venv \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

View File

@@ -2,59 +2,50 @@
## The following are individual files in this directory.
### compilation.py
### constants.py
### dataclass_schema.py
### deprecations.py
### exceptions.py
### flags.py
### helper_types.py
### hooks.py
### lib.py
### links.py
### logger.py
### main.py
### node_types.py
### profiler.py
### selected_resources.py
### semver.py
### tracking.py
### version.py
### lib.py
### node_types.py
### helper_types.py
### links.py
### semver.py
### ui.py
### compilation.py
### dataclass_schema.py
### exceptions.py
### hooks.py
### logger.py
### profiler.py
### utils.py
### version.py
## The subdirectories will be documented in a README in the subdirectory
* adapters
* cli
* clients
* config
* context
* contracts
* deps
* docs
* events
* graph
* include
* parser
* adapters
* context
* deps
* graph
* task
* tests
* clients
* events

View File

@@ -48,7 +48,6 @@ from dbt.events.types import (
Rollback,
RollbackFailed,
)
from dbt.events.contextvars import get_node_info
from dbt import flags
from dbt.utils import cast_to_str
@@ -170,9 +169,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
if conn.name == conn_name and conn.state == "open":
return conn
fire_event(
NewConnection(conn_name=conn_name, conn_type=self.TYPE, node_info=get_node_info())
)
fire_event(NewConnection(conn_name=conn_name, conn_type=self.TYPE))
if conn.state == "open":
fire_event(ConnectionReused(conn_name=conn_name))
@@ -339,9 +336,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
except Exception:
fire_event(
RollbackFailed(
conn_name=cast_to_str(connection.name),
exc_info=traceback.format_exc(),
node_info=get_node_info(),
conn_name=cast_to_str(connection.name), exc_info=traceback.format_exc()
)
)
@@ -350,16 +345,10 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, "close"):
fire_event(
ConnectionClosed(conn_name=cast_to_str(connection.name), node_info=get_node_info())
)
fire_event(ConnectionClosed(conn_name=cast_to_str(connection.name)))
connection.handle.close()
else:
fire_event(
ConnectionLeftOpen(
conn_name=cast_to_str(connection.name), node_info=get_node_info()
)
)
fire_event(ConnectionLeftOpen(conn_name=cast_to_str(connection.name)))
@classmethod
def _rollback(cls, connection: Connection) -> None:
@@ -370,7 +359,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
f'"{connection.name}", but it does not have one open!'
)
fire_event(Rollback(conn_name=cast_to_str(connection.name), node_info=get_node_info()))
fire_event(Rollback(conn_name=cast_to_str(connection.name)))
cls._rollback_handle(connection)
connection.transaction_open = False
@@ -382,7 +371,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
return connection
if connection.transaction_open and connection.handle:
fire_event(Rollback(conn_name=cast_to_str(connection.name), node_info=get_node_info()))
fire_event(Rollback(conn_name=cast_to_str(connection.name)))
cls._rollback_handle(connection)
connection.transaction_open = False
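The node_info=get_node_info() arguments appearing on one side of the hunks above come from a context-variable lookup: a task records which node is currently executing, and events read that value when they are fired. A minimal sketch of the pattern, with illustrative names rather than the exact dbt.events.contextvars API:

```python
import contextvars
from typing import Any, Dict

# Context variable that a task sets before running a node and resets afterwards.
_NODE_INFO = contextvars.ContextVar("node_info", default=None)


def set_node_info(info: Dict[str, Any]) -> contextvars.Token:
    return _NODE_INFO.set(info)


def get_node_info() -> Dict[str, Any]:
    # Called at fire time so an event is enriched with the active node, if any.
    return _NODE_INFO.get() or {}


token = set_node_info({"unique_id": "model.my_project.my_model"})
print(get_node_info())  # {'unique_id': 'model.my_project.my_model'}
_NODE_INFO.reset(token)
print(get_node_info())  # {}
```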

View File

@@ -15,6 +15,7 @@ from typing import (
List,
Mapping,
Iterator,
Union,
Set,
)
@@ -37,8 +38,9 @@ from dbt.adapters.protocol import (
)
from dbt.clients.agate_helper import empty_table, merge_tables, table_from_rows
from dbt.clients.jinja import MacroGenerator
from dbt.contracts.graph.compiled import CompileResultNode, CompiledSeedNode
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.graph.parsed import ParsedSeedNode
from dbt.events.functions import fire_event, warn_or_error
from dbt.events.types import (
CacheMiss,
@@ -62,6 +64,9 @@ from dbt.adapters.base import Credentials
from dbt.adapters.cache import RelationsCache, _make_ref_key_msg
SeedModel = Union[ParsedSeedNode, CompiledSeedNode]
GET_CATALOG_MACRO_NAME = "get_catalog"
FRESHNESS_MACRO_NAME = "collect_freshness"
@@ -238,7 +243,9 @@ class BaseAdapter(metaclass=AdapterMeta):
return conn.name
@contextmanager
def connection_named(self, name: str, node: Optional[ResultNode] = None) -> Iterator[None]:
def connection_named(
self, name: str, node: Optional[CompileResultNode] = None
) -> Iterator[None]:
try:
if self.connections.query_header is not None:
self.connections.query_header.set(name, node)
@@ -250,7 +257,7 @@ class BaseAdapter(metaclass=AdapterMeta):
self.connections.query_header.reset()
@contextmanager
def connection_for(self, node: ResultNode) -> Iterator[None]:
def connection_for(self, node: CompileResultNode) -> Iterator[None]:
with self.connection_named(node.unique_id, node):
yield
@@ -365,7 +372,7 @@ class BaseAdapter(metaclass=AdapterMeta):
lowercase strings.
"""
info_schema_name_map = SchemaSearchMap()
nodes: Iterator[ResultNode] = chain(
nodes: Iterator[CompileResultNode] = chain(
[
node
for node in manifest.nodes.values()

View File

@@ -5,7 +5,7 @@ from dbt.clients.jinja import QueryStringGenerator
from dbt.context.manifest import generate_query_header_context
from dbt.contracts.connection import AdapterRequiredConfig, QueryComment
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
from dbt.exceptions import RuntimeException
@@ -90,7 +90,7 @@ class MacroQueryStringSetter:
def reset(self):
self.set("master", None)
def set(self, name: str, node: Optional[ResultNode]):
def set(self, name: str, node: Optional[CompileResultNode]):
wrapped: Optional[NodeWrapper] = None
if node is not None:
wrapped = NodeWrapper(node)

View File

@@ -1,8 +1,9 @@
from collections.abc import Hashable
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Optional, TypeVar, Any, Type, Dict, Union, Iterator, Tuple, Set
from dbt.contracts.graph.nodes import SourceDefinition, ParsedNode
from dbt.contracts.graph.compiled import CompiledNode
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedNode
from dbt.contracts.relation import (
RelationType,
ComponentName,
@@ -26,10 +27,8 @@ class BaseRelation(FakeAPIObject, Hashable):
path: Path
type: Optional[RelationType] = None
quote_character: str = '"'
# Python 3.11 requires that these use default_factory instead of simple default
# ValueError: mutable default <class 'dbt.contracts.relation.Policy'> for field include_policy is not allowed: use default_factory
include_policy: Policy = field(default_factory=lambda: Policy())
quote_policy: Policy = field(default_factory=lambda: Policy())
include_policy: Policy = Policy()
quote_policy: Policy = Policy()
dbt_created: bool = False
def _is_exactish_match(self, field: ComponentName, value: str) -> bool:
@@ -40,9 +39,9 @@ class BaseRelation(FakeAPIObject, Hashable):
@classmethod
def _get_field_named(cls, field_name):
for f, _ in cls._get_fields():
if f.name == field_name:
return f
for field, _ in cls._get_fields():
if field.name == field_name:
return field
# this should be unreachable
raise ValueError(f"BaseRelation has no {field_name} field!")
@@ -53,11 +52,11 @@ class BaseRelation(FakeAPIObject, Hashable):
@classmethod
def get_default_quote_policy(cls) -> Policy:
return cls._get_field_named("quote_policy").default_factory()
return cls._get_field_named("quote_policy").default
@classmethod
def get_default_include_policy(cls) -> Policy:
return cls._get_field_named("include_policy").default_factory()
return cls._get_field_named("include_policy").default
def get(self, key, default=None):
"""Override `.get` to return a metadata object so we don't break
@@ -185,7 +184,7 @@ class BaseRelation(FakeAPIObject, Hashable):
)
@classmethod
def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self:
def create_from_source(cls: Type[Self], source: ParsedSourceDefinition, **kwargs: Any) -> Self:
source_quoting = source.quoting.to_dict(omit_none=True)
source_quoting.pop("column", None)
quote_policy = deep_merge(
@@ -210,7 +209,7 @@ class BaseRelation(FakeAPIObject, Hashable):
def create_ephemeral_from_node(
cls: Type[Self],
config: HasQuoting,
node: ParsedNode,
node: Union[ParsedNode, CompiledNode],
) -> Self:
# Note that ephemeral models are based on the name.
identifier = cls.add_ephemeral_prefix(node.name)
@@ -223,7 +222,7 @@ class BaseRelation(FakeAPIObject, Hashable):
def create_from_node(
cls: Type[Self],
config: HasQuoting,
node: ParsedNode,
node: Union[ParsedNode, CompiledNode],
quote_policy: Optional[Dict[str, bool]] = None,
**kwargs: Any,
) -> Self:
@@ -244,18 +243,21 @@ class BaseRelation(FakeAPIObject, Hashable):
def create_from(
cls: Type[Self],
config: HasQuoting,
node: Union[ParsedNode, SourceDefinition],
node: Union[CompiledNode, ParsedNode, ParsedSourceDefinition],
**kwargs: Any,
) -> Self:
if node.resource_type == NodeType.Source:
if not isinstance(node, SourceDefinition):
if not isinstance(node, ParsedSourceDefinition):
raise InternalException(
"type mismatch, expected SourceDefinition but got {}".format(type(node))
"type mismatch, expected ParsedSourceDefinition but got {}".format(type(node))
)
return cls.create_from_source(node, **kwargs)
else:
if not isinstance(node, (ParsedNode)):
raise InternalException(f"type mismatch, expected ParsedNode but got {type(node)}")
if not isinstance(node, (ParsedNode, CompiledNode)):
raise InternalException(
"type mismatch, expected ParsedNode or CompiledNode but "
"got {}".format(type(node))
)
return cls.create_from_node(config, node, **kwargs)
@classmethod
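On the default_factory comment in the hunk above: Python 3.11 rejects mutable dataclass instances used as field defaults, so a factory is required. A minimal sketch of the difference, using a simplified stand-in for the real dbt.contracts.relation.Policy:

```python
from dataclasses import dataclass, field


@dataclass
class Policy:
    database: bool = True
    schema: bool = True
    identifier: bool = True


@dataclass
class Relation:
    # On Python 3.11 the following line raises:
    #   ValueError: mutable default <class 'Policy'> for field quote_policy
    #   is not allowed: use default_factory
    # quote_policy: Policy = Policy()

    # The portable form works on 3.7 through 3.11 alike:
    quote_policy: Policy = field(default_factory=Policy)


print(Relation().quote_policy)  # Policy(database=True, schema=True, identifier=True)
```

The _get_field_named / get_default_quote_policy changes in the same hunk follow from this: with a factory, the default is read from the field's default_factory rather than its default.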

View File

@@ -17,7 +17,8 @@ from typing_extensions import Protocol
import agate
from dbt.contracts.connection import Connection, AdapterRequiredConfig, AdapterResponse
from dbt.contracts.graph.nodes import ParsedNode, SourceDefinition, ManifestNode
from dbt.contracts.graph.compiled import CompiledNode, ManifestNode, NonSourceCompiledNode
from dbt.contracts.graph.parsed import ParsedNode, ParsedSourceDefinition
from dbt.contracts.graph.model_config import BaseConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.relation import Policy, HasQuoting
@@ -50,7 +51,7 @@ class RelationProtocol(Protocol):
def create_from(
cls: Type[Self],
config: HasQuoting,
node: Union[ParsedNode, SourceDefinition],
node: Union[CompiledNode, ParsedNode, ParsedSourceDefinition],
) -> Self:
...
@@ -64,7 +65,7 @@ class CompilerProtocol(Protocol):
node: ManifestNode,
manifest: Manifest,
extra_context: Optional[Dict[str, Any]] = None,
) -> ManifestNode:
) -> NonSourceCompiledNode:
...

View File

@@ -10,7 +10,6 @@ from dbt.adapters.base import BaseConnectionManager
from dbt.contracts.connection import Connection, ConnectionState, AdapterResponse
from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLCommit, SQLQueryStatus
from dbt.events.contextvars import get_node_info
from dbt.utils import cast_to_str
@@ -57,13 +56,7 @@ class SQLConnectionManager(BaseConnectionManager):
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()
fire_event(
ConnectionUsed(
conn_type=self.TYPE,
conn_name=cast_to_str(connection.name),
node_info=get_node_info(),
)
)
fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=cast_to_str(connection.name)))
with self.exception_handler(sql):
if abridge_sql_log:
@@ -71,11 +64,7 @@ class SQLConnectionManager(BaseConnectionManager):
else:
log_sql = sql
fire_event(
SQLQuery(
conn_name=cast_to_str(connection.name), sql=log_sql, node_info=get_node_info()
)
)
fire_event(SQLQuery(conn_name=cast_to_str(connection.name), sql=log_sql))
pre = time.time()
cursor = connection.handle.cursor()
@@ -83,9 +72,7 @@ class SQLConnectionManager(BaseConnectionManager):
fire_event(
SQLQueryStatus(
status=str(self.get_response(cursor)),
elapsed=round((time.time() - pre)),
node_info=get_node_info(),
status=str(self.get_response(cursor)), elapsed=round((time.time() - pre), 2)
)
)
@@ -169,7 +156,7 @@ class SQLConnectionManager(BaseConnectionManager):
"it does not have one open!".format(connection.name)
)
fire_event(SQLCommit(conn_name=connection.name, node_info=get_node_info()))
fire_event(SQLCommit(conn_name=connection.name))
self.add_commit_query()
connection.transaction_open = False

View File

@@ -31,6 +31,7 @@ def cli_runner():
@p.cache_selected_only
@p.debug
@p.enable_legacy_logger
@p.event_buffer_size
@p.fail_fast
@p.log_cache_events
@p.log_format

View File

@@ -80,6 +80,14 @@ enable_legacy_logger = click.option(
hidden=True,
)
event_buffer_size = click.option(
"--event-buffer-size",
envvar="DBT_EVENT_BUFFER_SIZE",
help="Sets the max number of events to buffer in EVENT_HISTORY.",
default=100000,
type=click.INT,
)
exclude = click.option("--exclude", envvar=None, help="Specify the nodes to exclude.")
fail_fast = click.option(
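The @p.event_buffer_size line in the previous file applies the option object defined in this hunk as a decorator; click options defined once this way can be shared across commands. A self-contained sketch of that reuse pattern, with the option taken from the hunk and a hypothetical command for illustration:

```python
import click

# A reusable option, defined once and applied to any command that needs it.
event_buffer_size = click.option(
    "--event-buffer-size",
    envvar="DBT_EVENT_BUFFER_SIZE",
    help="Sets the max number of events to buffer in EVENT_HISTORY.",
    default=100000,
    type=click.INT,
)


@click.command()
@event_buffer_size
def cli(event_buffer_size: int) -> None:
    # click derives the parameter name from "--event-buffer-size".
    click.echo(f"buffering up to {event_buffer_size} events")


if __name__ == "__main__":
    cli()
```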

View File

@@ -25,7 +25,8 @@ from dbt.utils import (
)
from dbt.clients._jinja_blocks import BlockIterator, BlockData, BlockTag
from dbt.contracts.graph.nodes import GenericTestNode
from dbt.contracts.graph.compiled import CompiledGenericTestNode
from dbt.contracts.graph.parsed import ParsedGenericTestNode
from dbt.exceptions import (
InternalException,
@@ -619,7 +620,7 @@ GENERIC_TEST_KWARGS_NAME = "_dbt_generic_test_kwargs"
def add_rendered_test_kwargs(
context: Dict[str, Any],
node: GenericTestNode,
node: Union[ParsedGenericTestNode, CompiledGenericTestNode],
capture_macros: bool = False,
) -> None:
"""Render each of the test kwargs in the given context using the native

View File

@@ -1,6 +1,6 @@
import os
from collections import defaultdict
from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Tuple, cast, Optional
import networkx as nx # type: ignore
import pickle
@@ -12,13 +12,15 @@ from dbt.clients import jinja
from dbt.clients.system import make_directory
from dbt.context.providers import generate_runtime_model_context
from dbt.contracts.graph.manifest import Manifest, UniqueID
from dbt.contracts.graph.nodes import (
ParsedNode,
ManifestNode,
GenericTestNode,
from dbt.contracts.graph.compiled import (
COMPILED_TYPES,
CompiledGenericTestNode,
GraphMemberNode,
InjectedCTE,
ManifestNode,
NonSourceCompiledNode,
)
from dbt.contracts.graph.parsed import ParsedNode
from dbt.exceptions import (
dependency_not_found,
InternalException,
@@ -26,8 +28,7 @@ from dbt.exceptions import (
)
from dbt.graph import Graph
from dbt.events.functions import fire_event
from dbt.events.types import FoundStats, WritingInjectedSQLForNode
from dbt.events.contextvars import get_node_info
from dbt.events.types import FoundStats, CompilingNode, WritingInjectedSQLForNode
from dbt.node_types import NodeType, ModelLanguage
from dbt.events.format import pluralize
import dbt.tracking
@@ -35,6 +36,14 @@ import dbt.tracking
graph_file_name = "graph.gpickle"
def _compiled_type_for(model: ParsedNode):
if type(model) not in COMPILED_TYPES:
raise InternalException(
f"Asked to compile {type(model)} node, but it has no compiled form"
)
return COMPILED_TYPES[type(model)]
def print_compile_stats(stats):
names = {
NodeType.Model: "model",
@@ -47,6 +56,7 @@ def print_compile_stats(stats):
NodeType.Source: "source",
NodeType.Exposure: "exposure",
NodeType.Metric: "metric",
NodeType.Entity: "entity",
}
results = {k: 0 for k in names.keys()}
@@ -82,6 +92,8 @@ def _generate_stats(manifest: Manifest):
stats[exposure.resource_type] += 1
for metric in manifest.metrics.values():
stats[metric.resource_type] += 1
for entity in manifest.entities.values():
stats[entity.resource_type] += 1
for macro in manifest.macros.values():
stats[macro.resource_type] += 1
return stats
@@ -167,7 +179,7 @@ class Compiler:
# a dict for jinja rendering of SQL
def _create_node_context(
self,
node: ManifestNode,
node: NonSourceCompiledNode,
manifest: Manifest,
extra_context: Dict[str, Any],
) -> Dict[str, Any]:
@@ -175,7 +187,7 @@ class Compiler:
context = generate_runtime_model_context(node, self.config, manifest)
context.update(extra_context)
if isinstance(node, GenericTestNode):
if isinstance(node, CompiledGenericTestNode):
# for test nodes, add a special keyword args value to the context
jinja.add_rendered_test_kwargs(context, node)
@@ -252,10 +264,10 @@ class Compiler:
def _recursively_prepend_ctes(
self,
model: ManifestNode,
model: NonSourceCompiledNode,
manifest: Manifest,
extra_context: Optional[Dict[str, Any]],
) -> Tuple[ManifestNode, List[InjectedCTE]]:
) -> Tuple[NonSourceCompiledNode, List[InjectedCTE]]:
"""This method is called by the 'compile_node' method. Starting
from the node that it is passed in, it will recursively call
itself using the 'extra_ctes'. The 'ephemeral' models do
@@ -296,6 +308,8 @@ class Compiler:
# This model has already been compiled, so it's been
# through here before
if getattr(cte_model, "compiled", False):
assert isinstance(cte_model, tuple(COMPILED_TYPES.values()))
cte_model = cast(NonSourceCompiledNode, cte_model)
new_prepended_ctes = cte_model.extra_ctes
# if the cte_model isn't compiled, i.e. first time here
@@ -332,7 +346,7 @@ class Compiler:
return model, prepended_ctes
# Sets compiled fields in the ManifestNode passed in,
# creates a compiled_node from the ManifestNode passed in,
# creates a "context" dictionary for jinja rendering,
# and then renders the "compiled_code" using the node, the
# raw_code and the context.
@@ -341,10 +355,12 @@ class Compiler:
node: ManifestNode,
manifest: Manifest,
extra_context: Optional[Dict[str, Any]] = None,
) -> ManifestNode:
) -> NonSourceCompiledNode:
if extra_context is None:
extra_context = {}
fire_event(CompilingNode(unique_id=node.unique_id))
data = node.to_dict(omit_none=True)
data.update(
{
@@ -354,8 +370,9 @@ class Compiler:
"extra_ctes": [],
}
)
compiled_node = _compiled_type_for(node).from_dict(data)
if node.language == ModelLanguage.python:
if compiled_node.language == ModelLanguage.python:
# TODO could we also 'minify' this code at all? just aesthetic, not functional
# quoating seems like something very specific to sql so far
@@ -363,7 +380,7 @@ class Compiler:
# TODO try to find better way to do this, given that
original_quoting = self.config.quoting
self.config.quoting = {key: False for key in original_quoting.keys()}
context = self._create_node_context(node, manifest, extra_context)
context = self._create_node_context(compiled_node, manifest, extra_context)
postfix = jinja.get_rendered(
"{{ py_script_postfix(model) }}",
@@ -371,23 +388,23 @@ class Compiler:
node,
)
# we should NOT jinja render the python model's 'raw code'
node.compiled_code = f"{node.raw_code}\n\n{postfix}"
compiled_node.compiled_code = f"{node.raw_code}\n\n{postfix}"
# restore quoting settings in the end since context is lazy evaluated
self.config.quoting = original_quoting
else:
context = self._create_node_context(node, manifest, extra_context)
node.compiled_code = jinja.get_rendered(
context = self._create_node_context(compiled_node, manifest, extra_context)
compiled_node.compiled_code = jinja.get_rendered(
node.raw_code,
context,
node,
)
node.relation_name = self._get_relation_name(node)
compiled_node.relation_name = self._get_relation_name(node)
node.compiled = True
compiled_node.compiled = True
return node
return compiled_node
def write_graph_file(self, linker: Linker, manifest: Manifest):
filename = graph_file_name
@@ -494,10 +511,10 @@ class Compiler:
return Graph(linker.graph)
# writes the "compiled_code" into the target/compiled directory
def _write_node(self, node: ManifestNode) -> ManifestNode:
def _write_node(self, node: NonSourceCompiledNode) -> ManifestNode:
if not node.extra_ctes_injected or node.resource_type == NodeType.Snapshot:
return node
fire_event(WritingInjectedSQLForNode(node_info=get_node_info()))
fire_event(WritingInjectedSQLForNode(unique_id=node.unique_id))
if node.compiled_code:
node.compiled_path = node.write_node(
@@ -511,7 +528,7 @@ class Compiler:
manifest: Manifest,
extra_context: Optional[Dict[str, Any]] = None,
write: bool = True,
) -> ManifestNode:
) -> NonSourceCompiledNode:
"""This is the main entry point into this code. It's called by
CompileRunner.compile, GenericRPCRunner.compile, and
RunTask.get_hook_sql. It calls '_compile_node' to convert
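The _compiled_type_for helper in the hunk above is a registry lookup from a parsed node class to its compiled counterpart, raising if no compiled form exists. A stripped-down sketch of the pattern with placeholder classes, not the real dbt contracts:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Type


@dataclass
class ParsedModelNode:
    name: str
    raw_code: str


@dataclass
class CompiledModelNode(ParsedModelNode):
    compiled: bool = True
    compiled_code: Optional[str] = None
    extra_ctes: List[str] = field(default_factory=list)


# Registry mapping each parsed class to its compiled form.
COMPILED_TYPES: Dict[Type, Type] = {ParsedModelNode: CompiledModelNode}


def compiled_type_for(node: object) -> Type:
    try:
        return COMPILED_TYPES[type(node)]
    except KeyError:
        raise TypeError(f"Asked to compile {type(node)} node, but it has no compiled form")


parsed = ParsedModelNode(name="my_model", raw_code="select 1")
compiled = compiled_type_for(parsed)(name=parsed.name, raw_code=parsed.raw_code)
print(type(compiled).__name__)  # CompiledModelNode
```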

View File

@@ -381,6 +381,7 @@ class PartialProject(RenderComponents):
sources: Dict[str, Any]
tests: Dict[str, Any]
metrics: Dict[str, Any]
entities: Dict[str, Any]
exposures: Dict[str, Any]
vars_value: VarProvider
@@ -391,6 +392,7 @@ class PartialProject(RenderComponents):
sources = cfg.sources
tests = cfg.tests
metrics = cfg.metrics
entities = cfg.entities
exposures = cfg.exposures
if cfg.vars is None:
vars_dict: Dict[str, Any] = {}
@@ -446,6 +448,7 @@ class PartialProject(RenderComponents):
sources=sources,
tests=tests,
metrics=metrics,
entities=entities,
exposures=exposures,
vars=vars_value,
config_version=cfg.config_version,
@@ -550,6 +553,7 @@ class Project:
sources: Dict[str, Any]
tests: Dict[str, Any]
metrics: Dict[str, Any]
entities: Dict[str, Any]
exposures: Dict[str, Any]
vars: VarProvider
dbt_version: List[VersionSpecifier]
@@ -624,6 +628,7 @@ class Project:
"sources": self.sources,
"tests": self.tests,
"metrics": self.metrics,
"entities": self.entities,
"exposures": self.exposures,
"vars": self.vars.to_dict(),
"require-dbt-version": [v.to_version_string() for v in self.dbt_version],

View File

@@ -116,6 +116,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
sources=project.sources,
tests=project.tests,
metrics=project.metrics,
entities=project.entities,
exposures=project.exposures,
vars=project.vars,
config_version=project.config_version,
@@ -311,6 +312,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
"sources": self._get_config_paths(self.sources),
"tests": self._get_config_paths(self.tests),
"metrics": self._get_config_paths(self.metrics),
"entities": self._get_config_paths(self.entities),
"exposures": self._get_config_paths(self.exposures),
}
@@ -506,6 +508,7 @@ class UnsetProfileConfig(RuntimeConfig):
"sources": self.sources,
"tests": self.tests,
"metrics": self.metrics,
"entities": self.entities,
"exposures": self.exposures,
"vars": self.vars.to_dict(),
"require-dbt-version": [v.to_version_string() for v in self.dbt_version],
@@ -568,6 +571,7 @@ class UnsetProfileConfig(RuntimeConfig):
sources=project.sources,
tests=project.tests,
metrics=project.metrics,
entities=project.entities,
exposures=project.exposures,
vars=project.vars,
config_version=project.config_version,

View File

@@ -8,7 +8,7 @@ from dbt import utils
from dbt.clients.jinja import get_rendered
from dbt.clients.yaml_helper import yaml, safe_load, SafeLoader, Loader, Dumper # noqa: F401
from dbt.constants import SECRET_ENV_PREFIX, DEFAULT_ENV_PLACEHOLDER
from dbt.contracts.graph.nodes import Resource
from dbt.contracts.graph.compiled import CompiledResource
from dbt.exceptions import (
CompilationException,
MacroReturn,
@@ -18,7 +18,6 @@ from dbt.exceptions import (
)
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.types import JinjaLogInfo, JinjaLogDebug
from dbt.events.contextvars import get_node_info
from dbt.version import __version__ as dbt_version
# These modules are added to the context. Consider alternative
@@ -135,11 +134,11 @@ class Var:
self,
context: Mapping[str, Any],
cli_vars: Mapping[str, Any],
node: Optional[Resource] = None,
node: Optional[CompiledResource] = None,
) -> None:
self._context: Mapping[str, Any] = context
self._cli_vars: Mapping[str, Any] = cli_vars
self._node: Optional[Resource] = node
self._node: Optional[CompiledResource] = node
self._merged: Mapping[str, Any] = self._generate_merged()
def _generate_merged(self) -> Mapping[str, Any]:
@@ -559,9 +558,9 @@ class BaseContext(metaclass=ContextMeta):
{% endmacro %}"
"""
if info:
fire_event(JinjaLogInfo(msg=msg, node_info=get_node_info()))
fire_event(JinjaLogInfo(msg=msg))
else:
fire_event(JinjaLogDebug(msg=msg, node_info=get_node_info()))
fire_event(JinjaLogDebug(msg=msg))
return ""
@contextproperty

View File

@@ -45,6 +45,8 @@ class UnrenderedConfig(ConfigSource):
model_configs = unrendered.get("tests")
elif resource_type == NodeType.Metric:
model_configs = unrendered.get("metrics")
elif resource_type == NodeType.Entity:
model_configs = unrendered.get("entities")
elif resource_type == NodeType.Exposure:
model_configs = unrendered.get("exposures")
else:
@@ -70,6 +72,8 @@ class RenderedConfig(ConfigSource):
model_configs = self.project.tests
elif resource_type == NodeType.Metric:
model_configs = self.project.metrics
elif resource_type == NodeType.Entity:
model_configs = self.project.entities
elif resource_type == NodeType.Exposure:
model_configs = self.project.exposures
else:

View File

@@ -5,8 +5,9 @@ from dbt.exceptions import (
doc_target_not_found,
)
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import Macro, ResultNode
from dbt.contracts.graph.parsed import ParsedMacro
from dbt.context.base import contextmember
from dbt.context.configured import SchemaYamlContext
@@ -16,7 +17,7 @@ class DocsRuntimeContext(SchemaYamlContext):
def __init__(
self,
config: RuntimeConfig,
node: Union[Macro, ResultNode],
node: Union[ParsedMacro, CompileResultNode],
manifest: Manifest,
current_project: str,
) -> None:
@@ -54,7 +55,7 @@ class DocsRuntimeContext(SchemaYamlContext):
else:
doc_invalid_args(self.node, args)
# Documentation
# ParsedDocumentation
target_doc = self.manifest.resolve_doc(
doc_name,
doc_package_name,

View File

@@ -1,10 +1,10 @@
from typing import Dict, MutableMapping, Optional
from dbt.contracts.graph.nodes import Macro
from dbt.contracts.graph.parsed import ParsedMacro
from dbt.exceptions import raise_duplicate_macro_name, raise_compiler_error
from dbt.include.global_project import PROJECT_NAME as GLOBAL_PROJECT_NAME
from dbt.clients.jinja import MacroGenerator
MacroNamespace = Dict[str, Macro]
MacroNamespace = Dict[str, ParsedMacro]
# This class builds the MacroResolver by adding macros
@@ -21,7 +21,7 @@ MacroNamespace = Dict[str, Macro]
class MacroResolver:
def __init__(
self,
macros: MutableMapping[str, Macro],
macros: MutableMapping[str, ParsedMacro],
root_project_name: str,
internal_package_names,
) -> None:
@@ -77,7 +77,7 @@ class MacroResolver:
def _add_macro_to(
self,
package_namespaces: Dict[str, MacroNamespace],
macro: Macro,
macro: ParsedMacro,
):
if macro.package_name in package_namespaces:
namespace = package_namespaces[macro.package_name]
@@ -89,7 +89,7 @@ class MacroResolver:
raise_duplicate_macro_name(macro, macro, macro.package_name)
package_namespaces[macro.package_name][macro.name] = macro
def add_macro(self, macro: Macro):
def add_macro(self, macro: ParsedMacro):
macro_name: str = macro.name
# internal macros (from plugins) will be processed separately from

View File

@@ -1,7 +1,7 @@
from typing import Any, Dict, Iterable, Union, Optional, List, Iterator, Mapping, Set
from dbt.clients.jinja import MacroGenerator, MacroStack
from dbt.contracts.graph.nodes import Macro
from dbt.contracts.graph.parsed import ParsedMacro
from dbt.include.global_project import PROJECT_NAME as GLOBAL_PROJECT_NAME
from dbt.exceptions import raise_duplicate_macro_name, raise_compiler_error
@@ -112,7 +112,7 @@ class MacroNamespaceBuilder:
def _add_macro_to(
self,
hierarchy: Dict[str, FlatNamespace],
macro: Macro,
macro: ParsedMacro,
macro_func: MacroGenerator,
):
if macro.package_name in hierarchy:
@@ -125,7 +125,7 @@ class MacroNamespaceBuilder:
raise_duplicate_macro_name(macro_func.macro, macro, macro.package_name)
hierarchy[macro.package_name][macro.name] = macro_func
def add_macro(self, macro: Macro, ctx: Dict[str, Any]):
def add_macro(self, macro: ParsedMacro, ctx: Dict[str, Any]):
macro_name: str = macro.name
# MacroGenerator is in clients/jinja.py
@@ -147,11 +147,13 @@ class MacroNamespaceBuilder:
elif macro.package_name == self.root_package:
self.globals[macro_name] = macro_func
def add_macros(self, macros: Iterable[Macro], ctx: Dict[str, Any]):
def add_macros(self, macros: Iterable[ParsedMacro], ctx: Dict[str, Any]):
for macro in macros:
self.add_macro(macro, ctx)
def build_namespace(self, macros: Iterable[Macro], ctx: Dict[str, Any]) -> MacroNamespace:
def build_namespace(
self, macros: Iterable[ParsedMacro], ctx: Dict[str, Any]
) -> MacroNamespace:
self.add_macros(macros, ctx)
# Iterate in reverse-order and overwrite: the packages that are first

View File

@@ -28,15 +28,19 @@ from .macros import MacroNamespaceBuilder, MacroNamespace
from .manifest import ManifestContext
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.manifest import Manifest, Disabled
from dbt.contracts.graph.nodes import (
Macro,
Exposure,
Metric,
SeedNode,
SourceDefinition,
Resource,
from dbt.contracts.graph.compiled import (
CompiledResource,
CompiledSeedNode,
ManifestNode,
)
from dbt.contracts.graph.parsed import (
ParsedMacro,
ParsedExposure,
ParsedMetric,
ParsedEntity,
ParsedSeedNode,
ParsedSourceDefinition,
)
from dbt.contracts.graph.metrics import MetricReference, ResolvedMetricReference
from dbt.events.functions import get_metadata_vars
from dbt.exceptions import (
@@ -298,12 +302,10 @@ class BaseMetricResolver(BaseResolver):
self.validate_args(name, package)
return self.resolve(name, package)
class Config(Protocol):
def __init__(self, model, context_config: Optional[ContextConfig]):
...
# Implementation of "config(..)" calls in models
class ParseConfigObject(Config):
def __init__(self, model, context_config: Optional[ContextConfig]):
@@ -509,7 +511,7 @@ class OperationRefResolver(RuntimeRefResolver):
def create_relation(self, target_model: ManifestNode, name: str) -> RelationProxy:
if target_model.is_ephemeral_model:
# In operations, we can't ref() ephemeral nodes, because
# Macros do not support set_cte
# ParsedMacros do not support set_cte
raise_compiler_error(
"Operations can not ref() ephemeral nodes, but {} is ephemeral".format(
target_model.name
@@ -581,9 +583,9 @@ class ModelConfiguredVar(Var):
self,
context: Dict[str, Any],
config: RuntimeConfig,
node: Resource,
node: CompiledResource,
) -> None:
self._node: Resource
self._node: CompiledResource
self._config: RuntimeConfig = config
super().__init__(context, config.cli_vars, node=node)
@@ -687,7 +689,7 @@ class ProviderContext(ManifestContext):
raise InternalException(f"Invalid provider given to context: {provider}")
# mypy appeasement - we know it'll be a RuntimeConfig
self.config: RuntimeConfig
self.model: Union[Macro, ManifestNode] = model
self.model: Union[ParsedMacro, ManifestNode] = model
super().__init__(config, manifest, model.package_name)
self.sql_results: Dict[str, AttrDict] = {}
self.context_config: Optional[ContextConfig] = context_config
@@ -776,7 +778,7 @@ class ProviderContext(ManifestContext):
@contextmember
def write(self, payload: str) -> str:
# macros/source defs aren't 'writeable'.
if isinstance(self.model, (Macro, SourceDefinition)):
if isinstance(self.model, (ParsedMacro, ParsedSourceDefinition)):
raise_compiler_error('cannot "write" macros or sources')
self.model.build_path = self.model.write_node(self.config.target_path, "run", payload)
return ""
@@ -796,7 +798,7 @@ class ProviderContext(ManifestContext):
@contextmember
def load_agate_table(self) -> agate.Table:
if not isinstance(self.model, SeedNode):
if not isinstance(self.model, (ParsedSeedNode, CompiledSeedNode)):
raise_compiler_error(
"can only load_agate_table for seeds (got a {})".format(self.model.resource_type)
)
@@ -1217,13 +1219,7 @@ class ProviderContext(ManifestContext):
if return_value is not None:
# Save the env_var value in the manifest and the var name in the source_file.
# If this is compiling, do not save because it's irrelevant to parsing.
compiling = (
True
if hasattr(self.model, "compiled")
and getattr(self.model, "compiled", False) is True
else False
)
if self.model and not compiling:
if self.model and not hasattr(self.model, "compiled"):
# If the environment variable is set from a default, store a string indicating
# that so we can skip partial parsing. Otherwise the file will be scheduled for
# reparsing. If the default changes, the file will have been updated and therefore
@@ -1278,7 +1274,7 @@ class MacroContext(ProviderContext):
def __init__(
self,
model: Macro,
model: ParsedMacro,
config: RuntimeConfig,
manifest: Manifest,
provider: Provider,
@@ -1393,7 +1389,7 @@ def generate_parser_model_context(
def generate_generate_name_macro_context(
macro: Macro,
macro: ParsedMacro,
config: RuntimeConfig,
manifest: Manifest,
) -> Dict[str, Any]:
@@ -1411,7 +1407,7 @@ def generate_runtime_model_context(
def generate_runtime_macro_context(
macro: Macro,
macro: ParsedMacro,
config: RuntimeConfig,
manifest: Manifest,
package_name: Optional[str],
@@ -1447,7 +1443,7 @@ class ExposureMetricResolver(BaseResolver):
def generate_parse_exposure(
exposure: Exposure,
exposure: ParsedExposure,
config: RuntimeConfig,
manifest: Manifest,
package_name: str,
@@ -1495,9 +1491,8 @@ class MetricRefResolver(BaseResolver):
"the name argument to ref() must be a string"
)
def generate_parse_metrics(
metric: Metric,
metric: ParsedMetric,
config: RuntimeConfig,
manifest: Manifest,
package_name: str,
@@ -1518,6 +1513,41 @@ def generate_parse_metrics(
),
}
class EntityRefResolver(BaseResolver):
def __call__(self, *args) -> str:
package = None
if len(args) == 1:
name = args[0]
elif len(args) == 2:
package, name = args
else:
ref_invalid_args(self.model, args)
self.validate_args(name, package)
self.model.refs.append(list(args))
return ""
def validate_args(self, name, package):
if not isinstance(name, str):
raise ParsingException(
f"In the entity associated with {self.model.original_file_path} "
"the name argument to ref() must be a string"
)
def generate_parse_entities(
entity: ParsedEntity,
config: RuntimeConfig,
manifest: Manifest,
package_name: str,
) -> Dict[str, Any]:
project = config.load_dependencies()[package_name]
return {
"ref": EntityRefResolver(
None,
entity,
project,
manifest,
),
}
# This class is currently used by the schema parser in order
# to limit the number of macros in the context by using
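The new EntityRefResolver above records every ref() call made while parsing an entity by appending its arguments to the entity's refs list. A stripped-down sketch of that capture pattern; the classes here are placeholders, not the dbt context machinery:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ParsedEntity:
    original_file_path: str
    refs: List[List[str]] = field(default_factory=list)


class EntityRefResolver:
    def __init__(self, model: ParsedEntity):
        self.model = model

    def __call__(self, *args: str) -> str:
        # ref("name") or ref("package", "name"); anything else is an error.
        if len(args) not in (1, 2):
            raise ValueError(f"invalid ref() args in {self.model.original_file_path}")
        self.model.refs.append(list(args))
        return ""


entity = ParsedEntity(original_file_path="models/entities.yml")
ref = EntityRefResolver(entity)
ref("dim_customers")
ref("my_package", "fct_orders")
print(entity.refs)  # [['dim_customers'], ['my_package', 'fct_orders']]
```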

View File

@@ -16,7 +16,6 @@ from dbt.exceptions import InternalException
from dbt.utils import translate_aliases
from dbt.events.functions import fire_event
from dbt.events.types import NewConnectionOpening
from dbt.events.contextvars import get_node_info
from typing_extensions import Protocol
from dbt.dataclass_schema import (
dbtClassMixin,
@@ -113,9 +112,7 @@ class LazyHandle:
self.opener = opener
def resolve(self, connection: Connection) -> Connection:
fire_event(
NewConnectionOpening(connection_state=connection.state, node_info=get_node_info())
)
fire_event(NewConnectionOpening(connection_state=connection.state))
return self.opener(connection)

View File

@@ -227,6 +227,7 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
entities: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list)
# any macro patches in this file by macro unique_id.

View File

@@ -0,0 +1,238 @@
from dbt.contracts.graph.parsed import (
HasTestMetadata,
ParsedNode,
ParsedAnalysisNode,
ParsedSingularTestNode,
ParsedHookNode,
ParsedModelNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
ParsedResource,
ParsedRPCNode,
ParsedSqlNode,
ParsedGenericTestNode,
ParsedSeedNode,
ParsedSnapshotNode,
ParsedSourceDefinition,
SeedConfig,
TestConfig,
same_seeds,
)
from dbt.node_types import NodeType
from dbt.contracts.util import Replaceable
from dbt.dataclass_schema import dbtClassMixin
from dataclasses import dataclass, field
from typing import Optional, List, Union, Dict, Type
@dataclass
class InjectedCTE(dbtClassMixin, Replaceable):
id: str
sql: str
@dataclass
class CompiledNodeMixin(dbtClassMixin):
# this is a special mixin class to provide a required argument. If a node
# is missing a `compiled` flag entirely, it must not be a CompiledNode.
compiled: bool
@dataclass
class CompiledNode(ParsedNode, CompiledNodeMixin):
compiled_code: Optional[str] = None
extra_ctes_injected: bool = False
extra_ctes: List[InjectedCTE] = field(default_factory=list)
relation_name: Optional[str] = None
_pre_injected_sql: Optional[str] = None
def set_cte(self, cte_id: str, sql: str):
"""This is the equivalent of what self.extra_ctes[cte_id] = sql would
do if extra_ctes were an OrderedDict
"""
for cte in self.extra_ctes:
if cte.id == cte_id:
cte.sql = sql
break
else:
self.extra_ctes.append(InjectedCTE(id=cte_id, sql=sql))
def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
if "_pre_injected_sql" in dct:
del dct["_pre_injected_sql"]
return dct
@dataclass
class CompiledAnalysisNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Analysis]})
@dataclass
class CompiledHookNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Operation]})
index: Optional[int] = None
@dataclass
class CompiledModelNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Model]})
# TODO: rm?
@dataclass
class CompiledRPCNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.RPCCall]})
@dataclass
class CompiledSqlNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.SqlOperation]})
@dataclass
class CompiledSeedNode(CompiledNode):
# keep this in sync with ParsedSeedNode!
resource_type: NodeType = field(metadata={"restrict": [NodeType.Seed]})
config: SeedConfig = field(default_factory=SeedConfig)
root_path: Optional[str] = None
@property
def empty(self):
"""Seeds are never empty"""
return False
def same_body(self, other) -> bool:
return same_seeds(self, other)
@dataclass
class CompiledSnapshotNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Snapshot]})
@dataclass
class CompiledSingularTestNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Test]})
# Was not able to make mypy happy and keep the code working. We need to
# refactor the various configs.
config: TestConfig = field(default_factory=TestConfig) # type:ignore
@dataclass
class CompiledGenericTestNode(CompiledNode, HasTestMetadata):
# keep this in sync with ParsedGenericTestNode!
resource_type: NodeType = field(metadata={"restrict": [NodeType.Test]})
column_name: Optional[str] = None
file_key_name: Optional[str] = None
# Was not able to make mypy happy and keep the code working. We need to
# refactor the various configs.
config: TestConfig = field(default_factory=TestConfig) # type:ignore
def same_contents(self, other) -> bool:
if other is None:
return False
return self.same_config(other) and self.same_fqn(other) and True
CompiledTestNode = Union[CompiledSingularTestNode, CompiledGenericTestNode]
PARSED_TYPES: Dict[Type[CompiledNode], Type[ParsedResource]] = {
CompiledAnalysisNode: ParsedAnalysisNode,
CompiledModelNode: ParsedModelNode,
CompiledHookNode: ParsedHookNode,
CompiledRPCNode: ParsedRPCNode,
CompiledSqlNode: ParsedSqlNode,
CompiledSeedNode: ParsedSeedNode,
CompiledSnapshotNode: ParsedSnapshotNode,
CompiledSingularTestNode: ParsedSingularTestNode,
CompiledGenericTestNode: ParsedGenericTestNode,
}
COMPILED_TYPES: Dict[Type[ParsedResource], Type[CompiledNode]] = {
ParsedAnalysisNode: CompiledAnalysisNode,
ParsedModelNode: CompiledModelNode,
ParsedHookNode: CompiledHookNode,
ParsedRPCNode: CompiledRPCNode,
ParsedSqlNode: CompiledSqlNode,
ParsedSeedNode: CompiledSeedNode,
ParsedSnapshotNode: CompiledSnapshotNode,
ParsedSingularTestNode: CompiledSingularTestNode,
ParsedGenericTestNode: CompiledGenericTestNode,
}
# for some types, the compiled type is the parsed type, so make this easy
CompiledType = Union[Type[CompiledNode], Type[ParsedResource]]
CompiledResource = Union[ParsedResource, CompiledNode]
def compiled_type_for(parsed: ParsedNode) -> CompiledType:
if type(parsed) in COMPILED_TYPES:
return COMPILED_TYPES[type(parsed)]
else:
return type(parsed)
def parsed_instance_for(compiled: CompiledNode) -> ParsedResource:
cls = PARSED_TYPES.get(type(compiled))
if cls is None:
# how???
raise ValueError("invalid resource_type: {}".format(compiled.resource_type))
return cls.from_dict(compiled.to_dict(omit_none=True))
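The two dicts above, together with the comment that for some types the compiled type is the parsed type, boil down to a mapping lookup with a fallback to the input's own class. A toy illustration of that fallback (hypothetical classes, not the real node types):

class ToyParsedModel: ...
class ToyCompiledModel: ...
class ToyParsedSeedOnly: ...  # pretend this type has no compiled counterpart

TOY_COMPILED_TYPES = {ToyParsedModel: ToyCompiledModel}

def toy_compiled_type_for(parsed):
    # fall back to the parsed class itself when no compiled variant is registered
    return TOY_COMPILED_TYPES.get(type(parsed), type(parsed))

assert toy_compiled_type_for(ToyParsedModel()) is ToyCompiledModel
assert toy_compiled_type_for(ToyParsedSeedOnly()) is ToyParsedSeedOnly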
NonSourceCompiledNode = Union[
CompiledAnalysisNode,
CompiledSingularTestNode,
CompiledModelNode,
CompiledHookNode,
CompiledRPCNode,
CompiledSqlNode,
CompiledGenericTestNode,
CompiledSeedNode,
CompiledSnapshotNode,
]
NonSourceParsedNode = Union[
ParsedAnalysisNode,
ParsedSingularTestNode,
ParsedHookNode,
ParsedModelNode,
ParsedRPCNode,
ParsedSqlNode,
ParsedGenericTestNode,
ParsedSeedNode,
ParsedSnapshotNode,
]
# This is anything that can be in manifest.nodes.
ManifestNode = Union[
NonSourceCompiledNode,
NonSourceParsedNode,
]
# We allow either parsed or compiled nodes, or parsed sources, as some
# 'compile()' calls in the runner actually just return the original parsed
# node they were given.
CompileResultNode = Union[
ManifestNode,
ParsedSourceDefinition,
]
# anything that participates in the graph: sources, exposures, metrics,
# or manifest nodes
GraphMemberNode = Union[
CompileResultNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
]

View File

@@ -16,24 +16,30 @@ from typing import (
TypeVar,
Callable,
Generic,
cast,
AbstractSet,
ClassVar,
)
from typing_extensions import Protocol
from uuid import UUID
from dbt.contracts.graph.nodes import (
Macro,
Documentation,
SourceDefinition,
GenericTestNode,
Exposure,
Metric,
from dbt.contracts.graph.compiled import (
CompileResultNode,
ManifestNode,
NonSourceCompiledNode,
GraphMemberNode,
)
from dbt.contracts.graph.parsed import (
ParsedMacro,
ParsedDocumentation,
ParsedSourceDefinition,
ParsedGenericTestNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
HasUniqueID,
UnpatchedSourceDefinition,
ManifestNode,
GraphMemberNode,
ResultNode,
ManifestNodes,
)
from dbt.contracts.graph.unparsed import SourcePatch
from dbt.contracts.files import SourceFile, SchemaSourceFile, FileHash, AnySourceFile
@@ -91,7 +97,7 @@ class DocLookup(dbtClassMixin):
return self.perform_lookup(unique_id, manifest)
return None
def add_doc(self, doc: Documentation):
def add_doc(self, doc: ParsedDocumentation):
if doc.name not in self.storage:
self.storage[doc.name] = {}
self.storage[doc.name][doc.package_name] = doc.unique_id
@@ -100,7 +106,7 @@ class DocLookup(dbtClassMixin):
for doc in manifest.docs.values():
self.add_doc(doc)
def perform_lookup(self, unique_id: UniqueID, manifest) -> Documentation:
def perform_lookup(self, unique_id: UniqueID, manifest) -> ParsedDocumentation:
if unique_id not in manifest.docs:
raise dbt.exceptions.InternalException(
f"Doc {unique_id} found in cache but not found in manifest"
@@ -122,7 +128,7 @@ class SourceLookup(dbtClassMixin):
return self.perform_lookup(unique_id, manifest)
return None
def add_source(self, source: SourceDefinition):
def add_source(self, source: ParsedSourceDefinition):
if source.search_name not in self.storage:
self.storage[source.search_name] = {}
@@ -133,7 +139,7 @@ class SourceLookup(dbtClassMixin):
if hasattr(source, "source_name"):
self.add_source(source)
def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> SourceDefinition:
def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> ParsedSourceDefinition:
if unique_id not in manifest.sources:
raise dbt.exceptions.InternalException(
f"Source {unique_id} found in cache but not found in manifest"
@@ -193,7 +199,7 @@ class MetricLookup(dbtClassMixin):
return self.perform_lookup(unique_id, manifest)
return None
def add_metric(self, metric: Metric):
def add_metric(self, metric: ParsedMetric):
if metric.search_name not in self.storage:
self.storage[metric.search_name] = {}
@@ -204,15 +210,46 @@ class MetricLookup(dbtClassMixin):
if hasattr(metric, "name"):
self.add_metric(metric)
def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> Metric:
def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> ParsedMetric:
if unique_id not in manifest.metrics:
raise dbt.exceptions.InternalException(
f"Metric {unique_id} found in cache but not found in manifest"
)
return manifest.metrics[unique_id]
class EntityLookup(dbtClassMixin):
def __init__(self, manifest: "Manifest"):
self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
self.populate(manifest)
# This handles both models/seeds/snapshots and sources/metrics/exposures
def get_unique_id(self, search_name, package: Optional[PackageName]):
return find_unique_id_for_package(self.storage, search_name, package)
def find(self, search_name, package: Optional[PackageName], manifest: "Manifest"):
unique_id = self.get_unique_id(search_name, package)
if unique_id is not None:
return self.perform_lookup(unique_id, manifest)
return None
def add_entity(self, entity: ParsedEntity):
if entity.search_name not in self.storage:
self.storage[entity.search_name] = {}
self.storage[entity.search_name][entity.package_name] = entity.unique_id
def populate(self, manifest):
for entity in manifest.entities.values():
if hasattr(entity, "name"):
self.add_entity(entity)
def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> ParsedEntity:
if unique_id not in manifest.entities:
raise dbt.exceptions.InternalException(
f"Entity {unique_id} found in cache but not found in manifest"
)
return manifest.entities[unique_id]
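EntityLookup keeps the same two-level storage shape as the other lookups: search name first, then package name, mapping to a unique id. A rough reimplementation of the lookup behaviour for illustration (the real logic delegates to find_unique_id_for_package, whose exact fallback rules are assumed here):

from typing import Dict, Optional

# storage shape: {search_name: {package_name: unique_id}}
toy_storage: Dict[str, Dict[str, str]] = {
    "customer": {"my_project": "entity.my_project.customer"},
}

def toy_get_unique_id(search_name: str, package: Optional[str]) -> Optional[str]:
    by_package = toy_storage.get(search_name, {})
    if package is not None:
        # an explicit package restricts the search to that package
        return by_package.get(package)
    # with no package given, any match will do; take the first one
    return next(iter(by_package.values()), None)

assert toy_get_unique_id("customer", "my_project") == "entity.my_project.customer"
assert toy_get_unique_id("customer", None) == "entity.my_project.customer"
assert toy_get_unique_id("orders", None) is None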
# This handles both models/seeds/snapshots and sources/metrics/entities/exposures
class DisabledLookup(dbtClassMixin):
def __init__(self, manifest: "Manifest"):
self.storage: Dict[str, Dict[PackageName, List[Any]]] = {}
@@ -360,7 +397,7 @@ class Locality(enum.IntEnum):
@dataclass
class MacroCandidate:
locality: Locality
macro: Macro
macro: ParsedMacro
def __eq__(self, other: object) -> bool:
if not isinstance(other, MacroCandidate):
@@ -425,14 +462,16 @@ M = TypeVar("M", bound=MacroCandidate)
class CandidateList(List[M]):
def last(self) -> Optional[Macro]:
def last(self) -> Optional[ParsedMacro]:
if not self:
return None
self.sort()
return self[-1].macro
def _get_locality(macro: Macro, root_project_name: str, internal_packages: Set[str]) -> Locality:
def _get_locality(
macro: ParsedMacro, root_project_name: str, internal_packages: Set[str]
) -> Locality:
if macro.package_name == root_project_name:
return Locality.Root
elif macro.package_name in internal_packages:
@@ -458,16 +497,17 @@ class Disabled(Generic[D]):
target: D
MaybeMetricNode = Optional[Union[Metric, Disabled[Metric]]]
MaybeMetricNode = Optional[Union[ParsedMetric, Disabled[ParsedMetric]]]
MaybeEntityNode = Optional[Union[ParsedEntity, Disabled[ParsedEntity]]]
MaybeDocumentation = Optional[Documentation]
MaybeDocumentation = Optional[ParsedDocumentation]
MaybeParsedSource = Optional[
Union[
SourceDefinition,
Disabled[SourceDefinition],
ParsedSourceDefinition,
Disabled[ParsedSourceDefinition],
]
]
@@ -507,7 +547,7 @@ class MacroMethods:
def find_macro_by_name(
self, name: str, root_project_name: str, package: Optional[str]
) -> Optional[Macro]:
) -> Optional[ParsedMacro]:
"""Find a macro in the graph by its name and package name, or None for
any package. The root project name is used to determine priority:
- locally defined macros come first
@@ -530,7 +570,7 @@ class MacroMethods:
def find_generate_macro_by_name(
self, component: str, root_project_name: str
) -> Optional[Macro]:
) -> Optional[ParsedMacro]:
"""
The `generate_X_name` macros are similar to regular ones, but ignore
imported packages.
@@ -599,11 +639,12 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
# is added it must all be added in the __reduce_ex__ method in the
# args tuple in the right position.
nodes: MutableMapping[str, ManifestNode] = field(default_factory=dict)
sources: MutableMapping[str, SourceDefinition] = field(default_factory=dict)
macros: MutableMapping[str, Macro] = field(default_factory=dict)
docs: MutableMapping[str, Documentation] = field(default_factory=dict)
exposures: MutableMapping[str, Exposure] = field(default_factory=dict)
metrics: MutableMapping[str, Metric] = field(default_factory=dict)
sources: MutableMapping[str, ParsedSourceDefinition] = field(default_factory=dict)
macros: MutableMapping[str, ParsedMacro] = field(default_factory=dict)
docs: MutableMapping[str, ParsedDocumentation] = field(default_factory=dict)
exposures: MutableMapping[str, ParsedExposure] = field(default_factory=dict)
metrics: MutableMapping[str, ParsedMetric] = field(default_factory=dict)
entities: MutableMapping[str, ParsedEntity] = field(default_factory=dict)
selectors: MutableMapping[str, Any] = field(default_factory=dict)
files: MutableMapping[str, AnySourceFile] = field(default_factory=dict)
metadata: ManifestMetadata = field(default_factory=ManifestMetadata)
@@ -625,6 +666,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
_metric_lookup: Optional[MetricLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_entity_lookup: Optional[EntityLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_disabled_lookup: Optional[DisabledLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
@@ -651,7 +695,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
obj._lock = flags.MP_CONTEXT.Lock()
return obj
def sync_update_node(self, new_node: ManifestNode) -> ManifestNode:
def sync_update_node(self, new_node: NonSourceCompiledNode) -> NonSourceCompiledNode:
"""update the node with a lock. The only time we should want to lock is
when compiling an ephemeral ancestor of a node at runtime, because
multiple threads could be just-in-time compiling the same ephemeral
@@ -664,21 +708,24 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
with self._lock:
existing = self.nodes[new_node.unique_id]
if getattr(existing, "compiled", False):
# already compiled
return existing
# already compiled -> must be a NonSourceCompiledNode
return cast(NonSourceCompiledNode, existing)
_update_into(self.nodes, new_node)
return new_node
def update_exposure(self, new_exposure: Exposure):
def update_exposure(self, new_exposure: ParsedExposure):
_update_into(self.exposures, new_exposure)
def update_metric(self, new_metric: Metric):
def update_metric(self, new_metric: ParsedMetric):
_update_into(self.metrics, new_metric)
def update_entity(self, new_entity: ParsedEntity):
_update_into(self.entities, new_entity)
def update_node(self, new_node: ManifestNode):
_update_into(self.nodes, new_node)
def update_source(self, new_source: SourceDefinition):
def update_source(self, new_source: ParsedSourceDefinition):
_update_into(self.sources, new_source)
def build_flat_graph(self):
@@ -690,6 +737,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.flat_graph = {
"exposures": {k: v.to_dict(omit_none=False) for k, v in self.exposures.items()},
"metrics": {k: v.to_dict(omit_none=False) for k, v in self.metrics.items()},
"entities": {k: v.to_dict(omit_none=False) for k, v in self.entities.items()},
"nodes": {k: v.to_dict(omit_none=False) for k, v in self.nodes.items()},
"sources": {k: v.to_dict(omit_none=False) for k, v in self.sources.items()},
}
@@ -731,7 +779,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
def find_materialization_macro_by_name(
self, project_name: str, materialization_name: str, adapter_type: str
) -> Optional[Macro]:
) -> Optional[ParsedMacro]:
candidates: CandidateList = CandidateList(
chain.from_iterable(
self._materialization_candidates_for(
@@ -752,6 +800,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.nodes.values(),
self.sources.values(),
self.metrics.values(),
self.entities.values(),
)
for resource in all_resources:
resource_type_plural = resource.resource_type.pluralize()
@@ -780,6 +829,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs={k: _deepcopy(v) for k, v in self.docs.items()},
exposures={k: _deepcopy(v) for k, v in self.exposures.items()},
metrics={k: _deepcopy(v) for k, v in self.metrics.items()},
entities={k: _deepcopy(v) for k, v in self.entities.items()},
selectors={k: _deepcopy(v) for k, v in self.selectors.items()},
metadata=self.metadata,
disabled={k: _deepcopy(v) for k, v in self.disabled.items()},
@@ -796,6 +846,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.sources.values(),
self.exposures.values(),
self.metrics.values(),
self.entities.values(),
)
)
forward_edges, backward_edges = build_node_edges(edge_members)
@@ -821,6 +872,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs=self.docs,
exposures=self.exposures,
metrics=self.metrics,
entities=self.entities,
selectors=self.selectors,
metadata=self.metadata,
disabled=self.disabled,
@@ -842,6 +894,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
return self.exposures[unique_id]
elif unique_id in self.metrics:
return self.metrics[unique_id]
elif unique_id in self.entities:
return self.entities[unique_id]
else:
# something terrible has happened
raise dbt.exceptions.InternalException(
@@ -878,6 +932,12 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self._metric_lookup = MetricLookup(self)
return self._metric_lookup
@property
def entity_lookup(self) -> EntityLookup:
if self._entity_lookup is None:
self._entity_lookup = EntityLookup(self)
return self._entity_lookup
def rebuild_ref_lookup(self):
self._ref_lookup = RefableLookup(self)
@@ -936,8 +996,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
search_name = f"{target_source_name}.{target_table_name}"
candidates = _search_packages(current_project, node_package)
source: Optional[SourceDefinition] = None
disabled: Optional[List[SourceDefinition]] = None
source: Optional[ParsedSourceDefinition] = None
disabled: Optional[List[ParsedSourceDefinition]] = None
for pkg in candidates:
source = self.source_lookup.find(search_name, pkg, self)
@@ -961,8 +1021,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
node_package: str,
) -> MaybeMetricNode:
metric: Optional[Metric] = None
disabled: Optional[List[Metric]] = None
metric: Optional[ParsedMetric] = None
disabled: Optional[List[ParsedMetric]] = None
candidates = _search_packages(current_project, node_package, target_metric_package)
for pkg in candidates:
@@ -978,6 +1038,32 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
return Disabled(disabled[0])
return None
def resolve_entity(
self,
target_entity_name: str,
target_entity_package: Optional[str],
current_project: str,
node_package: str,
) -> MaybeEntityNode:
entity: Optional[ParsedEntity] = None
disabled: Optional[List[ParsedEntity]] = None
candidates = _search_packages(current_project, node_package, target_entity_package)
for pkg in candidates:
entity = self.entity_lookup.find(target_entity_name, pkg, self)
if entity is not None and entity.config.enabled:
return entity
# it's possible that the node is disabled
if disabled is None:
disabled = self.disabled_lookup.find(f"{target_entity_name}", pkg)
if disabled:
return Disabled(disabled[0])
return None
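resolve_entity mirrors resolve_metric: walk the candidate packages in order, return the first enabled entity, and only report a Disabled match if nothing enabled was found. A compressed sketch of that control flow with stand-in objects (the candidate ordering produced by _search_packages is assumed, not reproduced):

from typing import Dict, List, Optional

class ToyEntity:
    def __init__(self, name: str, enabled: bool):
        self.name, self.enabled = name, enabled

def toy_resolve(name: str, candidates: List[Dict[str, ToyEntity]]):
    disabled_match: Optional[ToyEntity] = None
    for package_entities in candidates:
        entity = package_entities.get(name)
        if entity is not None and entity.enabled:
            return entity  # first enabled hit wins
        if entity is not None and disabled_match is None:
            disabled_match = entity  # remember that a disabled match exists
    # the real method wraps this in Disabled(...) before returning
    return ("disabled", disabled_match) if disabled_match else None

pkgs = [
    {"customer": ToyEntity("customer", enabled=False)},
    {"customer": ToyEntity("customer", enabled=True)},
]
assert toy_resolve("customer", pkgs).enabled is True
assert toy_resolve("orders", pkgs) is None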
# Called by DocsRuntimeContext.doc
def resolve_doc(
self,
@@ -985,7 +1071,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
package: Optional[str],
current_project: str,
node_package: str,
) -> Optional[Documentation]:
) -> Optional[ParsedDocumentation]:
"""Resolve the given documentation. This follows the same algorithm as
resolve_ref except the is_enabled checks are unnecessary as docs are
always enabled.
@@ -1037,7 +1123,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
# Methods that were formerly in ParseResult
def add_macro(self, source_file: SourceFile, macro: Macro):
def add_macro(self, source_file: SourceFile, macro: ParsedMacro):
if macro.unique_id in self.macros:
# detect that the macro exists and emit an error
other_path = self.macros[macro.unique_id].original_file_path
@@ -1079,34 +1165,41 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.sources[source.unique_id] = source # type: ignore
source_file.sources.append(source.unique_id)
def add_node_nofile(self, node: ManifestNode):
def add_node_nofile(self, node: ManifestNodes):
# nodes can't be overwritten!
_check_duplicates(node, self.nodes)
self.nodes[node.unique_id] = node
def add_node(self, source_file: AnySourceFile, node: ManifestNode, test_from=None):
def add_node(self, source_file: AnySourceFile, node: ManifestNodes, test_from=None):
self.add_node_nofile(node)
if isinstance(source_file, SchemaSourceFile):
if isinstance(node, GenericTestNode):
if isinstance(node, ParsedGenericTestNode):
assert test_from
source_file.add_test(node.unique_id, test_from)
if isinstance(node, Metric):
if isinstance(node, ParsedMetric):
source_file.metrics.append(node.unique_id)
if isinstance(node, Exposure):
if isinstance(node, ParsedEntity):
source_file.entities.append(node.unique_id)
if isinstance(node, ParsedExposure):
source_file.exposures.append(node.unique_id)
else:
source_file.nodes.append(node.unique_id)
def add_exposure(self, source_file: SchemaSourceFile, exposure: Exposure):
def add_exposure(self, source_file: SchemaSourceFile, exposure: ParsedExposure):
_check_duplicates(exposure, self.exposures)
self.exposures[exposure.unique_id] = exposure
source_file.exposures.append(exposure.unique_id)
def add_metric(self, source_file: SchemaSourceFile, metric: Metric):
def add_metric(self, source_file: SchemaSourceFile, metric: ParsedMetric):
_check_duplicates(metric, self.metrics)
self.metrics[metric.unique_id] = metric
source_file.metrics.append(metric.unique_id)
def add_entity(self, source_file: SchemaSourceFile, entity: ParsedEntity):
_check_duplicates(entity, self.entities)
self.entities[entity.unique_id] = entity
source_file.entities.append(entity.unique_id)
def add_disabled_nofile(self, node: GraphMemberNode):
# There can be multiple disabled nodes for the same unique_id
if node.unique_id in self.disabled:
@@ -1114,20 +1207,22 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
else:
self.disabled[node.unique_id] = [node]
def add_disabled(self, source_file: AnySourceFile, node: ResultNode, test_from=None):
def add_disabled(self, source_file: AnySourceFile, node: CompileResultNode, test_from=None):
self.add_disabled_nofile(node)
if isinstance(source_file, SchemaSourceFile):
if isinstance(node, GenericTestNode):
if isinstance(node, ParsedGenericTestNode):
assert test_from
source_file.add_test(node.unique_id, test_from)
if isinstance(node, Metric):
if isinstance(node, ParsedMetric):
source_file.metrics.append(node.unique_id)
if isinstance(node, Exposure):
if isinstance(node, ParsedEntity):
source_file.entities.append(node.unique_id)
if isinstance(node, ParsedExposure):
source_file.exposures.append(node.unique_id)
else:
source_file.nodes.append(node.unique_id)
def add_doc(self, source_file: SourceFile, doc: Documentation):
def add_doc(self, source_file: SourceFile, doc: ParsedDocumentation):
_check_duplicates(doc, self.docs)
self.docs[doc.unique_id] = doc
source_file.docs.append(doc.unique_id)
@@ -1149,6 +1244,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.docs,
self.exposures,
self.metrics,
self.entities,
self.selectors,
self.files,
self.metadata,
@@ -1161,6 +1257,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self._source_lookup,
self._ref_lookup,
self._metric_lookup,
self._entity_lookup,
self._disabled_lookup,
self._analysis_lookup,
)
@@ -1185,27 +1282,30 @@ class WritableManifest(ArtifactMixin):
nodes: Mapping[UniqueID, ManifestNode] = field(
metadata=dict(description=("The nodes defined in the dbt project and its dependencies"))
)
sources: Mapping[UniqueID, SourceDefinition] = field(
sources: Mapping[UniqueID, ParsedSourceDefinition] = field(
metadata=dict(description=("The sources defined in the dbt project and its dependencies"))
)
macros: Mapping[UniqueID, Macro] = field(
macros: Mapping[UniqueID, ParsedMacro] = field(
metadata=dict(description=("The macros defined in the dbt project and its dependencies"))
)
docs: Mapping[UniqueID, Documentation] = field(
docs: Mapping[UniqueID, ParsedDocumentation] = field(
metadata=dict(description=("The docs defined in the dbt project and its dependencies"))
)
exposures: Mapping[UniqueID, Exposure] = field(
exposures: Mapping[UniqueID, ParsedExposure] = field(
metadata=dict(
description=("The exposures defined in the dbt project and its dependencies")
)
)
metrics: Mapping[UniqueID, Metric] = field(
metrics: Mapping[UniqueID, ParsedMetric] = field(
metadata=dict(description=("The metrics defined in the dbt project and its dependencies"))
)
entities: Mapping[UniqueID, ParsedEntity] = field(
metadata=dict(description=("The entities defined in the dbt project and its dependencies"))
)
selectors: Mapping[UniqueID, Any] = field(
metadata=dict(description=("The selectors defined in selectors.yml"))
)
disabled: Optional[Mapping[UniqueID, List[ResultNode]]] = field(
disabled: Optional[Mapping[UniqueID, List[CompileResultNode]]] = field(
metadata=dict(description="A mapping of the disabled nodes in the target")
)
parent_map: Optional[NodeEdgeMap] = field(

View File

@@ -12,7 +12,7 @@ class MetricReference(object):
class ResolvedMetricReference(MetricReference):
"""
Simple proxy over a Metric which delegates property
Simple proxy over a ParsedMetric which delegates property
lookups to the underlying node. Also adds helper functions
for working with metrics (ie. __str__ and templating functions)
"""

View File

@@ -367,6 +367,9 @@ class BaseConfig(AdditionalPropertiesAllowed, Replaceable):
class MetricConfig(BaseConfig):
enabled: bool = True
@dataclass
class EntityConfig(BaseConfig):
enabled: bool = True
@dataclass
class ExposureConfig(BaseConfig):
@@ -604,6 +607,7 @@ class SnapshotConfig(EmptySnapshotConfig):
RESOURCE_TYPES: Dict[NodeType, Type[BaseConfig]] = {
NodeType.Metric: MetricConfig,
NodeType.Entity: EntityConfig,
NodeType.Exposure: ExposureConfig,
NodeType.Source: SourceConfig,
NodeType.Seed: SeedConfig,

View File

@@ -38,6 +38,7 @@ from dbt.contracts.graph.unparsed import (
MaturityType,
MetricFilter,
MetricTime,
EntityDimension
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.events.proto_types import NodeInfo
@@ -48,7 +49,6 @@ from dbt.events.types import (
SeedExceedsLimitAndPathChanged,
SeedExceedsLimitChecksumChanged,
)
from dbt.events.contextvars import set_contextvars
from dbt import flags
from dbt.node_types import ModelLanguage, NodeType
@@ -59,6 +59,7 @@ from .model_config import (
TestConfig,
SourceConfig,
MetricConfig,
EntityConfig,
ExposureConfig,
EmptySnapshotConfig,
SnapshotConfig,
@@ -99,49 +100,6 @@ class MacroDependsOn(dbtClassMixin, Replaceable):
self.macros.append(value)
@dataclass
class InjectedCTE(dbtClassMixin, Replaceable):
id: str
sql: str
@dataclass
class CompiledNode:
compiled: bool = False
compiled_code: Optional[str] = None
extra_ctes_injected: bool = False
extra_ctes: List[InjectedCTE] = field(default_factory=list)
relation_name: Optional[str] = None
_pre_injected_sql: Optional[str] = None
def set_cte(self, cte_id: str, sql: str):
"""This is the equivalent of what self.extra_ctes[cte_id] = sql would
do if extra_ctes were an OrderedDict
"""
for cte in self.extra_ctes:
if cte.id == cte_id:
cte.sql = sql
break
else:
self.extra_ctes.append(InjectedCTE(id=cte_id, sql=sql))
def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
if "_pre_injected_sql" in dct:
del dct["_pre_injected_sql"]
# Remove compiled attributes
if "compiled" in dct and dct["compiled"] is False:
del dct["compiled"]
del dct["extra_ctes_injected"]
del dct["extra_ctes"]
# "omit_none" means these might not be in the dictionary
if "compiled_code" in dct:
del dct["compiled_code"]
if "relation_name" in dct:
del dct["relation_name"]
return dct
@dataclass
class DependsOn(MacroDependsOn):
nodes: List[str] = field(default_factory=list)
@@ -246,21 +204,14 @@ class NodeInfoMixin:
node_info_msg = NodeInfo(**node_info)
return node_info_msg
def update_event_status(self, **kwargs):
for k, v in kwargs.items():
self._event_status[k] = v
set_contextvars(node_info=self.node_info)
def clear_event_status(self):
self._event_status = dict()
@dataclass
class ParsedNodeDefaults(NodeInfoMixin, CompiledNode, ParsedNodeMandatory):
class ParsedNodeDefaults(NodeInfoMixin, ParsedNodeMandatory):
tags: List[str] = field(default_factory=list)
refs: List[List[str]] = field(default_factory=list)
sources: List[List[str]] = field(default_factory=list)
metrics: List[List[str]] = field(default_factory=list)
entities: List[List[str]] = field(default_factory=list)
depends_on: DependsOn = field(default_factory=DependsOn)
description: str = field(default="")
columns: Dict[str, ColumnInfo] = field(default_factory=dict)
@@ -304,30 +255,30 @@ class ParsedNode(ParsedNodeDefaults, ParsedNodeMixins, SerializableType):
@classmethod
def _deserialize(cls, dct: Dict[str, int]):
# The serialized ParsedNodes do not differ from each other
# in fields that would allow 'from_dict' to distinguis
# in fields that would allow 'from_dict' to distinguish
# between them.
resource_type = dct["resource_type"]
if resource_type == "model":
return ModelNode.from_dict(dct)
return ParsedModelNode.from_dict(dct)
elif resource_type == "analysis":
return AnalysisNode.from_dict(dct)
return ParsedAnalysisNode.from_dict(dct)
elif resource_type == "seed":
return SeedNode.from_dict(dct)
return ParsedSeedNode.from_dict(dct)
elif resource_type == "rpc":
return RPCNode.from_dict(dct)
return ParsedRPCNode.from_dict(dct)
elif resource_type == "sql":
return SqlNode.from_dict(dct)
return ParsedSqlNode.from_dict(dct)
elif resource_type == "test":
if "test_metadata" in dct:
return GenericTestNode.from_dict(dct)
return ParsedGenericTestNode.from_dict(dct)
else:
return SingularTestNode.from_dict(dct)
return ParsedSingularTestNode.from_dict(dct)
elif resource_type == "operation":
return HookNode.from_dict(dct)
return ParsedHookNode.from_dict(dct)
elif resource_type == "seed":
return SeedNode.from_dict(dct)
return ParsedSeedNode.from_dict(dct)
elif resource_type == "snapshot":
return SnapshotNode.from_dict(dct)
return ParsedSnapshotNode.from_dict(dct)
else:
return cls.from_dict(dct)
@@ -397,29 +348,29 @@ class ParsedNode(ParsedNodeDefaults, ParsedNodeMixins, SerializableType):
@dataclass
class AnalysisNode(ParsedNode):
class ParsedAnalysisNode(ParsedNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Analysis]})
@dataclass
class HookNode(ParsedNode):
class ParsedHookNode(ParsedNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Operation]})
index: Optional[int] = None
@dataclass
class ModelNode(ParsedNode):
class ParsedModelNode(ParsedNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Model]})
# TODO: rm?
@dataclass
class RPCNode(ParsedNode):
class ParsedRPCNode(ParsedNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.RPCCall]})
@dataclass
class SqlNode(ParsedNode):
class ParsedSqlNode(ParsedNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.SqlOperation]})
@@ -460,7 +411,8 @@ def same_seeds(first: ParsedNode, second: ParsedNode) -> bool:
@dataclass
class SeedNode(ParsedNode):
class ParsedSeedNode(ParsedNode):
# keep this in sync with CompiledSeedNode!
resource_type: NodeType = field(metadata={"restrict": [NodeType.Seed]})
config: SeedConfig = field(default_factory=SeedConfig)
# seeds need the root_path because the contents are not loaded initially
@@ -492,7 +444,7 @@ class HasTestMetadata(dbtClassMixin):
@dataclass
class SingularTestNode(ParsedNode):
class ParsedSingularTestNode(ParsedNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Test]})
# Was not able to make mypy happy and keep the code working. We need to
# refactor the various configs.
@@ -504,7 +456,8 @@ class SingularTestNode(ParsedNode):
@dataclass
class GenericTestNode(ParsedNode, HasTestMetadata):
class ParsedGenericTestNode(ParsedNode, HasTestMetadata):
# keep this in sync with CompiledGenericTestNode!
resource_type: NodeType = field(metadata={"restrict": [NodeType.Test]})
column_name: Optional[str] = None
file_key_name: Optional[str] = None
@@ -536,7 +489,7 @@ class IntermediateSnapshotNode(ParsedNode):
@dataclass
class SnapshotNode(ParsedNode):
class ParsedSnapshotNode(ParsedNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Snapshot]})
config: SnapshotConfig
@@ -564,7 +517,7 @@ class ParsedMacroPatch(ParsedPatch):
@dataclass
class Macro(UnparsedBaseNode, HasUniqueID):
class ParsedMacro(UnparsedBaseNode, HasUniqueID):
name: str
macro_sql: str
resource_type: NodeType = field(metadata={"restrict": [NodeType.Macro]})
@@ -588,7 +541,7 @@ class Macro(UnparsedBaseNode, HasUniqueID):
self.docs = patch.docs
self.arguments = patch.arguments
def same_contents(self, other: Optional["Macro"]) -> bool:
def same_contents(self, other: Optional["ParsedMacro"]) -> bool:
if other is None:
return False
# the only thing that makes one macro different from another with the
@@ -597,7 +550,7 @@ class Macro(UnparsedBaseNode, HasUniqueID):
@dataclass
class Documentation(UnparsedDocumentation, HasUniqueID):
class ParsedDocumentation(UnparsedDocumentation, HasUniqueID):
name: str
block_contents: str
@@ -605,7 +558,7 @@ class Documentation(UnparsedDocumentation, HasUniqueID):
def search_name(self):
return self.name
def same_contents(self, other: Optional["Documentation"]) -> bool:
def same_contents(self, other: Optional["ParsedDocumentation"]) -> bool:
if other is None:
return False
# the only thing that makes one doc different from another with the
@@ -683,7 +636,7 @@ class ParsedSourceMandatory(
@dataclass
class SourceDefinition(NodeInfoMixin, ParsedSourceMandatory):
class ParsedSourceDefinition(NodeInfoMixin, ParsedSourceMandatory):
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
@@ -704,7 +657,7 @@ class SourceDefinition(NodeInfoMixin, ParsedSourceMandatory):
del dct["_event_status"]
return dct
def same_database_representation(self, other: "SourceDefinition") -> bool:
def same_database_representation(self, other: "ParsedSourceDefinition") -> bool:
return (
self.database == other.database
and self.schema == other.schema
@@ -712,26 +665,26 @@ class SourceDefinition(NodeInfoMixin, ParsedSourceMandatory):
and True
)
def same_quoting(self, other: "SourceDefinition") -> bool:
def same_quoting(self, other: "ParsedSourceDefinition") -> bool:
return self.quoting == other.quoting
def same_freshness(self, other: "SourceDefinition") -> bool:
def same_freshness(self, other: "ParsedSourceDefinition") -> bool:
return (
self.freshness == other.freshness
and self.loaded_at_field == other.loaded_at_field
and True
)
def same_external(self, other: "SourceDefinition") -> bool:
def same_external(self, other: "ParsedSourceDefinition") -> bool:
return self.external == other.external
def same_config(self, old: "SourceDefinition") -> bool:
def same_config(self, old: "ParsedSourceDefinition") -> bool:
return self.config.same_contents(
self.unrendered_config,
old.unrendered_config,
)
def same_contents(self, old: Optional["SourceDefinition"]) -> bool:
def same_contents(self, old: Optional["ParsedSourceDefinition"]) -> bool:
# existing when it didn't before is a change!
if old is None:
return True
@@ -798,7 +751,7 @@ class SourceDefinition(NodeInfoMixin, ParsedSourceMandatory):
@dataclass
class Exposure(UnparsedBaseNode, HasUniqueID, HasFqn):
class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
name: str
type: ExposureType
owner: ExposureOwner
@@ -825,34 +778,34 @@ class Exposure(UnparsedBaseNode, HasUniqueID, HasFqn):
def search_name(self):
return self.name
def same_depends_on(self, old: "Exposure") -> bool:
def same_depends_on(self, old: "ParsedExposure") -> bool:
return set(self.depends_on.nodes) == set(old.depends_on.nodes)
def same_description(self, old: "Exposure") -> bool:
def same_description(self, old: "ParsedExposure") -> bool:
return self.description == old.description
def same_label(self, old: "Exposure") -> bool:
def same_label(self, old: "ParsedExposure") -> bool:
return self.label == old.label
def same_maturity(self, old: "Exposure") -> bool:
def same_maturity(self, old: "ParsedExposure") -> bool:
return self.maturity == old.maturity
def same_owner(self, old: "Exposure") -> bool:
def same_owner(self, old: "ParsedExposure") -> bool:
return self.owner == old.owner
def same_exposure_type(self, old: "Exposure") -> bool:
def same_exposure_type(self, old: "ParsedExposure") -> bool:
return self.type == old.type
def same_url(self, old: "Exposure") -> bool:
def same_url(self, old: "ParsedExposure") -> bool:
return self.url == old.url
def same_config(self, old: "Exposure") -> bool:
def same_config(self, old: "ParsedExposure") -> bool:
return self.config.same_contents(
self.unrendered_config,
old.unrendered_config,
)
def same_contents(self, old: Optional["Exposure"]) -> bool:
def same_contents(self, old: Optional["ParsedExposure"]) -> bool:
# existing when it didn't before is a change!
# metadata/tags changes are not "changes"
if old is None:
@@ -879,7 +832,7 @@ class MetricReference(dbtClassMixin, Replaceable):
@dataclass
class Metric(UnparsedBaseNode, HasUniqueID, HasFqn):
class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
name: str
description: str
label: str
@@ -895,7 +848,7 @@ class Metric(UnparsedBaseNode, HasUniqueID, HasFqn):
resource_type: NodeType = NodeType.Metric
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
config: MetricConfig = field(default_factory=MetricConfig)
config: EntityConfig = field(default_factory=EntityConfig)
unrendered_config: Dict[str, Any] = field(default_factory=dict)
sources: List[List[str]] = field(default_factory=list)
depends_on: DependsOn = field(default_factory=DependsOn)
@@ -911,43 +864,43 @@ class Metric(UnparsedBaseNode, HasUniqueID, HasFqn):
def search_name(self):
return self.name
def same_model(self, old: "Metric") -> bool:
def same_model(self, old: "ParsedMetric") -> bool:
return self.model == old.model
def same_window(self, old: "Metric") -> bool:
def same_window(self, old: "ParsedMetric") -> bool:
return self.window == old.window
def same_dimensions(self, old: "Metric") -> bool:
def same_dimensions(self, old: "ParsedMetric") -> bool:
return self.dimensions == old.dimensions
def same_filters(self, old: "Metric") -> bool:
def same_filters(self, old: "ParsedMetric") -> bool:
return self.filters == old.filters
def same_description(self, old: "Metric") -> bool:
def same_description(self, old: "ParsedMetric") -> bool:
return self.description == old.description
def same_label(self, old: "Metric") -> bool:
def same_label(self, old: "ParsedMetric") -> bool:
return self.label == old.label
def same_calculation_method(self, old: "Metric") -> bool:
def same_calculation_method(self, old: "ParsedMetric") -> bool:
return self.calculation_method == old.calculation_method
def same_expression(self, old: "Metric") -> bool:
def same_expression(self, old: "ParsedMetric") -> bool:
return self.expression == old.expression
def same_timestamp(self, old: "Metric") -> bool:
def same_timestamp(self, old: "ParsedMetric") -> bool:
return self.timestamp == old.timestamp
def same_time_grains(self, old: "Metric") -> bool:
def same_time_grains(self, old: "ParsedMetric") -> bool:
return self.time_grains == old.time_grains
def same_config(self, old: "Metric") -> bool:
def same_config(self, old: "ParsedMetric") -> bool:
return self.config.same_contents(
self.unrendered_config,
old.unrendered_config,
)
def same_contents(self, old: Optional["Metric"]) -> bool:
def same_contents(self, old: Optional["ParsedMetric"]) -> bool:
# existing when it didn't before is a change!
# metadata/tags changes are not "changes"
if old is None:
@@ -968,41 +921,80 @@ class Metric(UnparsedBaseNode, HasUniqueID, HasFqn):
and True
)
@dataclass
class ParsedEntity(UnparsedBaseNode, HasUniqueID, HasFqn):
name: str
model: str
description: str
dimensions: Dict[str, EntityDimension] = field(default_factory=dict)
model_unique_id: Optional[str] = None
resource_type: NodeType = NodeType.Entity
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
config: EntityConfig = field(default_factory=EntityConfig)
unrendered_config: Dict[str, Any] = field(default_factory=dict)
sources: List[List[str]] = field(default_factory=list)
depends_on: DependsOn = field(default_factory=DependsOn)
refs: List[List[str]] = field(default_factory=list)
entities: List[List[str]] = field(default_factory=list)
created_at: float = field(default_factory=lambda: time.time())
ManifestNode = Union[
AnalysisNode,
SingularTestNode,
HookNode,
ModelNode,
RPCNode,
SqlNode,
GenericTestNode,
SeedNode,
SnapshotNode,
]
@property
def depends_on_nodes(self):
return self.depends_on.nodes
ResultNode = Union[
ManifestNode,
SourceDefinition,
]
@property
def search_name(self):
return self.name
GraphMemberNode = Union[
ResultNode,
Exposure,
Metric,
def same_model(self, old: "ParsedEntity") -> bool:
return self.model == old.model
def same_description(self, old: "ParsedEntity") -> bool:
return self.description == old.description
def same_dimensions(self, old: "ParsedEntity") -> bool:
return self.dimensions == old.dimensions
def same_config(self, old: "ParsedEntity") -> bool:
return self.config.same_contents(
self.unrendered_config,
old.unrendered_config,
)
def same_contents(self, old: Optional["ParsedEntity"]) -> bool:
# existing when it didn't before is a change!
# metadata/tags changes are not "changes"
if old is None:
return True
return (
self.same_model(old)
and self.same_description(old)
and self.same_dimensions(old)
and self.same_config(old)
and True
)
ManifestNodes = Union[
ParsedAnalysisNode,
ParsedSingularTestNode,
ParsedHookNode,
ParsedModelNode,
ParsedRPCNode,
ParsedSqlNode,
ParsedGenericTestNode,
ParsedSeedNode,
ParsedSnapshotNode,
]
Resource = Union[
Documentation,
Macro,
ParsedResource = Union[
ParsedDocumentation,
ParsedMacro,
ParsedNode,
Exposure,
Metric,
SourceDefinition,
]
TestNode = Union[
SingularTestNode,
GenericTestNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
ParsedSourceDefinition,
]

View File

@@ -523,3 +523,47 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
if data.get("model") is not None and data.get("calculation_method") == "derived":
raise ValidationError("Derived metrics cannot have a 'model' property")
@dataclass
class EntityDimension(dbtClassMixin, Mergeable):
"""This class is used for the dimension information at the entity level. It
closely matches the implementation of columns for models."""
name: str
description: str = ""
column_name: Optional[str] = None
date_type: Optional[str] = None
default_timestamp: Optional[bool] = None
primary_key: Optional[bool] = None
time_grains: Optional[List[str]] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
meta: Dict[str, Any] = field(default_factory=dict)
@dataclass
class EntityInheritence(EntityDimension):
"""This class is used for entity dimension inheritence. This class is optional
but if it is present then include needs to be present. Exclude cannot be present
without some idea of what is being included, whereas exclude is fully optional.
The acceptable inputs for include are either a list of columns/dimensions or *
to represent all fields. The acceptable inputs for exclude are a list of columns/
dimensions
"""
include: Union[List[str], str] = field(default_factory=list)
exclude: Optional[List[str]] = field(default_factory=list)
@dataclass
class UnparsedEntity(dbtClassMixin, Replaceable):
"""This class is used for entity information"""
name: str
model: str
description: str = ""
dimensions: Optional[Union[Sequence[EntityDimension], EntityInheritence]] = None
# dimensions: Optional[Sequence[EntityDimension]] = None
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
@classmethod
def validate(cls, data):
super(UnparsedEntity, cls).validate(data)
errors = []
## TODO: Add validation here around include/exclude and others
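Going by the EntityInheritence docstring above, and with validation still marked TODO, dimensions is meant to accept either an explicit list of dimension entries or an include/exclude spec. Two illustrative payload shapes as plain dicts, with made-up field values:

# explicit dimensions: a list of EntityDimension-shaped entries
explicit_dimensions = [
    {"name": "customer_id", "primary_key": True},
    {"name": "created_at", "date_type": "timestamp", "default_timestamp": True},
]

# inheritance form: include everything from the model and carve out a few columns;
# include may also be a list of names instead of "*"
inherited_dimensions = {"include": "*", "exclude": ["updated_at"]}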

View File

@@ -55,12 +55,6 @@ class LocalPackage(Package):
RawVersion = Union[str, float]
@dataclass
class TarballPackage(Package):
tarball: str
name: str
@dataclass
class GitPackage(Package):
git: str
@@ -88,7 +82,7 @@ class RegistryPackage(Package):
return [str(self.version)]
PackageSpec = Union[LocalPackage, TarballPackage, GitPackage, RegistryPackage]
PackageSpec = Union[LocalPackage, GitPackage, RegistryPackage]
@dataclass
@@ -214,6 +208,7 @@ class Project(HyphenatedDbtClassMixin, Replaceable):
sources: Dict[str, Any] = field(default_factory=dict)
tests: Dict[str, Any] = field(default_factory=dict)
metrics: Dict[str, Any] = field(default_factory=dict)
entities: Dict[str, Any] = field(default_factory=dict)
exposures: Dict[str, Any] = field(default_factory=dict)
vars: Optional[Dict[str, Any]] = field(
default=None,
@@ -222,7 +217,7 @@ class Project(HyphenatedDbtClassMixin, Replaceable):
),
)
packages: List[PackageSpec] = field(default_factory=list)
query_comment: Optional[Union[QueryComment, NoValue, str]] = field(default_factory=NoValue)
query_comment: Optional[Union[QueryComment, NoValue, str]] = NoValue()
@classmethod
def validate(cls, data):
@@ -257,6 +252,7 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
static_parser: Optional[bool] = None
indirect_selection: Optional[str] = None
cache_selected_only: Optional[bool] = None
event_buffer_size: Optional[int] = None
@dataclass

View File

@@ -1,5 +1,6 @@
from dbt.contracts.graph.manifest import CompileResultNode
from dbt.contracts.graph.unparsed import FreshnessThreshold
from dbt.contracts.graph.nodes import SourceDefinition, ResultNode
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.util import (
BaseArtifactMetadata,
ArtifactMixin,
@@ -10,9 +11,11 @@ from dbt.contracts.util import (
from dbt.exceptions import InternalException
from dbt.events.functions import fire_event
from dbt.events.types import TimingInfoCollected
from dbt.events.proto_types import RunResultMsg, TimingInfoMsg
from dbt.events.contextvars import get_node_info
from dbt.logger import TimingProcessor
from dbt.events.proto_types import RunResultMsg
from dbt.logger import (
TimingProcessor,
JsonOnly,
)
from dbt.utils import lowercase, cast_to_str, cast_to_int
from dbt.dataclass_schema import dbtClassMixin, StrEnum
@@ -45,14 +48,7 @@ class TimingInfo(dbtClassMixin):
def end(self):
self.completed_at = datetime.utcnow()
def to_msg(self):
timsg = TimingInfoMsg(
name=self.name, started_at=self.started_at, completed_at=self.completed_at
)
return timsg
# This is a context manager
class collect_timing_info:
def __init__(self, name: str):
self.timing_info = TimingInfo(name=name)
@@ -63,13 +59,8 @@ class collect_timing_info:
def __exit__(self, exc_type, exc_value, traceback):
self.timing_info.end()
# Note: when legacy logger is removed, we can remove the following line
with TimingProcessor(self.timing_info):
fire_event(
TimingInfoCollected(
timing_info=self.timing_info.to_msg(), node_info=get_node_info()
)
)
with JsonOnly(), TimingProcessor(self.timing_info):
fire_event(TimingInfoCollected())
class RunningStatus(StrEnum):
@@ -137,14 +128,13 @@ class BaseResult(dbtClassMixin):
msg.thread = self.thread_id
msg.execution_time = self.execution_time
msg.num_failures = cast_to_int(self.failures)
msg.timing_info = [ti.to_msg() for ti in self.timing]
# adapter_response
# timing_info, adapter_response, message
return msg
@dataclass
class NodeResult(BaseResult):
node: ResultNode
node: CompileResultNode
@dataclass
@@ -283,7 +273,7 @@ class RunOperationResultsArtifact(RunOperationResult, ArtifactMixin):
@dataclass
class SourceFreshnessResult(NodeResult):
node: SourceDefinition
node: ParsedSourceDefinition
status: FreshnessStatus
max_loaded_at: datetime
snapshotted_at: datetime

View File

@@ -5,7 +5,7 @@ from typing import Optional, List, Any, Dict, Sequence
from dbt.dataclass_schema import dbtClassMixin
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.results import (
RunResult,
RunResultsArtifact,
@@ -32,7 +32,7 @@ class RemoteResult(VersionedSchema):
class RemoteCompileResultMixin(RemoteResult):
raw_code: str
compiled_code: str
node: ResultNode
node: CompileResultNode
timing: List[TimingInfo]

View File

@@ -16,8 +16,6 @@ Defines the base classes of `PinnedPackage` and `UnpinnedPackage`.
`downloads_directory` sets the directory packages will be downloaded to.
`_install` has retry logic if the download or untarring process hits exceptions (see `dbt.utils._connection_exception_retry`).
## `git.py`
Extends `PinnedPackage` and `UnpinnedPackage` specific to dbt packages defined with git urls.
@@ -30,10 +28,8 @@ Extends `PinnedPackage` and `UnpinnedPackage` specific to dbt packages defined l
Extends `PinnedPackage` and `UnpinnedPackage` specific to dbt packages defined on the dbt Hub registry.
`install` has retry logic if the download or untarring process hits exceptions (see `dbt.utils._connection_exception_retry`).
## `resolver.py`
Resolves the package definition into package objects to download.
## `tarball.py`
Extends `PinnedPackage` and `UnpinnedPackage` specific to dbt packages defined by a URL to a tarball hosted on an HTTP server.

View File

@@ -1,16 +1,13 @@
import abc
import os
import functools
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import List, Optional, Generic, TypeVar
from dbt.clients import system
from dbt.contracts.project import ProjectPackageMetadata
from dbt.events.functions import fire_event
from dbt.events.types import DepsSetDownloadDirectory
from dbt.utils import _connection_exception_retry as connection_exception_retry
DOWNLOADS_PATH = None
@@ -100,34 +97,6 @@ class PinnedPackage(BasePackage):
def get_subdirectory(self):
return None
def _install(self, project, renderer):
metadata = self.fetch_metadata(project, renderer)
tar_name = f"{self.package}.{self.version}.tar.gz"
tar_path = (Path(get_downloads_path()) / tar_name).resolve(strict=False)
system.make_directory(str(tar_path.parent))
download_url = metadata.downloads.tarball
deps_path = project.packages_install_path
package_name = self.get_project_name(project, renderer)
download_untar_fn = functools.partial(
self.download_and_untar, download_url, str(tar_path), deps_path, package_name
)
connection_exception_retry(download_untar_fn, 5)
def download_and_untar(self, download_url, tar_path, deps_path, package_name):
"""
Sometimes the download of the files fails and we want to retry. Sometimes the
download appears successful but the file did not make it through as expected
(generally due to a github incident). Either way we want to retry downloading
and untarring to see if we can get a success. Call this within
`_connection_exception_retry`
"""
system.download(download_url, tar_path)
system.untar_package(tar_path, deps_path, package_name)
SomePinned = TypeVar("SomePinned", bound=PinnedPackage)
SomeUnpinned = TypeVar("SomeUnpinned", bound="UnpinnedPackage")

View File

@@ -1,20 +1,23 @@
import os
import functools
from typing import List
from dbt import semver
from dbt import flags
from dbt.version import get_installed_version
from dbt.clients import registry
from dbt.clients import registry, system
from dbt.contracts.project import (
RegistryPackageMetadata,
RegistryPackage,
)
from dbt.deps.base import PinnedPackage, UnpinnedPackage
from dbt.deps.base import PinnedPackage, UnpinnedPackage, get_downloads_path
from dbt.exceptions import (
package_version_not_found,
VersionsNotCompatibleException,
DependencyException,
package_not_found,
)
from dbt.utils import _connection_exception_retry as connection_exception_retry
class RegistryPackageMixin:
@@ -57,7 +60,32 @@ class RegistryPinnedPackage(RegistryPackageMixin, PinnedPackage):
return RegistryPackageMetadata.from_dict(dct)
def install(self, project, renderer):
self._install(project, renderer)
metadata = self.fetch_metadata(project, renderer)
tar_name = "{}.{}.tar.gz".format(self.package, self.version)
tar_path = os.path.realpath(os.path.join(get_downloads_path(), tar_name))
system.make_directory(os.path.dirname(tar_path))
download_url = metadata.downloads.tarball
deps_path = project.packages_install_path
package_name = self.get_project_name(project, renderer)
download_untar_fn = functools.partial(
self.download_and_untar, download_url, tar_path, deps_path, package_name
)
connection_exception_retry(download_untar_fn, 5)
def download_and_untar(self, download_url, tar_path, deps_path, package_name):
"""
Sometimes the download of the files fails and we want to retry. Sometimes the
download appears successful but the file did not make it through as expected
(generally due to a github incident). Either way we want to retry downloading
and untarring to see if we can get a success. Call this within
`_connection_exception_retry`
"""
system.download(download_url, tar_path)
system.untar_package(tar_path, deps_path, package_name)
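The install method above packages the download-and-untar call into a functools.partial and hands it to a retry helper with a fixed attempt count. A self-contained sketch of that calling pattern with a toy retry function (the real _connection_exception_retry applies its own exception filtering, which is not reproduced here):

import functools

def toy_retry(fn, max_attempts: int):
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the final attempt

calls = {"count": 0}

def flaky_download(url: str, dest: str) -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return f"downloaded {url} -> {dest}"

download_fn = functools.partial(flaky_download, "https://example.com/pkg.tar.gz", "/tmp/pkg.tar.gz")
assert toy_retry(download_fn, 5).startswith("downloaded")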
class RegistryUnpinnedPackage(RegistryPackageMixin, UnpinnedPackage[RegistryPinnedPackage]):

View File

@@ -7,18 +7,16 @@ from dbt.config import Project, RuntimeConfig
from dbt.config.renderer import DbtProjectYamlRenderer
from dbt.deps.base import BasePackage, PinnedPackage, UnpinnedPackage
from dbt.deps.local import LocalUnpinnedPackage
from dbt.deps.tarball import TarballUnpinnedPackage
from dbt.deps.git import GitUnpinnedPackage
from dbt.deps.registry import RegistryUnpinnedPackage
from dbt.contracts.project import (
LocalPackage,
TarballPackage,
GitPackage,
RegistryPackage,
)
PackageContract = Union[LocalPackage, TarballPackage, GitPackage, RegistryPackage]
PackageContract = Union[LocalPackage, GitPackage, RegistryPackage]
@dataclass
@@ -71,8 +69,6 @@ class PackageListing:
for contract in src:
if isinstance(contract, LocalPackage):
pkg = LocalUnpinnedPackage.from_contract(contract)
elif isinstance(contract, TarballPackage):
pkg = TarballUnpinnedPackage.from_contract(contract)
elif isinstance(contract, GitPackage):
pkg = GitUnpinnedPackage.from_contract(contract)
elif isinstance(contract, RegistryPackage):

View File

@@ -1,74 +0,0 @@
from dbt.contracts.project import RegistryPackageMetadata, TarballPackage
from dbt.deps.base import PinnedPackage, UnpinnedPackage
class TarballPackageMixin:
def __init__(self, tarball: str) -> None:
super().__init__()
self.tarball = tarball
@property
def name(self):
return self.tarball
def source_type(self) -> str:
return "tarball"
class TarballPinnedPackage(TarballPackageMixin, PinnedPackage):
def __init__(self, tarball: str, package: str) -> None:
super().__init__(tarball)
# setup to recycle RegistryPinnedPackage fns
self.package = package
self.version = "tarball"
@property
def name(self):
return self.package
def get_version(self):
return self.version
def nice_version_name(self):
return f"tarball (url: {self.tarball})"
def _fetch_metadata(self, project, renderer):
"""
Recycle RegistryPackageMetadata so that we can reuse the install and
download_and_untar logic from RegistryPinnedPackage.
Build RegistryPackageMetadata from the info passed via packages.yml, since no
'metadata' service exists in this case.
"""
dct = {
"name": self.package,
"packages": [], # note: required by RegistryPackageMetadata
"downloads": {"tarball": self.tarball},
}
return RegistryPackageMetadata.from_dict(dct)
def install(self, project, renderer):
self._install(project, renderer)
class TarballUnpinnedPackage(TarballPackageMixin, UnpinnedPackage[TarballPinnedPackage]):
def __init__(
self,
tarball: str,
package: str,
) -> None:
super().__init__(tarball)
# setup to recycle RegistryPinnedPackage fns
self.package = package
self.version = "tarball"
@classmethod
def from_contract(cls, contract: TarballPackage) -> "TarballUnpinnedPackage":
return cls(tarball=contract.tarball, package=contract.name)
def incorporate(self, other: "TarballUnpinnedPackage") -> "TarballUnpinnedPackage":
return TarballUnpinnedPackage(tarball=self.tarball, package=self.package)
def resolved(self) -> TarballPinnedPackage:
return TarballPinnedPackage(tarball=self.tarball, package=self.package)

View File

@@ -1,7 +1,6 @@
import traceback
from dataclasses import dataclass
from dbt.events.functions import fire_event
from dbt.events.contextvars import get_node_info
from dbt.events.types import (
AdapterEventDebug,
AdapterEventInfo,
@@ -16,39 +15,27 @@ class AdapterLogger:
name: str
def debug(self, msg, *args):
event = AdapterEventDebug(
name=self.name, base_msg=msg, args=args, node_info=get_node_info()
)
event = AdapterEventDebug(name=self.name, base_msg=msg, args=args)
fire_event(event)
def info(self, msg, *args):
event = AdapterEventInfo(
name=self.name, base_msg=msg, args=args, node_info=get_node_info()
)
event = AdapterEventInfo(name=self.name, base_msg=msg, args=args)
fire_event(event)
def warning(self, msg, *args):
event = AdapterEventWarning(
name=self.name, base_msg=msg, args=args, node_info=get_node_info()
)
event = AdapterEventWarning(name=self.name, base_msg=msg, args=args)
fire_event(event)
def error(self, msg, *args):
event = AdapterEventError(
name=self.name, base_msg=msg, args=args, node_info=get_node_info()
)
event = AdapterEventError(name=self.name, base_msg=msg, args=args)
fire_event(event)
# The default exc_info=True is what makes this method different
def exception(self, msg, *args):
event = AdapterEventError(
name=self.name, base_msg=msg, args=args, node_info=get_node_info()
)
event = AdapterEventError(name=self.name, base_msg=msg, args=args)
event.exc_info = traceback.format_exc()
fire_event(event)
def critical(self, msg, *args):
event = AdapterEventError(
name=self.name, base_msg=msg, args=args, node_info=get_node_info()
)
event = AdapterEventError(name=self.name, base_msg=msg, args=args)
fire_event(event)
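For reference, a hedged usage sketch of this class as adapter plugins typically use it (the import path and the str.format-style placeholder convention are assumptions, not shown in this diff):

from dbt.events import AdapterLogger  # assumed import path

logger = AdapterLogger("Postgres")

# base_msg and args are stored separately on the event, so formatting is
# deferred until a logger actually renders the message
logger.debug("Opening a new connection, currently in state {}", "init")
logger.info("Connection opened")

try:
    raise RuntimeError("boom")
except RuntimeError:
    logger.exception("Query failed")  # also attaches traceback.format_exc() as exc_info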

View File

@@ -1,5 +1,4 @@
from dataclasses import dataclass
from enum import Enum
import os
import threading
from datetime import datetime
@@ -44,16 +43,6 @@ def get_thread_name() -> str:
return threading.current_thread().name
# EventLevel is an Enum, but mixing in the 'str' type is suggested in the Python
# documentation, and provides support for json conversion, which fails otherwise.
class EventLevel(str, Enum):
DEBUG = "debug"
TEST = "test"
INFO = "info"
WARN = "warn"
ERROR = "error"
@dataclass
class BaseEvent:
"""BaseEvent for proto message generated python events"""
@@ -73,15 +62,15 @@ class BaseEvent:
self.info.code = self.code()
self.info.name = type(self).__name__
def level_tag(self) -> str:
return "debug"
# This is here because although we know that info should always
# exist, mypy doesn't.
def log_level(self) -> EventLevel:
def log_level(self) -> str:
return self.info.level # type: ignore
def level_tag(self) -> EventLevel:
return EventLevel.DEBUG
def message(self) -> str:
def message(self):
raise Exception("message() not implemented for event")
@@ -96,32 +85,32 @@ class DynamicLevel(BaseEvent):
class TestLevel(BaseEvent):
__test__ = False
def level_tag(self) -> EventLevel:
return EventLevel.TEST
def level_tag(self) -> str:
return "test"
@dataclass # type: ignore[misc]
class DebugLevel(BaseEvent):
def level_tag(self) -> EventLevel:
return EventLevel.DEBUG
def level_tag(self) -> str:
return "debug"
@dataclass # type: ignore[misc]
class InfoLevel(BaseEvent):
def level_tag(self) -> EventLevel:
return EventLevel.INFO
def level_tag(self) -> str:
return "info"
@dataclass # type: ignore[misc]
class WarnLevel(BaseEvent):
def level_tag(self) -> EventLevel:
return EventLevel.WARN
def level_tag(self) -> str:
return "warn"
@dataclass # type: ignore[misc]
class ErrorLevel(BaseEvent):
def level_tag(self) -> EventLevel:
return EventLevel.ERROR
def level_tag(self) -> str:
return "error"
# Included to ensure classes with str-type message members are initialized correctly.
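The comment above about mixing str into the Enum is the key detail: a plain Enum member breaks json.dumps, while the str mixin lets the member serialize as its value. A minimal illustration, assuming nothing beyond the standard library:

import json
from enum import Enum

class PlainLevel(Enum):
    INFO = "info"

class StrLevel(str, Enum):
    INFO = "info"

json.dumps({"level": StrLevel.INFO})    # '{"level": "info"}'
# json.dumps({"level": PlainLevel.INFO}) would raise TypeError: not JSON serializable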

View File

@@ -1,84 +0,0 @@
import contextlib
import contextvars
from typing import Any, Generator, Mapping, Dict
from dbt.events.proto_types import NodeInfo
LOG_PREFIX = "log_"
LOG_PREFIX_LEN = len(LOG_PREFIX)
_log_context_vars: Dict[str, contextvars.ContextVar] = {}
def get_contextvars() -> Dict[str, Any]:
rv = {}
ctx = contextvars.copy_context()
for k in ctx:
if k.name.startswith(LOG_PREFIX) and ctx[k] is not Ellipsis:
rv[k.name[LOG_PREFIX_LEN:]] = ctx[k]
return rv
def get_node_info():
cvars = get_contextvars()
if "node_info" in cvars:
return cvars["node_info"]
else:
return NodeInfo()
def clear_contextvars() -> None:
ctx = contextvars.copy_context()
for k in ctx:
if k.name.startswith(LOG_PREFIX):
k.set(Ellipsis)
# put keys and values into context. Returns the contextvar.Token mapping
# Save and pass to reset_contextvars
def set_contextvars(**kwargs: Any) -> Mapping[str, contextvars.Token]:
cvar_tokens = {}
for k, v in kwargs.items():
log_key = f"{LOG_PREFIX}{k}"
try:
var = _log_context_vars[log_key]
except KeyError:
var = contextvars.ContextVar(log_key, default=Ellipsis)
_log_context_vars[log_key] = var
cvar_tokens[k] = var.set(v)
return cvar_tokens
# reset by Tokens
def reset_contextvars(**kwargs: contextvars.Token) -> None:
for k, v in kwargs.items():
log_key = f"{LOG_PREFIX}{k}"
var = _log_context_vars[log_key]
var.reset(v)
# remove from contextvars
def unset_contextvars(*keys: str) -> None:
for k in keys:
if k in _log_context_vars:
log_key = f"{LOG_PREFIX}{k}"
_log_context_vars[log_key].set(Ellipsis)
# Context manager or decorator to set and unset the context vars
@contextlib.contextmanager
def log_contextvars(**kwargs: Any) -> Generator[None, None, None]:
context = get_contextvars()
saved = {k: context[k] for k in context.keys() & kwargs.keys()}
set_contextvars(**kwargs)
try:
yield
finally:
unset_contextvars(*kwargs.keys())
set_contextvars(**saved)
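A hedged usage sketch of this module: task code wraps per-node work in log_contextvars so that any event fired inside the block can pick up the node's NodeInfo through get_node_info(), falling back to an empty NodeInfo outside it (the NodeInfo field values below are made up):

from dbt.events.contextvars import log_contextvars, get_node_info
from dbt.events.proto_types import NodeInfo

node_info = NodeInfo(node_name="my_model", unique_id="model.jaffle_shop.my_model")

with log_contextvars(node_info=node_info):
    # anything logging in this context now sees the node's info
    assert get_node_info().node_name == "my_model"

# outside the block the default empty NodeInfo is returned again
assert get_node_info().node_name == ""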

View File

@@ -1,186 +0,0 @@
from colorama import Style
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import json
import logging
from logging.handlers import RotatingFileHandler
import threading
from typing import Any, Callable, List, Optional, TextIO
from uuid import uuid4
from dbt.events.base_types import BaseEvent, EventLevel
# A Filter is a function which takes a BaseEvent and returns True if the event
# should be logged, False otherwise.
Filter = Callable[[BaseEvent], bool]
# Default filter which logs every event
def NoFilter(_: BaseEvent) -> bool:
return True
# A Scrubber removes secrets from an input string, returning a sanitized string.
Scrubber = Callable[[str], str]
# Provide a pass-through scrubber implementation, also used as a default
def NoScrubber(s: str) -> str:
return s
class LineFormat(Enum):
PlainText = 1
DebugText = 2
Json = 3
# Map from dbt event levels to python log levels
_log_level_map = {
EventLevel.DEBUG: 10,
EventLevel.TEST: 10,
EventLevel.INFO: 20,
EventLevel.WARN: 30,
EventLevel.ERROR: 40,
}
@dataclass
class LoggerConfig:
name: str
filter: Filter = NoFilter
scrubber: Scrubber = NoScrubber
line_format: LineFormat = LineFormat.PlainText
level: EventLevel = EventLevel.WARN
use_colors: bool = False
output_stream: Optional[TextIO] = None
output_file_name: Optional[str] = None
logger: Optional[Any] = None
class _Logger:
def __init__(self, event_manager: "EventManager", config: LoggerConfig) -> None:
self.name: str = config.name
self.filter: Filter = config.filter
self.scrubber: Scrubber = config.scrubber
self.level: EventLevel = config.level
self.event_manager: EventManager = event_manager
self._python_logger: Optional[logging.Logger] = config.logger
self._stream: Optional[TextIO] = config.output_stream
if config.output_file_name:
log = logging.getLogger(config.name)
log.setLevel(_log_level_map[config.level])
handler = RotatingFileHandler(
filename=str(config.output_file_name),
encoding="utf8",
maxBytes=10 * 1024 * 1024, # 10 mb
backupCount=5,
)
handler.setFormatter(logging.Formatter(fmt="%(message)s"))
log.handlers.clear()
log.addHandler(handler)
self._python_logger = log
def create_line(self, e: BaseEvent) -> str:
raise NotImplementedError()
def write_line(self, e: BaseEvent):
line = self.create_line(e)
python_level = _log_level_map[e.log_level()]
if self._python_logger is not None:
self._python_logger.log(python_level, line)
elif self._stream is not None and _log_level_map[self.level] <= python_level:
self._stream.write(line + "\n")
def flush(self):
if self._python_logger is not None:
for handler in self._python_logger.handlers:
handler.flush()
elif self._stream is not None:
self._stream.flush()
class _TextLogger(_Logger):
def __init__(self, event_manager: "EventManager", config: LoggerConfig) -> None:
super().__init__(event_manager, config)
self.use_colors = config.use_colors
self.use_debug_format = config.line_format == LineFormat.DebugText
def create_line(self, e: BaseEvent) -> str:
return self.create_debug_line(e) if self.use_debug_format else self.create_info_line(e)
def create_info_line(self, e: BaseEvent) -> str:
ts: str = datetime.utcnow().strftime("%H:%M:%S")
scrubbed_msg: str = self.scrubber(e.message()) # type: ignore
return f"{self._get_color_tag()}{ts} {scrubbed_msg}"
def create_debug_line(self, e: BaseEvent) -> str:
log_line: str = ""
# Create a separator if this is the beginning of an invocation
# TODO: This is an ugly hack, get rid of it if we can
if type(e).__name__ == "MainReportVersion":
separator = 30 * "="
log_line = f"\n\n{separator} {datetime.utcnow()} | {self.event_manager.invocation_id} {separator}\n"
ts: str = datetime.utcnow().strftime("%H:%M:%S.%f")
scrubbed_msg: str = self.scrubber(e.message()) # type: ignore
# log_level() for DynamicLevel events returns str instead of EventLevel
level = e.log_level().value if isinstance(e.log_level(), EventLevel) else e.log_level()
log_line += (
f"{self._get_color_tag()}{ts} [{level:<5}]{self._get_thread_name()} {scrubbed_msg}"
)
return log_line
def _get_color_tag(self) -> str:
return "" if not self.use_colors else Style.RESET_ALL
def _get_thread_name(self) -> str:
thread_name = ""
if threading.current_thread().name:
thread_name = threading.current_thread().name
thread_name = thread_name[:10]
thread_name = thread_name.ljust(10, " ")
thread_name = f" [{thread_name}]:"
return thread_name
class _JsonLogger(_Logger):
def create_line(self, e: BaseEvent) -> str:
from dbt.events.functions import event_to_dict
event_dict = event_to_dict(e)
raw_log_line = json.dumps(event_dict, sort_keys=True)
line = self.scrubber(raw_log_line) # type: ignore
return line
class EventManager:
def __init__(self) -> None:
self.loggers: List[_Logger] = []
self.callbacks: List[Callable[[BaseEvent], None]] = []
self.invocation_id: str = str(uuid4())
def fire_event(self, e: BaseEvent) -> None:
for logger in self.loggers:
if logger.filter(e): # type: ignore
logger.write_line(e)
for callback in self.callbacks:
callback(e)
def add_logger(self, config: LoggerConfig):
logger = (
_JsonLogger(self, config)
if config.line_format == LineFormat.Json
else _TextLogger(self, config)
)
logger.event_manager = self
self.loggers.append(logger)
def flush(self):
for logger in self.loggers:
logger.flush()
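Putting the pieces in this file together, a hedged sketch of standalone configuration (dbt normally does this in dbt.events.functions; the callback body here is a placeholder):

import sys
from dbt.events.base_types import EventLevel
from dbt.events.eventmgr import EventManager, LineFormat, LoggerConfig

manager = EventManager()
manager.add_logger(
    LoggerConfig(
        name="stdout",
        level=EventLevel.INFO,
        line_format=LineFormat.PlainText,
        use_colors=True,
        output_stream=sys.stdout,
    )
)
manager.callbacks.append(lambda e: None)  # e.g. hand events to a test harness
# manager.fire_event(some_event) now writes one line per logger and runs each callback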

View File

@@ -1,145 +1,123 @@
import betterproto
from dbt.constants import METADATA_ENV_PREFIX
from dbt.events.base_types import BaseEvent, Cache, EventLevel, NoFile, NoStdOut
from dbt.events.eventmgr import EventManager, LoggerConfig, LineFormat, NoFilter
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.proto_types import EventInfo
from dbt.events.types import EmptyLine
import dbt.flags as flags
from dbt.logger import GLOBAL_LOGGER, make_log_dir_if_missing
from functools import partial
import json
import os
import sys
from typing import Callable, Dict, Optional, TextIO
import uuid
from colorama import Style
from dbt.events.base_types import NoStdOut, BaseEvent, NoFile, Cache
from dbt.events.types import EventBufferFull, MainReportVersion, EmptyLine
from dbt.events.proto_types import EventInfo
from dbt.events.helpers import env_secrets, scrub_secrets
import dbt.flags as flags
from dbt.constants import METADATA_ENV_PREFIX
from dbt.logger import make_log_dir_if_missing, GLOBAL_LOGGER
from datetime import datetime
import json
import io
from io import StringIO, TextIOWrapper
import logbook
import logging
from logging import Logger
import sys
from logging.handlers import RotatingFileHandler
import os
import uuid
import threading
from typing import Optional, Union, Callable, Dict
from collections import deque
LOG_VERSION = 3
EVENT_HISTORY = None
# create the global file logger with no configuration
FILE_LOG = logging.getLogger("default_file")
null_handler = logging.NullHandler()
FILE_LOG.addHandler(null_handler)
# set up logger to go to stdout with defaults
# setup_event_logger will be called once args have been parsed
STDOUT_LOG = logging.getLogger("default_stdout")
STDOUT_LOG.setLevel(logging.INFO)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.INFO)
STDOUT_LOG.addHandler(stdout_handler)
format_color = True
format_json = False
invocation_id: Optional[str] = None
metadata_vars: Optional[Dict[str, str]] = None
# The default event manager will not log anything, but some tests run code that
# generates events, without configuring the event manager.
EVENT_MANAGER: EventManager = EventManager()
# make sure that logs / exceptions go *somewhere* if they occur before the
# EventManager has been actually configured. this should only be needed if something
# has been configured / initialized incorrectly
def setup_basic_logger():
EVENT_MANAGER.add_logger(_get_stdout_config(level=EventLevel.INFO))
def setup_event_logger(log_path: str, level_override: Optional[EventLevel] = None):
cleanup_event_logger()
def setup_event_logger(log_path, level_override=None):
global format_json, format_color, STDOUT_LOG, FILE_LOG
make_log_dir_if_missing(log_path)
if flags.ENABLE_LEGACY_LOGGER:
EVENT_MANAGER.add_logger(_get_logbook_log_config(level_override))
else:
EVENT_MANAGER.add_logger(_get_stdout_config(level_override))
if _CAPTURE_STREAM:
# Create second stdout logger to support tests which want to know what's
# being sent to stdout.
capture_config = _get_stdout_config(level_override)
capture_config.output_stream = _CAPTURE_STREAM
EVENT_MANAGER.add_logger(capture_config)
format_json = flags.LOG_FORMAT == "json"
# USE_COLORS can be None if the app just started and the cli flags
# haven't been applied yet
format_color = True if flags.USE_COLORS else False
# TODO this default should live somewhere better
log_dest = os.path.join(log_path, "dbt.log")
level = level_override or (logging.DEBUG if flags.DEBUG else logging.INFO)
# create and add the file logger to the event manager
EVENT_MANAGER.add_logger(_get_logfile_config(os.path.join(log_path, "dbt.log")))
# overwrite the STDOUT_LOG logger with the configured one
STDOUT_LOG = logging.getLogger("configured_std_out")
STDOUT_LOG.setLevel(level)
FORMAT = "%(message)s"
stdout_passthrough_formatter = logging.Formatter(fmt=FORMAT)
def _get_stdout_config(level: Optional[EventLevel]) -> LoggerConfig:
fmt = LineFormat.PlainText
if flags.LOG_FORMAT == "json":
fmt = LineFormat.Json
elif flags.DEBUG:
fmt = LineFormat.DebugText
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(stdout_passthrough_formatter)
stdout_handler.setLevel(level)
# clear existing stdout TextIOWrapper stream handlers
STDOUT_LOG.handlers = [
h
for h in STDOUT_LOG.handlers
if not (hasattr(h, "stream") and isinstance(h.stream, TextIOWrapper)) # type: ignore
]
STDOUT_LOG.addHandler(stdout_handler)
return LoggerConfig(
name="stdout_log",
level=level or (EventLevel.DEBUG if flags.DEBUG else EventLevel.INFO),
use_colors=bool(flags.USE_COLORS),
line_format=fmt,
scrubber=env_scrubber,
filter=partial(
_stdout_filter, bool(flags.LOG_CACHE_EVENTS), bool(flags.DEBUG), bool(flags.QUIET)
),
output_stream=sys.stdout,
# overwrite the FILE_LOG logger with the configured one
FILE_LOG = logging.getLogger("configured_file")
FILE_LOG.setLevel(logging.DEBUG) # always debug regardless of user input
file_passthrough_formatter = logging.Formatter(fmt=FORMAT)
file_handler = RotatingFileHandler(
filename=log_dest, encoding="utf8", maxBytes=10 * 1024 * 1024, backupCount=5 # 10 mb
)
def _stdout_filter(
log_cache_events: bool, debug_mode: bool, quiet_mode: bool, evt: BaseEvent
) -> bool:
return (
not isinstance(evt, NoStdOut)
and (not isinstance(evt, Cache) or log_cache_events)
and (evt.log_level() != EventLevel.DEBUG or debug_mode)
and (evt.log_level() == EventLevel.ERROR or not quiet_mode)
and not (flags.LOG_FORMAT == "json" and type(evt) == EmptyLine)
)
def _get_logfile_config(log_path: str) -> LoggerConfig:
return LoggerConfig(
name="file_log",
line_format=LineFormat.Json if flags.LOG_FORMAT == "json" else LineFormat.DebugText,
use_colors=bool(flags.USE_COLORS),
level=EventLevel.DEBUG, # File log is *always* debug level
scrubber=env_scrubber,
filter=partial(_logfile_filter, bool(flags.LOG_CACHE_EVENTS)),
output_file_name=log_path,
)
def _logfile_filter(log_cache_events: bool, evt: BaseEvent) -> bool:
return (
not isinstance(evt, NoFile)
and not (isinstance(evt, Cache) and not log_cache_events)
and not (flags.LOG_FORMAT == "json" and type(evt) == EmptyLine)
)
def _get_logbook_log_config(level: Optional[EventLevel]) -> LoggerConfig:
config = _get_stdout_config(level)
config.name = "logbook_log"
config.filter = NoFilter if flags.LOG_CACHE_EVENTS else lambda e: not isinstance(e, Cache)
config.logger = GLOBAL_LOGGER
return config
def env_scrubber(msg: str) -> str:
return scrub_secrets(msg, env_secrets())
def cleanup_event_logger():
# Reset to a no-op manager to release streams associated with logs. This is
# especially important for tests, since pytest replaces the stdout stream
# during test runs, and closes the stream after the test is over.
EVENT_MANAGER.loggers.clear()
EVENT_MANAGER.callbacks.clear()
# This global, and the following two functions for capturing stdout logs are
# an unpleasant hack we intend to remove as part of API-ification. The GitHub
# issue #6350 was opened for that work.
_CAPTURE_STREAM: Optional[TextIO] = None
file_handler.setFormatter(file_passthrough_formatter)
file_handler.setLevel(logging.DEBUG) # always debug regardless of user input
FILE_LOG.handlers.clear()
FILE_LOG.addHandler(file_handler)
# used for integration tests
def capture_stdout_logs(stream: TextIO):
global _CAPTURE_STREAM
_CAPTURE_STREAM = stream
def capture_stdout_logs() -> StringIO:
global STDOUT_LOG
capture_buf = io.StringIO()
stdout_capture_handler = logging.StreamHandler(capture_buf)
stdout_handler.setLevel(logging.DEBUG)
STDOUT_LOG.addHandler(stdout_capture_handler)
return capture_buf
def stop_capture_stdout_logs():
global _CAPTURE_STREAM
_CAPTURE_STREAM = None
# used for integration tests
def stop_capture_stdout_logs() -> None:
global STDOUT_LOG
STDOUT_LOG.handlers = [
h
for h in STDOUT_LOG.handlers
if not (hasattr(h, "stream") and isinstance(h.stream, StringIO)) # type: ignore
]
# returns a dictionary representation of the event fields.
# the message may contain secrets which must be scrubbed at the usage site.
def event_to_json(event: BaseEvent) -> str:
def event_to_json(
event: BaseEvent,
) -> str:
event_dict = event_to_dict(event)
raw_log_line = json.dumps(event_dict, sort_keys=True)
return raw_log_line
@@ -148,16 +126,92 @@ def event_to_json(event: BaseEvent) -> str:
def event_to_dict(event: BaseEvent) -> dict:
event_dict = dict()
try:
# We could use to_json here, but it wouldn't sort the keys.
# The 'to_json' method just does json.dumps on the dict anyway.
event_dict = event.to_dict(casing=betterproto.Casing.SNAKE, include_default_values=True) # type: ignore
except AttributeError as exc:
event_type = type(event).__name__
raise Exception(f"type {event_type} is not serializable. {str(exc)}")
# We don't want an empty NodeInfo in output
if "node_info" in event_dict and event_dict["node_info"]["node_name"] == "":
del event_dict["node_info"]
return event_dict
# translates an Event to a completely formatted text-based log line
# type hinting everything as strings so we don't get any unintentional string conversions via str()
def reset_color() -> str:
global format_color
return "" if not format_color else Style.RESET_ALL
def create_info_text_log_line(e: BaseEvent) -> str:
color_tag: str = reset_color()
ts: str = get_ts().strftime("%H:%M:%S") # TODO: get this from the event.ts?
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
log_line: str = f"{color_tag}{ts} {scrubbed_msg}"
return log_line
def create_debug_text_log_line(e: BaseEvent) -> str:
log_line: str = ""
# Create a separator if this is the beginning of an invocation
if type(e) == MainReportVersion:
separator = 30 * "="
log_line = f"\n\n{separator} {get_ts()} | {get_invocation_id()} {separator}\n"
color_tag: str = reset_color()
ts: str = get_ts().strftime("%H:%M:%S.%f")
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
# Make the levels all 5 characters so they line up
level: str = f"{e.log_level():<5}"
thread = ""
if threading.current_thread().name:
thread_name = threading.current_thread().name
thread_name = thread_name[:10]
thread_name = thread_name.ljust(10, " ")
thread = f" [{thread_name}]:"
log_line = log_line + f"{color_tag}{ts} [{level}]{thread} {scrubbed_msg}"
return log_line
# translates an Event to a completely formatted json log line
def create_json_log_line(e: BaseEvent) -> Optional[str]:
if type(e) == EmptyLine:
return None # will not be sent to logger
raw_log_line = event_to_json(e)
return scrub_secrets(raw_log_line, env_secrets())
# calls create_stdout_text_log_line() or create_json_log_line() according to logger config
def create_log_line(e: BaseEvent, file_output=False) -> Optional[str]:
global format_json
if format_json:
return create_json_log_line(e) # json output, both console and file
elif file_output is True or flags.DEBUG:
return create_debug_text_log_line(e) # default file output
else:
return create_info_text_log_line(e) # console output
# allows for reuse of this obnoxious if else tree.
# do not use for exceptions, it doesn't pass along exc_info, stack_info, or extra
def send_to_logger(l: Union[Logger, logbook.Logger], level: str, log_line: str):
if not log_line:
return
if level == "test":
# TODO after implementing #3977 send to new test level
l.debug(log_line)
elif level == "debug":
l.debug(log_line)
elif level == "info":
l.info(log_line)
elif level == "warn":
l.warning(log_line)
elif level == "error":
l.error(log_line)
else:
raise AssertionError(
f"While attempting to log {log_line}, encountered the unhandled level: {level}"
)
def warn_or_error(event, node=None):
if flags.WARN_ERROR:
from dbt.exceptions import raise_compiler_error
@@ -179,7 +233,39 @@ def fire_event_if(conditional: bool, lazy_e: Callable[[], BaseEvent]) -> None:
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: BaseEvent) -> None:
EVENT_MANAGER.fire_event(e)
# skip logs when `--log-cache-events` is not passed
if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
return
add_to_event_history(e)
# backwards compatibility for plugins that require old logger (dbt-rpc)
if flags.ENABLE_LEGACY_LOGGER:
# using Event::message because the legacy logger didn't differentiate messages by
# destination
log_line = create_log_line(e)
if log_line:
send_to_logger(GLOBAL_LOGGER, level=e.log_level(), log_line=log_line)
return # exit the function to avoid using the current logger as well
# always logs debug level regardless of user input
if not isinstance(e, NoFile):
log_line = create_log_line(e, file_output=True)
# doesn't send exceptions to exception logger
if log_line:
send_to_logger(FILE_LOG, level=e.log_level(), log_line=log_line)
if not isinstance(e, NoStdOut):
# explicitly checking the debug flag here so that potentially expensive-to-construct
# log messages are not constructed if debug messages are never shown.
if e.log_level() == "debug" and not flags.DEBUG:
return # eat the message in case it was one of the expensive ones
if e.log_level() != "error" and flags.QUIET:
return # eat all non-exception messages in quiet mode
log_line = create_log_line(e)
if log_line:
send_to_logger(STDOUT_LOG, level=e.log_level(), log_line=log_line)
def get_metadata_vars() -> Dict[str, str]:
@@ -199,13 +285,47 @@ def reset_metadata_vars() -> None:
def get_invocation_id() -> str:
return EVENT_MANAGER.invocation_id
global invocation_id
if invocation_id is None:
invocation_id = str(uuid.uuid4())
return invocation_id
def set_invocation_id() -> None:
# This is primarily for setting the invocation_id for separate
# commands in the dbt servers. It shouldn't be necessary for the CLI.
EVENT_MANAGER.invocation_id = str(uuid.uuid4())
global invocation_id
invocation_id = str(uuid.uuid4())
# exactly one time stamp per concrete event
def get_ts() -> datetime:
ts = datetime.utcnow()
return ts
# preformatted time stamp
def get_ts_rfc3339() -> str:
ts = get_ts()
ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return ts_rfc3339
def add_to_event_history(event):
if flags.EVENT_BUFFER_SIZE == 0:
return
global EVENT_HISTORY
if EVENT_HISTORY is None:
reset_event_history()
EVENT_HISTORY.append(event)
# We only set the EventBufferFull message for event buffers >= 10,000
if flags.EVENT_BUFFER_SIZE >= 10000 and len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1):
fire_event(EventBufferFull())
def reset_event_history():
global EVENT_HISTORY
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)
# Currently used to set the level in EventInfo, so logging events can
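A hedged sketch of the typical call order at the start of an invocation, assuming flags have already been populated via set_from_args (EmptyLine is used only because it needs no arguments):

from dbt.events.functions import fire_event, set_invocation_id, setup_event_logger
from dbt.events.types import EmptyLine

set_invocation_id()
setup_event_logger(log_path="logs")  # configures the stdout logger and logs/dbt.log
fire_event(EmptyLine())              # routed to every configured logger (or the legacy logger)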

View File

@@ -345,10 +345,9 @@ class AdapterEventDebug(betterproto.Message):
"""E001"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
base_msg: str = betterproto.string_field(4)
args: List[str] = betterproto.string_field(5)
name: str = betterproto.string_field(2)
base_msg: str = betterproto.string_field(3)
args: List[str] = betterproto.string_field(4)
@dataclass
@@ -356,10 +355,9 @@ class AdapterEventInfo(betterproto.Message):
"""E002"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
base_msg: str = betterproto.string_field(4)
args: List[str] = betterproto.string_field(5)
name: str = betterproto.string_field(2)
base_msg: str = betterproto.string_field(3)
args: List[str] = betterproto.string_field(4)
@dataclass
@@ -367,10 +365,9 @@ class AdapterEventWarning(betterproto.Message):
"""E003"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
base_msg: str = betterproto.string_field(4)
args: List[str] = betterproto.string_field(5)
name: str = betterproto.string_field(2)
base_msg: str = betterproto.string_field(3)
args: List[str] = betterproto.string_field(4)
@dataclass
@@ -378,11 +375,10 @@ class AdapterEventError(betterproto.Message):
"""E004"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
base_msg: str = betterproto.string_field(4)
args: List[str] = betterproto.string_field(5)
exc_info: str = betterproto.string_field(6)
name: str = betterproto.string_field(2)
base_msg: str = betterproto.string_field(3)
args: List[str] = betterproto.string_field(4)
exc_info: str = betterproto.string_field(5)
@dataclass
@@ -390,9 +386,8 @@ class NewConnection(betterproto.Message):
"""E005"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
conn_type: str = betterproto.string_field(3)
conn_name: str = betterproto.string_field(4)
conn_type: str = betterproto.string_field(2)
conn_name: str = betterproto.string_field(3)
@dataclass
@@ -424,9 +419,8 @@ class RollbackFailed(betterproto.Message):
"""E009"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
conn_name: str = betterproto.string_field(3)
exc_info: str = betterproto.string_field(4)
conn_name: str = betterproto.string_field(2)
exc_info: str = betterproto.string_field(3)
@dataclass
@@ -434,8 +428,7 @@ class ConnectionClosed(betterproto.Message):
"""E010"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
conn_name: str = betterproto.string_field(3)
conn_name: str = betterproto.string_field(2)
@dataclass
@@ -443,8 +436,7 @@ class ConnectionLeftOpen(betterproto.Message):
"""E011"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
conn_name: str = betterproto.string_field(3)
conn_name: str = betterproto.string_field(2)
@dataclass
@@ -452,8 +444,7 @@ class Rollback(betterproto.Message):
"""E012"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
conn_name: str = betterproto.string_field(3)
conn_name: str = betterproto.string_field(2)
@dataclass
@@ -481,9 +472,8 @@ class ConnectionUsed(betterproto.Message):
"""E015"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
conn_type: str = betterproto.string_field(3)
conn_name: str = betterproto.string_field(4)
conn_type: str = betterproto.string_field(2)
conn_name: str = betterproto.string_field(3)
@dataclass
@@ -491,9 +481,8 @@ class SQLQuery(betterproto.Message):
"""E016"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
conn_name: str = betterproto.string_field(3)
sql: str = betterproto.string_field(4)
conn_name: str = betterproto.string_field(2)
sql: str = betterproto.string_field(3)
@dataclass
@@ -501,9 +490,8 @@ class SQLQueryStatus(betterproto.Message):
"""E017"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
status: str = betterproto.string_field(3)
elapsed: float = betterproto.float_field(4)
status: str = betterproto.string_field(2)
elapsed: float = betterproto.float_field(3)
@dataclass
@@ -511,8 +499,7 @@ class SQLCommit(betterproto.Message):
"""E018"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
conn_name: str = betterproto.string_field(3)
conn_name: str = betterproto.string_field(2)
@dataclass
@@ -680,8 +667,7 @@ class NewConnectionOpening(betterproto.Message):
"""E037"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
connection_state: str = betterproto.string_field(3)
connection_state: str = betterproto.string_field(2)
@dataclass
@@ -922,7 +908,6 @@ class PartialParsingDeletedMetric(betterproto.Message):
info: "EventInfo" = betterproto.message_field(1)
unique_id: str = betterproto.string_field(2)
@dataclass
class ManifestWrongMetadataVersion(betterproto.Message):
"""I022"""
@@ -1261,9 +1246,14 @@ class JinjaLogWarning(betterproto.Message):
"""I061"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
msg: str = betterproto.string_field(3)
msg: str = betterproto.string_field(2)
@dataclass
class PartialParsingDeletedEntity(betterproto.Message):
"""I062"""
info: "EventInfo" = betterproto.message_field(1)
unique_id: str = betterproto.string_field(2)
@dataclass
class GitSparseCheckoutSubdirectory(betterproto.Message):
@@ -1354,8 +1344,7 @@ class JinjaLogInfo(betterproto.Message):
"""M011"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
msg: str = betterproto.string_field(3)
msg: str = betterproto.string_field(2)
@dataclass
@@ -1363,8 +1352,7 @@ class JinjaLogDebug(betterproto.Message):
"""M012"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
msg: str = betterproto.string_field(3)
msg: str = betterproto.string_field(2)
@dataclass
@@ -1664,6 +1652,7 @@ class NodeStart(betterproto.Message):
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
unique_id: str = betterproto.string_field(3)
@dataclass
@@ -1672,6 +1661,7 @@ class NodeFinished(betterproto.Message):
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
unique_id: str = betterproto.string_field(3)
run_result: "RunResultMsg" = betterproto.message_field(4)
@@ -1690,7 +1680,14 @@ class ConcurrencyLine(betterproto.Message):
info: "EventInfo" = betterproto.message_field(1)
num_threads: int = betterproto.int32_field(2)
target_name: str = betterproto.string_field(3)
node_count: int = betterproto.int32_field(4)
@dataclass
class CompilingNode(betterproto.Message):
"""Q028"""
info: "EventInfo" = betterproto.message_field(1)
unique_id: str = betterproto.string_field(2)
@dataclass
@@ -1698,7 +1695,7 @@ class WritingInjectedSQLForNode(betterproto.Message):
"""Q029"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
unique_id: str = betterproto.string_field(2)
@dataclass
@@ -1707,6 +1704,7 @@ class NodeCompiling(betterproto.Message):
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
unique_id: str = betterproto.string_field(3)
@dataclass
@@ -1715,6 +1713,7 @@ class NodeExecuting(betterproto.Message):
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
unique_id: str = betterproto.string_field(3)
@dataclass
@@ -1792,9 +1791,8 @@ class CatchableExceptionOnRun(betterproto.Message):
"""W002"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
exc: str = betterproto.string_field(3)
exc_info: str = betterproto.string_field(4)
exc: str = betterproto.string_field(2)
exc_info: str = betterproto.string_field(3)
@dataclass
@@ -1912,8 +1910,6 @@ class TimingInfoCollected(betterproto.Message):
"""Z010"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
timing_info: "TimingInfoMsg" = betterproto.message_field(3)
@dataclass
@@ -2159,6 +2155,13 @@ class TrackingInitializeFailure(betterproto.Message):
exc_info: str = betterproto.string_field(2)
@dataclass
class EventBufferFull(betterproto.Message):
"""Z045"""
info: "EventInfo" = betterproto.message_field(1)
@dataclass
class RunResultWarningMessage(betterproto.Message):
"""Z046"""

View File

@@ -265,46 +265,41 @@ message ExposureNameDeprecation {
// E001
message AdapterEventDebug {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
string base_msg = 4;
repeated string args = 5;
string name = 2;
string base_msg = 3;
repeated string args = 4;
}
// E002
message AdapterEventInfo {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
string base_msg = 4;
repeated string args = 5;
string name = 2;
string base_msg = 3;
repeated string args = 4;
}
// E003
message AdapterEventWarning {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
string base_msg = 4;
repeated string args = 5;
string name = 2;
string base_msg = 3;
repeated string args = 4;
}
// E004
message AdapterEventError {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
string base_msg = 4;
repeated string args = 5;
string exc_info = 6;
string name = 2;
string base_msg = 3;
repeated string args = 4;
string exc_info = 5;
}
// E005
message NewConnection {
EventInfo info = 1;
NodeInfo node_info = 2;
string conn_type = 3;
string conn_name = 4;
string conn_type = 2;
string conn_name = 3;
}
// E006
@@ -328,30 +323,26 @@ message ConnectionClosedInCleanup {
// E009
message RollbackFailed {
EventInfo info = 1;
NodeInfo node_info = 2;
string conn_name = 3;
string exc_info = 4;
string conn_name = 2;
string exc_info = 3;
}
// E010
message ConnectionClosed {
EventInfo info = 1;
NodeInfo node_info = 2;
string conn_name = 3;
string conn_name = 2;
}
// E011
message ConnectionLeftOpen {
EventInfo info = 1;
NodeInfo node_info = 2;
string conn_name = 3;
string conn_name = 2;
}
// E012
message Rollback {
EventInfo info = 1;
NodeInfo node_info = 2;
string conn_name = 3;
string conn_name = 2;
}
// E013
@@ -373,32 +364,28 @@ message ListRelations {
// E015
message ConnectionUsed {
EventInfo info = 1;
NodeInfo node_info = 2;
string conn_type = 3;
string conn_name = 4;
string conn_type = 2;
string conn_name = 3;
}
// E016
message SQLQuery {
EventInfo info = 1;
NodeInfo node_info = 2;
string conn_name = 3;
string sql = 4;
string conn_name = 2;
string sql = 3;
}
// E017
message SQLQueryStatus {
EventInfo info = 1;
NodeInfo node_info = 2;
string status = 3;
float elapsed = 4;
string status = 2;
float elapsed = 3;
}
// E018
message SQLCommit {
EventInfo info = 1;
NodeInfo node_info = 2;
string conn_name = 3;
string conn_name = 2;
}
// E019
@@ -520,8 +507,7 @@ message PluginLoadError {
// E037
message NewConnectionOpening {
EventInfo info = 1;
NodeInfo node_info = 2;
string connection_state = 3;
string connection_state = 2;
}
// E038
@@ -960,8 +946,7 @@ message NodeNotFoundOrDisabled {
// I061
message JinjaLogWarning {
EventInfo info = 1;
NodeInfo node_info = 2;
string msg = 3;
string msg = 2;
}
// M - Deps generation
@@ -1033,15 +1018,13 @@ message SelectorReportInvalidSelector {
// M011
message JinjaLogInfo {
EventInfo info = 1;
NodeInfo node_info = 2;
string msg = 3;
string msg = 2;
}
// M012
message JinjaLogDebug {
EventInfo info = 1;
NodeInfo node_info = 2;
string msg = 3;
string msg = 2;
}
// M013
@@ -1287,12 +1270,14 @@ message DefaultSelector {
message NodeStart {
EventInfo info = 1;
NodeInfo node_info = 2;
string unique_id = 3;
}
// Q025
message NodeFinished {
EventInfo info = 1;
NodeInfo node_info = 2;
string unique_id = 3;
RunResultMsg run_result = 4;
}
@@ -1307,27 +1292,32 @@ message ConcurrencyLine {
EventInfo info = 1;
int32 num_threads = 2;
string target_name = 3;
int32 node_count = 4;
}
// Skipped Q028
// Q028
message CompilingNode {
EventInfo info = 1;
string unique_id = 2;
}
// Q029
message WritingInjectedSQLForNode {
EventInfo info = 1;
NodeInfo node_info = 2;
string unique_id = 2;
}
// Q030
message NodeCompiling {
EventInfo info = 1;
NodeInfo node_info = 2;
string unique_id = 3;
}
// Q031
message NodeExecuting {
EventInfo info = 1;
NodeInfo node_info = 2;
string unique_id = 3;
}
// Q032
@@ -1393,9 +1383,8 @@ message NoNodesSelected {
// W002
message CatchableExceptionOnRun {
EventInfo info = 1;
NodeInfo node_info = 2;
string exc = 3;
string exc_info = 4;
string exc = 2;
string exc_info = 3;
}
// W003
@@ -1487,8 +1476,6 @@ message SystemReportReturnCode {
// Z010
message TimingInfoCollected {
EventInfo info = 1;
NodeInfo node_info = 2;
TimingInfoMsg timing_info = 3;
}
// Z011
@@ -1674,7 +1661,10 @@ message TrackingInitializeFailure {
string exc_info = 2;
}
// Skipped Z045
// Z045
message EventBufferFull {
EventInfo info = 1;
}
// Z046
message RunResultWarningMessage {

View File

@@ -14,9 +14,9 @@ from dbt.events.base_types import (
)
from dbt.events.format import format_fancy_output_line, pluralize
# The generated classes quote the included message classes, requiring the following lines
# The generated classes quote the included message classes, requiring the following line
from dbt.events.proto_types import EventInfo, RunResultMsg, ListOfStrings # noqa
from dbt.events.proto_types import NodeInfo, ReferenceKeyMsg, TimingInfoMsg # noqa
from dbt.events.proto_types import NodeInfo, ReferenceKeyMsg # noqa
from dbt.events import proto_types as pt
from dbt.node_types import NodeType
@@ -1526,6 +1526,14 @@ class JinjaLogWarning(WarnLevel, pt.JinjaLogWarning):
def message(self) -> str:
return self.msg
@dataclass
class PartialParsingDeletedEntity(DebugLevel, pt.PartialParsingDeletedEntity):
def code(self):
return "I062"
def message(self) -> str:
return f"Partial parsing: deleted entity {self.unique_id}"
# =======================================================
# M - Deps generation
@@ -1914,7 +1922,6 @@ class LogTestResult(DynamicLevel, pt.LogTestResult):
@classmethod
def status_to_level(cls, status):
# The statuses come from TestStatus
# TODO should this return EventLevel enum instead?
level_lookup = {
"fail": "error",
"pass": "info",
@@ -2044,7 +2051,6 @@ class LogFreshnessResult(DynamicLevel, pt.LogFreshnessResult):
@classmethod
def status_to_level(cls, status):
# The statuses come from FreshnessStatus
# TODO should this return EventLevel enum instead?
level_lookup = {
"runtime error": "error",
"pass": "info",
@@ -2085,7 +2091,7 @@ class NodeStart(DebugLevel, pt.NodeStart):
return "Q024"
def message(self) -> str:
return f"Began running node {self.node_info.unique_id}"
return f"Began running node {self.unique_id}"
@dataclass
@@ -2094,7 +2100,7 @@ class NodeFinished(DebugLevel, pt.NodeFinished):
return "Q025"
def message(self) -> str:
return f"Finished running node {self.node_info.unique_id}"
return f"Finished running node {self.unique_id}"
@dataclass
@@ -2120,7 +2126,13 @@ class ConcurrencyLine(InfoLevel, pt.ConcurrencyLine): # noqa
return f"Concurrency: {self.num_threads} threads (target='{self.target_name}')"
# Skipped Q028
@dataclass
class CompilingNode(DebugLevel, pt.CompilingNode):
def code(self):
return "Q028"
def message(self) -> str:
return f"Compiling {self.unique_id}"
@dataclass
@@ -2129,7 +2141,7 @@ class WritingInjectedSQLForNode(DebugLevel, pt.WritingInjectedSQLForNode):
return "Q029"
def message(self) -> str:
return f'Writing injected SQL for node "{self.node_info.unique_id}"'
return f'Writing injected SQL for node "{self.unique_id}"'
@dataclass
@@ -2138,7 +2150,7 @@ class NodeCompiling(DebugLevel, pt.NodeCompiling):
return "Q030"
def message(self) -> str:
return f"Began compiling node {self.node_info.unique_id}"
return f"Began compiling node {self.unique_id}"
@dataclass
@@ -2147,7 +2159,7 @@ class NodeExecuting(DebugLevel, pt.NodeExecuting):
return "Q031"
def message(self) -> str:
return f"Began executing node {self.node_info.unique_id}"
return f"Began executing node {self.unique_id}"
@dataclass
@@ -2389,7 +2401,7 @@ class TimingInfoCollected(DebugLevel, pt.TimingInfoCollected):
return "Z010"
def message(self) -> str:
return f"Timing info for {self.node_info.unique_id} ({self.timing_info.name}): {self.timing_info.started_at} => {self.timing_info.completed_at}"
return "finished collecting timing info"
# This prints the stack trace at the debug level while allowing just the nice exception message
@@ -2706,6 +2718,18 @@ class TrackingInitializeFailure(DebugLevel, pt.TrackingInitializeFailure): # no
return "Got an exception trying to initialize tracking"
@dataclass
class EventBufferFull(WarnLevel, pt.EventBufferFull):
def code(self):
return "Z045"
def message(self) -> str:
return (
"Internal logging/event buffer full."
"Earliest logs/events will be dropped as new ones are fired (FIFO)."
)
# this is the message from the result object
@dataclass
class RunResultWarningMessage(WarnLevel, EventStringFunctor, pt.RunResultWarningMessage):
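A hedged usage sketch of the new CompilingNode event defined above, as task code would fire it (the unique_id value is made up):

from dbt.events.functions import fire_event
from dbt.events.types import CompilingNode

fire_event(CompilingNode(unique_id="model.jaffle_shop.orders"))
# debug-level output along the lines of:
#   HH:MM:SS.ffffff [debug] [MainThread]: Compiling model.jaffle_shop.orders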

View File

@@ -4,7 +4,6 @@ from typing import NoReturn, Optional, Mapping, Any
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.types import JinjaLogWarning
from dbt.events.contextvars import get_node_info
from dbt.node_types import NodeType
import dbt.dataclass_schema
@@ -997,10 +996,7 @@ def raise_duplicate_alias(
def warn(msg, node=None):
dbt.events.functions.warn_or_error(
JinjaLogWarning(msg=msg, node_info=get_node_info()),
node=node,
)
dbt.events.functions.warn_or_error(JinjaLogWarning(msg=msg), node=node)
return ""

View File

@@ -39,6 +39,7 @@ PRINTER_WIDTH = 80
WHICH = None
INDIRECT_SELECTION = None
LOG_CACHE_EVENTS = None
EVENT_BUFFER_SIZE = 100000
QUIET = None
NO_PRINT = None
CACHE_SELECTED_ONLY = None
@@ -50,6 +51,7 @@ _NON_BOOLEAN_FLAGS = [
"PRINTER_WIDTH",
"PROFILES_DIR",
"INDIRECT_SELECTION",
"EVENT_BUFFER_SIZE",
"TARGET_PATH",
"LOG_PATH",
]
@@ -76,6 +78,7 @@ flag_defaults = {
"PRINTER_WIDTH": 80,
"INDIRECT_SELECTION": "eager",
"LOG_CACHE_EVENTS": False,
"EVENT_BUFFER_SIZE": 100000,
"QUIET": False,
"NO_PRINT": False,
"CACHE_SELECTED_ONLY": False,
@@ -131,7 +134,7 @@ def set_from_args(args, user_config):
global STRICT_MODE, FULL_REFRESH, WARN_ERROR, USE_EXPERIMENTAL_PARSER, STATIC_PARSER
global WRITE_JSON, PARTIAL_PARSE, USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT
global INDIRECT_SELECTION, VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, QUIET, NO_PRINT, CACHE_SELECTED_ONLY
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET, NO_PRINT, CACHE_SELECTED_ONLY
global TARGET_PATH, LOG_PATH
STRICT_MODE = False # backwards compatibility
@@ -156,6 +159,7 @@ def set_from_args(args, user_config):
PRINTER_WIDTH = get_flag_value("PRINTER_WIDTH", args, user_config)
INDIRECT_SELECTION = get_flag_value("INDIRECT_SELECTION", args, user_config)
LOG_CACHE_EVENTS = get_flag_value("LOG_CACHE_EVENTS", args, user_config)
EVENT_BUFFER_SIZE = get_flag_value("EVENT_BUFFER_SIZE", args, user_config)
QUIET = get_flag_value("QUIET", args, user_config)
NO_PRINT = get_flag_value("NO_PRINT", args, user_config)
CACHE_SELECTED_ONLY = get_flag_value("CACHE_SELECTED_ONLY", args, user_config)
@@ -178,7 +182,7 @@ def _set_overrides_from_env():
def get_flag_value(flag, args, user_config):
flag_value = _load_flag_value(flag, args, user_config)
if flag == "PRINTER_WIDTH": # must be ints
if flag in ["PRINTER_WIDTH", "EVENT_BUFFER_SIZE"]: # must be ints
flag_value = int(flag_value)
if flag == "PROFILES_DIR":
flag_value = os.path.abspath(flag_value)
@@ -239,6 +243,7 @@ def get_flag_dict():
"printer_width": PRINTER_WIDTH,
"indirect_selection": INDIRECT_SELECTION,
"log_cache_events": LOG_CACHE_EVENTS,
"event_buffer_size": EVENT_BUFFER_SIZE,
"quiet": QUIET,
"no_print": NO_PRINT,
}
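The new EVENT_BUFFER_SIZE flag is coerced to an int alongside PRINTER_WIDTH and ultimately bounds the in-memory event history; a minimal sketch of how the value is consumed, mirroring reset_event_history in dbt.events.functions:

from collections import deque
import dbt.flags as flags

# after set_from_args() has run, EVENT_BUFFER_SIZE is an int (default 100000);
# a value of 0 disables buffering before this point is ever reached
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)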

View File

@@ -20,7 +20,7 @@ from .selector_spec import (
INTERSECTION_DELIMITER = ","
DEFAULT_INCLUDES: List[str] = ["fqn:*", "source:*", "exposure:*", "metric:*"]
DEFAULT_INCLUDES: List[str] = ["fqn:*", "source:*", "exposure:*", "metric:*", "entity:*"]
DEFAULT_EXCLUDES: List[str] = []

View File

@@ -5,12 +5,8 @@ from queue import PriorityQueue
from typing import Dict, Set, List, Generator, Optional
from .graph import UniqueId
from dbt.contracts.graph.nodes import (
SourceDefinition,
Exposure,
Metric,
GraphMemberNode,
)
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure, ParsedMetric, ParsedEntity
from dbt.contracts.graph.compiled import GraphMemberNode
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType
@@ -52,7 +48,7 @@ class GraphQueue:
if node.resource_type != NodeType.Model:
return False
# must be a Model - tell mypy this won't be a Source or Exposure or Metric
assert not isinstance(node, (SourceDefinition, Exposure, Metric))
assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure, ParsedMetric, ParsedEntity))
if node.is_ephemeral:
return False
return True
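The assert above is the standard mypy narrowing idiom: after ruling out the other union members, the checker treats the node as a model for the attribute access that follows. A minimal, self-contained illustration of the pattern (the types here are made up):

from typing import Union

class Model:
    is_ephemeral = False

class Source:
    pass

def can_skip(node: Union[Model, Source]) -> bool:
    assert not isinstance(node, Source)
    return node.is_ephemeral  # mypy now knows node is a Model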

Some files were not shown because too many files have changed in this diff.