Compare commits

...

69 Commits

Author SHA1 Message Date
Peter Allen Webb
e6a0a1a86a Remove source indicator. 2025-03-04 11:43:12 -05:00
Peter Allen Webb
e8d10ea3c3 Add source indicator. 2025-03-04 11:33:06 -05:00
Peter Allen Webb
c6a18a3fb0 Change script invocation path. 2025-03-04 11:23:41 -05:00
Peter Allen Webb
18ea5d1c73 More debug logging. 2025-03-04 11:15:50 -05:00
Peter Allen Webb
0e5dc412c6 Set execute bit on scripts. 2025-03-04 11:11:44 -05:00
Peter Allen Webb
463bb6c1d0 Add debug logging. 2025-03-04 11:07:25 -05:00
Peter Allen Webb
f1fc49ba8c Add sudos. 2025-03-04 10:57:27 -05:00
Peter Allen Webb
2e4eccb55c Change owner of db creation script so postgres can run it. 2025-03-04 10:53:56 -05:00
Emily Rockman
0e5761dbbb try postgres update 2025-03-04 08:58:06 -06:00
Emily Rockman
8c3b1799a7 updates to ubuntu-latest instead 2025-03-04 08:43:31 -06:00
Emily Rockman
466ee24b86 update ubuntu 20.04 to 24.04 2025-03-04 08:35:25 -06:00
Quigley Malcolm
94b6ae13b3 Rewrite execution of microbatch models to avoid blocking the main thread (#11332)
* Push orchestration of batches previously in the `RunTask` into `MicrobatchModelRunner`

* Split `MicrobatchModelRunner` into two separate runners

`MicrobatchModelRunner` is now an orchestrator of `MicrobatchBatchRunner`s, the latter handling actual batch execution

* Introduce new `DbtThreadPool` that knows if it's been closed

* Enable `MicrobatchModelRunner` to shut down gracefully when it detects the thread pool has been closed (a sketch follows this commit entry)
2025-03-03 15:21:24 -06:00
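A minimal sketch of the closable thread pool described in the commit above, assuming nothing beyond the commit message itself (the real `DbtThreadPool` in dbt-core may differ): a `ThreadPool` subclass remembers that `close()` was called, so the batch orchestrator can stop submitting work and shut down gracefully instead of blocking.

from multiprocessing.pool import ThreadPool

class ClosableThreadPool(ThreadPool):
    """Illustrative stand-in for DbtThreadPool: a pool that knows if it's been closed."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._is_closed = False

    def close(self) -> None:
        self._is_closed = True
        super().close()

    @property
    def is_closed(self) -> bool:
        return self._is_closed

def run_batches(pool: ClosableThreadPool, batches, execute_batch):
    # The orchestrator checks is_closed before scheduling each batch, which is
    # what allows a graceful shutdown mid-run.
    for batch in batches:
        if pool.is_closed:
            break
        pool.apply_async(execute_batch, (batch,))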
Michelle Ark
f7c4c3c9cc add database to ModelNodeArgs in partial parsing test to better reflect prod (#11330) 2025-02-21 16:36:28 -05:00
Kshitij Aranke
71a93b0cd3 Add secondary_profiles to profile.py (#11308)
* Add secondary_profiles to profile.py

* Add more tests for edge cases

* Add changie

* Allow inferring target name and add tests for the same

* Incorporate review feedback

* remove unnecessary nesting

* Use typing_extensions.Self

* use quoted type again

* address pr comments round 2
2025-02-20 16:38:36 +00:00
Emily Rockman
7bdf27af31 Update artifact triggers and concurrency rules (#11327)
* update trigger

* fix concurrency

* remove duplicate counts and check lt gt not eq
2025-02-19 12:26:36 -06:00
Gerda Shank
e60b41d9fa Add invocation_started_at (#11291) 2025-02-18 11:32:04 -05:00
Kshitij Aranke
2ba765d360 Fix #11275: _get_doc_blocks is crashing parsing if .format is called (#11310)
* Fix #11275: get_doc_blocks is crashing parsing

* Add changie
2025-02-18 15:23:28 +00:00
dependabot[bot]
93e27548ce Bump peter-evans/create-pull-request from 6 to 7 (#10680)
* Bump peter-evans/create-pull-request from 6 to 7

Bumps [peter-evans/create-pull-request](https://github.com/peter-evans/create-pull-request) from 6 to 7.
- [Release notes](https://github.com/peter-evans/create-pull-request/releases)
- [Commits](https://github.com/peter-evans/create-pull-request/compare/v6...v7)

---
updated-dependencies:
- dependency-name: peter-evans/create-pull-request
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

* Add automated changelog yaml from template for bot PR

* Delete .changes/unreleased/Dependencies-20240909-004501.yaml

remove changelog, not needed

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Github Build Bot <buildbot@fishtownanalytics.com>
Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>
2025-02-18 08:18:31 -06:00
Quigley Malcolm
aa89740311 Allow sampling from snapshots and of snapshots (#11311)
* Allow for rendering of refs/sources in snapshots to be sampled

Of note, the parameterization of `test_resolve_event_time_filter` in
tests/unit/context/test_providers.py is getting large and cumbersome.
It may soon be time to split it into a few distinct tests so that each
test needs fewer parametrized arguments.

* Simplify `isinstance` checks when resolving event time filter

Previously we were doing `isinstance(a, class1) or isinstance(a, class2)`,
but this can be simplified to `isinstance(a, (class1, class2))`. Whoops.

* Ensure sampling of refs of snapshots is possible

Notably we didn't have to add `isinstance(self.target, SnapshotConfig)` to the
checks in `resolve_event_time_filter` because `SnapshotConfig` is a subclass
of `NodeConfig`.

* Add changie doc
2025-02-14 16:02:31 -06:00
Quigley Malcolm
aa306693a5 Allow for running sample mode with build command (#11307) 2025-02-14 12:40:13 -06:00
Quigley Malcolm
7041e5822f Ability to sample seeds (#11304)
* Allow for sampling of seeds

* Add changie doc

* Fix seed column types for `TestSampleSeedRefs` tests
2025-02-13 17:12:38 -06:00
Quigley Malcolm
a08255e4cb Combine --sample and --sample-window into one CLI param (#11303)
* Combine `--sample` and `--sample-window` CLI params

* Add changie doc
2025-02-13 15:59:41 -06:00
Emily Rockman
2cde93bf63 Require 2 CODEOWNER reviews for artifact changes (#11256)
* first pass

* resolve TODOs

* updates after testing
2025-02-11 13:06:28 -06:00
Kshitij Aranke
f29836fcf3 Round 2: Add doc_blocks to manifest for nodes and columns (#11294)
* Reapply "Add `doc_blocks` to manifest for nodes and columns (#11224)" (#11283)

This reverts commit 55e0df181f89241f1d222425f928459f3453ea81.

* Expand doc_blocks backcompat test

* Refactor to method, add docstring
2025-02-11 16:01:16 +00:00
William Deng
7f32e42230 Update ShowTaskDirect to correctly handle --limit -1 (#11284)
* Update `ShowTaskDirect` to support `--limit -1` properly

* changelog
2025-02-10 16:53:04 +00:00
Kshitij Aranke
55e0df181f Revert "Add doc_blocks to manifest for nodes and columns (#11224)" (#11283)
This reverts commit d71f309c1e31c15a455046b570f27b0b9a84f7ef.
2025-02-07 17:12:06 +00:00
Kshitij Aranke
588cbabe94 Don't automatically run Artifact Schema Check on PRs (#11260)
* Don't automatically run Artifact Schema Check on PRs

* Add branch as argument
2025-02-06 18:46:31 +00:00
Quigley Malcolm
5f873da929 Sample Mode Alpha (#11247)
* Add `--sample` flag to `run` command

* Remove no longer needed `if` statement around EventTimeFilter creation for microbatch models

Upon the initial implementation of microbatch models, the `start` for a batch was _optional_.
However, in c3d87b89fb they became guaranteed. Thus the if statement
guarding when `start/end` isn't present for microbatch models was no longer actually doing anything.
Hence, the if statement was safe to remove.

* Get sample mode working with `--event-time-start/end`

This is temporary as a POC. In the end, sample mode can't depend on the
`--event-time-start/end` arguments and will need its own CLI args / project
config, something like `--sample-window`. The issue with using `--event-time-start/end`
is that if people set those in the project configs, then their microbatch models would
_always_ run with those values even outside of sample mode. Despite that, this is a
useful checkpoint even though it will go away.

* Begin using `--sample-window` for sample mode instead of `--event-time-start/end`

Using `--event-time-start/end` for sample mode was conflicting with microbatch models
when _not_ running in sample mode. We will have to do _slightly_ more work to plumb
this new way of specifying sample time to microbatch models.

* Move `SampleWindow` class to `sample_window.py` in `event_time` submodule

This is mostly symbolic. We are going to be adding some utilities for "event_time"
type things, which will all live in the `event_time` submodule. Additionally we plan
to refactor `/incremental/materializations/microbatch.py` into the submodule as well.

* Create an `offset_timestamp` separate from MicrobatchBuilder

The `MicrobatchBuilder.offset_timestamp` _truncates_ the timestamp before
offsetting it. We don't want to do that, we want to offset the "raw" timestamp.
We could have renamed the microbatch builder function to
`truncate_and_offset_timestamp` and split the offset logic into a separate
abstract function. However, the offset logic in the MicrobatchBuilder context
depends on the truncation. We might later on be able to refactor the Microbatch
provided function by instead truncating _after_ offsetting instead of before.
But that is out of scope for this initial work, and we should instead revisit it
later.

* Add `types-python-dateutil` to dev requirements

The previous commit began using a submodule of the third-party python-dateutil
library. We weren't previously using this library, and thus didn't
need the type stubs for it. But now that we do use it, we need to have
the type stubs during development.

* Begin supporting microbatch models in sample mode

* Move parsing logic of `SampleWindowType` to `SampleWindow`

* Allow for specification of "specific" sample windows

In most cases people will want to set "relative" sample windows, i.e.
"3 days" to sample the last three days. However, there are some cases
where people will want to "specific" sample windows for some chunk of
historic time, i.e. `{'start': '2024-01-01', 'end': '2024-01-31'}`.

* Fix tests of `BaseResolver.resolve_event_time_filter` for sample mode changes

* Add `--no-sample` as it's necessary for retry

* Add guards to accessing of `sample` and `sample_window`

This was necessary because these aren't _always_ available. I had expected
to need to do this after putting the `sample` flag behind an environment
variable (which I haven't done yet). However, we needed to add the guards
sooner because the `render` logic is called multiple times throughout the
dbt process, and earlier on the flags aren't available.

* Gate sample mode functionality via env var `DBT_EXPERIMENTAL_SAMPLE_MODE`

At this point sample mode is _alpha_ and should not be depended upon. To make
this crystal clear we've gated the functionality behind an environment variable.
We'll likely remove this gate in the coming month.

* Add sample mode tests for incremental models

* Add changie doc for sample mode initial implementation

* Fixup sample mode functional tests

I had updated the `later_input_model.sql` to be easier to test with. However,
I didn't correspondingly update the initial `input_model.sql` to match.

* Ensure microbatch creates correct number of batches when sample mode env var isn't present

Previously microbatch was creating the _right_ number of batches when:
1. sample mode _wasn't_ being used
2. sample mode _was_ being used AND the env var was present

Unfortunately sample mode _wasn't_ creating the right number of batches when:
3. sample mode _was_ being used AND the env var _wasn't_ present.

In case (3) sample mode shouldn't be run. Unfortunately we weren't gating sample
mode by the environment variable during batch creation. This led to a situation
where batch creation was using sample mode but the rendering of refs
_wasn't_, leaving things in an in-between state. This commit
fixes that issue.

Additionally of note, we currently have duplicate sample mode gating logic in the
batch creation as well as in the rendering of refs. We should probably consolidate
this logic into a single importable function so that any future change to how
sample mode is gated is easier to implement.

* Correct comment in SampleWindow post serialization method

* Hide CLI sample mode options

We are doing this _temporarily_ while sample mode as a feature is in
alpha/beta and locked behind an environment variable. When we remove the
environment variable we should also unhide these.
2025-02-03 18:08:28 -06:00
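A rough illustration of the two sample-window forms described in the commit above (relative, e.g. "3 days", and specific start/end dates). The class below is a simplified stand-in for dbt's `SampleWindow`; the real parsing lives in the `event_time` submodule and the `SampleType` click param shown later in this diff.

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class SimpleSampleWindow:
    """Illustrative stand-in for dbt's SampleWindow; names and parsing are simplified."""
    start: datetime
    end: datetime

    @classmethod
    def from_relative_string(cls, relative: str) -> "SimpleSampleWindow":
        # "3 days" -> a window covering the last three days, ending now (UTC)
        count, unit = relative.split()
        seconds_per_unit = {"hours": 3600, "days": 86400}
        end = datetime.now(tz=timezone.utc)
        return cls(start=end - timedelta(seconds=int(count) * seconds_per_unit[unit]), end=end)

    @classmethod
    def from_dict(cls, raw: dict) -> "SimpleSampleWindow":
        # {'start': '2024-01-01', 'end': '2024-01-31'} -> a fixed historic window
        return cls(
            start=datetime.fromisoformat(raw["start"]).replace(tzinfo=timezone.utc),
            end=datetime.fromisoformat(raw["end"]).replace(tzinfo=timezone.utc),
        )

relative_window = SimpleSampleWindow.from_relative_string("3 days")
specific_window = SimpleSampleWindow.from_dict({"start": "2024-01-01", "end": "2024-01-31"})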
Anders
fdabe9534c post-hoc addition of roadmap (#11259) 2025-01-30 11:48:21 -06:00
Chenyu Li
c0423707b0 loosen validation for freshness (#11253) 2025-01-28 14:20:36 -08:00
Mike Alfare
48d9afa677 point to the dbt-adapters subdirectory post-monorepo migration (#11244) 2025-01-27 19:01:01 -05:00
Kshitij Aranke
d71f309c1e Add doc_blocks to manifest for nodes and columns (#11224) 2025-01-27 19:49:02 +00:00
Emily Rockman
cb323ef78c ADAP-1183: Use the new location for dbt-postgres (#11234)
* use the new location for dbt-postgres

* Update docker/README.md
2025-01-27 10:35:14 -06:00
Quigley Malcolm
22bc1c374e [TIDY FIRST] Click option definition organization (alphabetization) (#11236)
* First pass of alphabetizing click option definitions

* Second pass at organizing cli param click options
2025-01-23 13:34:58 -06:00
Kshitij Aranke
31881d2a3b Misc fixes for group info in logging (#11218) 2025-01-21 11:07:25 +00:00
dependabot[bot]
1dcdcd2f52 Bump codecov/codecov-action from 4 to 5 (#11009)
* Bump codecov/codecov-action from 4 to 5

Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4 to 5.
- [Release notes](https://github.com/codecov/codecov-action/releases)
- [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md)
- [Commits](https://github.com/codecov/codecov-action/compare/v4...v5)

---
updated-dependencies:
- dependency-name: codecov/codecov-action
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

* Add automated changelog yaml from template for bot PR

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Github Build Bot <buildbot@fishtownanalytics.com>
2025-01-17 11:35:19 -06:00
Gerda Shank
3de3b827bf Fix generic test not null and unique custom configs (#11208) 2025-01-15 17:02:03 -05:00
Kshitij Aranke
8a8857a85c Fix error counts for exposures (#11207) 2025-01-10 21:07:15 +00:00
Gerda Shank
e4d5a4e777 Re-cast definition of unique_key in SnapshotConfig (#11205) 2025-01-09 16:03:17 -05:00
internetcoffeephone
b414ef2cc5 Fix microbatch dbt list --output JSON (#11187)
Currently, running this command on a project containing a microbatch
model results in an error, as microbatch models require a datetime
value in their config which cannot be serialized by the default JSON
serializer.

There already exists a custom JSON serializer within the dbt-core
project that converts datetime to ISO string format. This change uses
the above serializer to resolve the error.
2025-01-09 10:59:49 -05:00
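The fix described above amounts to giving the JSON encoder a datetime-aware `default`. A minimal sketch of the idea (dbt-core's actual shared serializer lives elsewhere in the codebase and handles more cases; the config key below is only an example):

import json
from datetime import datetime

def _json_default(value):
    # Convert datetimes (e.g. a microbatch model's datetime config values) to ISO
    # strings instead of letting json.dumps raise a TypeError.
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

node_config = {"unique_id": "model.my_project.my_microbatch_model",
               "begin": datetime(2024, 1, 1)}
print(json.dumps(node_config, default=_json_default))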
Michelle Ark
57e279cc1b Fix exception type of run operation macro not found (#11193) 2025-01-08 10:17:40 -05:00
Quigley Malcolm
2eb1a5c3ea Always emit warning when microbatch models lack any filtered input node (#11196)
* Update `TestMicrobatchWithInputWithoutEventTime` to check running again raises warning

The first time the project is run, the appropriate warning about inputs is raised. However,
the warning is only being raised when a full parse happens. When partial parsing happens
the warning isn't getting raised. In the next commit we'll fix this issue. This commit updates
the test to show that the second run (with partial parsing) doesn't raise the warning, and thus
the test fails.

* Update manifest loading to _always_ check microbatch model inputs

Of note we are at the point where multiple validations are iterating
all of the nodes in a manifest. We should refactor these _soon_ such that
we are not iterating over the same list multiple times.

* Add changie doc
2025-01-08 09:16:30 -06:00
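A hedged sketch of moving the check into manifest loading so it fires on both full and partial parses, as described above. The helper name and warning text are hypothetical; the real check uses dbt's microbatch input resolution and fires a warn_or_error event.

def check_microbatch_model_inputs(manifest, warn):
    """Hypothetical validation run on every manifest load rather than during parsing.

    Because it runs on every load, the warning fires even when partial parsing
    skips re-parsing the microbatch model itself.
    """
    for node in manifest.nodes.values():
        if getattr(node.config, "incremental_strategy", None) != "microbatch":
            continue
        inputs = [manifest.nodes.get(uid) for uid in node.depends_on.nodes]
        has_filtered_input = any(
            getattr(dep.config, "event_time", None) for dep in inputs if dep is not None
        )
        if not has_filtered_input:
            warn(f"Microbatch model {node.unique_id} has no input with an event_time config")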
Kshitij Aranke
dcc9a0ca29 Create LogNodeResult event (#11195)
* Create LogNodeResult event

* add msg directly during object creation
2025-01-08 14:14:29 +00:00
Devon Fulcher
892c545985 Meta config for dimensions, measures, and entities (#11190) 2025-01-07 11:17:24 -08:00
Chenyu Li
a8702b8374 add model freshness for adaptive job (#11170) 2025-01-07 10:02:52 -08:00
Daniel Cole
1592987de8 fix: partial parsing - reparse downstream nodes when adding versioning (#11184) 2025-01-07 12:12:47 -05:00
Gerda Shank
710600546a Move initialization of keyword_args up (#11188) 2025-01-06 15:01:21 -05:00
Emily Rockman
0bf38ce294 Revert "Fix circular import (#11137)" (#11175)
This reverts commit 95c090bed0b745c272a977d11fc248168e94bb25.
2024-12-24 12:18:26 -05:00
Grace Goheen
459d156e85 Roadmap update (Dec 2024) (#11173)
* roadmap post december 2024

* fix yml spacing

* fix code snippet format
2024-12-20 12:56:33 -07:00
dmr
95c090bed0 Fix circular import (#11137)
Co-authored-by: Doug Beatty <44704949+dbeatty10@users.noreply.github.com>
2024-12-19 12:45:42 -08:00
Chenyu Li
f2222d2621 Custom SQL for get source maxLoadedAt (#11163) 2024-12-19 11:49:07 -08:00
Patrick Yost
97ffc37405 Add tags to SavedQueries (#10987) 2024-12-19 10:18:50 -08:00
Gerda Shank
bf18b59845 Fix for dbt_project.yml "tests" config resulting in incorrect state:modified (#11166) 2024-12-18 17:21:45 -05:00
Gerda Shank
88e953e8aa Check modified contracts when doing state:modified (#11161) 2024-12-18 15:40:18 -05:00
Gerda Shank
6076cf7114 Fix yaml snapshot specification with data tests (#11156) 2024-12-18 14:24:27 -05:00
Doug Beatty
a1757934ef Auto-response for bug reports during holiday break (#11152) 2024-12-17 07:47:31 -06:00
Quigley Malcolm
6c61cb7f7a Warn if concurrent_batches config is set to True, but the available adapter doesn't support it (#11145)
* Begin producing warning when attempting to force concurrent batches without adapter support

Batches of microbatch models can be executed sequentially or concurrently. We try to figure out which to do intelligently. As part of that, we implemented an override, the model config `concurrent_batches`, to allow the user to bypass _some_ of our automatic detection. However, a user _cannot_ force batches to run concurrently if the adapter doesn't support concurrent batches (declaring support is opt-in). Thus, if an adapter _doesn't_ support running batches concurrently, and a user tries to force concurrent execution via `concurrent_batches`, then we need to warn the user that this isn't happening (a sketch of this gating check follows this commit entry).

* Add custom event type for warning about invalid `concurrent_batches` config

* Fire `InvalidConcurrentBatchesConfig` warning via `warn_or_error` so it can be silenced
2024-12-16 10:35:08 -06:00
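A sketch of the gating logic this commit describes. The `concurrent_batches` config and the `InvalidConcurrentBatchesConfig` / warn_or_error behavior are named in the commit message; the function shape, capability flag, and warning plumbing here are illustrative assumptions.

def resolve_concurrent_batches(model_config: dict, adapter_supports_concurrency: bool, warn) -> bool:
    """Decide whether batches may run concurrently, warning on an impossible override."""
    forced = model_config.get("concurrent_batches")  # True, False, or None (unset)
    if forced is True and not adapter_supports_concurrency:
        # Fired as a warning (warn_or_error in dbt) so users can silence or promote it.
        warn("InvalidConcurrentBatchesConfig: the adapter does not support concurrent "
             "batches, so batches will run sequentially despite concurrent_batches=True")
        return False
    if forced is not None:
        return forced and adapter_supports_concurrency
    # When the config is unset, fall back to dbt's automatic detection (not shown here).
    return adapter_supports_concurrency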
Chenyu Li
4b1f1c4029 add allow additional property for Model and SourceDefinition (#11138) 2024-12-15 23:30:48 -08:00
Kshitij Aranke
7df04b0fe4 Create a no-op exposure runner (#11082) 2024-12-12 15:28:34 +00:00
dave-connors-3
662101590d update adapter version messages (#10919) 2024-12-11 10:56:38 -05:00
Michelle Ark
fc6167a2ee fix MicrobatchExecutionDebug message (#11071)
* fix MicrobatchExecutionDebug message

* Fix typing in `describe_batch` to convince mypy `batch_start` exists when needed

---------

Co-authored-by: Quigley Malcolm <quigley.malcolm@dbtlabs.com>
2024-12-10 09:59:46 -06:00
Quigley Malcolm
983cbb4f28 Fix microbatch model PartialSuccess status to result in non-zero exit code (#11115)
* Update partial success test to assert partial successes mean that the run failed

* Update results interpretation to include `PartialSuccess` as failure status
2024-12-10 09:48:41 -06:00
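A tiny sketch of the results-interpretation change: treating "partial success" (the status microbatch models report when some batches fail, visible in the status enum diff later in this page) as a failing status when computing the exit code. The helper name and result shape are illustrative.

# "partial success" now counts toward a failing exit code alongside errors and failures.
FAILURE_STATUSES = {"error", "fail", "partial success"}

def interpret_run_results(results) -> int:
    """Return the process exit code: non-zero when any result carries a failing status."""
    return 1 if any(result.status in FAILURE_STATUSES for result in results) else 0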
Quigley Malcolm
c9582c2323 Fix erroneous additional batch execution (#11113)
* Update single batch test case to check for generic exceptions

* Explicitly skip the final batch execution when there is only one batch

Previously, if there was only one batch, we would try to execute _two_
batches: the first batch, and a "last" nonexistent batch. This would
result in an unhandled exception.

* Changie doc
2024-12-10 09:28:07 -06:00
Michelle Ark
03fdb4c157 Microbatch first last batch serial (#11072)
* microbatch: split out first and last batch to run in serial

* only run pre_hook on first batch, post_hook on last batch

* refactor: internalize parallel to RunTask._submit_batch

* Add optional `force_sequential` to `_submit_batch` to allow for skipping parallelism check

* Force last batch to run sequentially

* Force first batch to run sequentially

* Remove batch_idx check in `should_run_in_parallel`

`should_run_in_parallel` shouldn't, and no longer needs to, take into
consideration where a batch falls in the larger batch sequence. The first and
last batch for a microbatch model are now forced to run sequentially
by `handle_microbatch_model` (a condensed scheduling sketch follows this commit entry).

* Begin skipping batches if first batch fails

* Write custom `on_skip` for `MicrobatchModelRunner` to better handle when batches are skipped

This was necessary specifically because the default `on_skip` set the `X of Y` part
of the skipped log using `node_index` and `num_nodes`. If there were 2
nodes and we were on the 4th batch of the second node, we'd get a message like
`SKIPPED 4 of 2...`, which didn't make much sense. In a future commit we're likely
to add a custom event for logging the start, result, and skipping of batches
for better readability of the logs.

* Add microbatch pre-hook, post-hook, and sequential first/last batch tests

* Fix/Add tests around first batch failure vs latter batch failure

* Correct MicrobatchModelRunner.on_skip to handle skipping the entire node

Previously `MicrobatchModelRunner.on_skip` only handled when a _batch_ of
the model was being skipped. However, that method is also used when the
entire microbatch model is being skipped due to an upstream node error. Because
we previously _weren't_ handling this second case, it'd cause an unhandled
runtime exception. Thus, we now check whether we're running a batch, and
if there is no batch, we use the super's `on_skip` method.

* Correct conditional logic for setting pre- and post-hooks for batches

Previously we were doing an if+elif for setting pre- and post-hooks
for batches, wherein the `if` matched if the batch wasn't the first
batch, and the `elif` matched if the batch wasn't the last batch. The
issue with this is that if the `if` was hit, the `elif` _wouldn't_ be hit.
This caused the first batch to appropriately not run the `post-hook`, but
every batch after it would run the `post-hook`.

* Add two new event types `LogStartBatch` and `LogBatchResult`

* Update MicrobatchModelRunner to use new batch specific log events

* Fix event testing

* Update microbatch integration tests to catch batch specific event types

---------

Co-authored-by: Quigley Malcolm <quigley.malcolm@dbtlabs.com>
2024-12-07 12:43:15 -06:00
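A condensed sketch of the scheduling policy described in the commit above: the first and last batches run sequentially (carrying the pre-hook and post-hook respectively), middle batches may run in parallel, and remaining batches are skipped if the first batch fails. Function and variable names are illustrative; the real logic lives in `RunTask._submit_batch`, `should_run_in_parallel`, and `handle_microbatch_model`.

from concurrent.futures import ThreadPoolExecutor

def run_microbatch_model(batches, run_batch, allow_parallel: bool):
    """Run batches with the first and last forced sequential.

    run_batch(batch) -> bool (success). Returns a per-batch status list.
    """
    if not batches:
        return []
    statuses = ["skipped"] * len(batches)

    # The first batch always runs sequentially (it runs the pre-hook and typically
    # creates the relation); if it fails, the remaining batches stay skipped.
    statuses[0] = "success" if run_batch(batches[0]) else "error"
    if statuses[0] == "error" or len(batches) == 1:
        return statuses

    # Middle batches may run concurrently when the adapter/config allow it.
    middle = range(1, len(batches) - 1)
    if allow_parallel:
        with ThreadPoolExecutor() as pool:
            futures = {i: pool.submit(run_batch, batches[i]) for i in middle}
        for i, future in futures.items():
            statuses[i] = "success" if future.result() else "error"
    else:
        for i in middle:
            statuses[i] = "success" if run_batch(batches[i]) else "error"

    # The last batch also runs sequentially; the post-hook runs only here.
    statuses[-1] = "success" if run_batch(batches[-1]) else "error"
    return statuses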
Peter Webb
afe25a99fe Improve the Performance Characteristics of add_test_edges() (#11092)
* New function to add graph edges.

* Clean up, leave out flag temporarily for testing.

* Put new test edge behavior behind flag.

* Final draft of documentation.
2024-12-05 16:33:16 -05:00
Gerda Shank
e32b8a90ac Implement partial parsing for singular data tests configs in yaml files (#11100) 2024-12-05 15:57:56 -05:00
Peter Webb
1472b86ee2 Improve performance of select_children() and select_parents() (#11099)
* Improve performance of select_children() and select_parents()

* Add changelog entry.
2024-12-05 15:03:57 -05:00
William Deng
ff6745c795 Update core to support DSI 0.8.3 (#10990)
Co-authored-by: Courtney Holcomb <courtneyeholcomb@gmail.com>
2024-12-05 09:48:33 -08:00
Thomas Reynaud
fdfe03d561 Access DEBUG flag through get_flags() (#11069) 2024-12-04 11:03:12 -05:00
172 changed files with 6682 additions and 1935 deletions

View File

@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Add invocations_started_at field to artifact metadata
time: 2025-02-10T12:33:06.722803-05:00
custom:
Author: gshank
Issue: "11272"

View File

@@ -0,0 +1,6 @@
kind: Dependencies
body: Upgrading dbt-semantic-interfaces to 0.8.3 for custom grain support in offset windows
time: 2024-11-12T16:38:15.351519-05:00
custom:
Author: WilliamDee
Issue: None

View File

@@ -0,0 +1,6 @@
kind: "Dependencies"
body: "Bump codecov/codecov-action from 4 to 5"
time: 2024-11-18T00:11:13.00000Z
custom:
Author: dependabot[bot]
Issue: 11009

View File

@@ -0,0 +1,7 @@
kind: Features
body: Ensure pre/post hooks only run on first/last batch respectively for microbatch
model batches
time: 2024-12-06T19:53:08.928793-06:00
custom:
Author: MichelleArk QMalcolm
Issue: 11094 11104

View File

@@ -0,0 +1,6 @@
kind: Features
body: Support "tags" in Saved Queries
time: 2024-12-16T09:54:35.327675-08:00
custom:
Author: theyostalservice
Issue: "11155"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Calculate source freshness via a SQL query
time: 2024-12-17T17:16:31.841076-08:00
custom:
Author: ChenyuLInx
Issue: "8797"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Add freshness definition on model for adaptive job
time: 2024-12-18T17:07:29.55754-08:00
custom:
Author: ChenyuLInx
Issue: "11123"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Meta config for dimensions measures and entities
time: 2025-01-06T13:28:29.176439-06:00
custom:
Author: DevonFulcher
Issue: None

View File

@@ -0,0 +1,6 @@
kind: Features
body: Add doc_blocks to manifest for nodes and columns
time: 2025-01-22T17:03:28.866522Z
custom:
Author: aranke
Issue: 11000 11001

View File

@@ -0,0 +1,6 @@
kind: Features
body: Initial implementation of sample mode
time: 2025-02-02T14:00:54.074209-06:00
custom:
Author: QMalcolm
Issue: 11227 11230 11231 11248 11252 11254 11258

View File

@@ -0,0 +1,6 @@
kind: Features
body: Combine `--sample` and `--sample-window` CLI params
time: 2025-02-12T15:56:58.546879-06:00
custom:
Author: QMalcolm
Issue: "11299"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Allow for sampling of ref'd seeds
time: 2025-02-12T17:37:43.554156-06:00
custom:
Author: QMalcolm
Issue: "11300"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Enable sample mode for 'build' command
time: 2025-02-13T18:29:32.238857-06:00
custom:
Author: QMalcolm
Issue: "11298"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Allow sampling nodes snapshots depend on and of snapshots as a dependency
time: 2025-02-14T15:29:57.118017-06:00
custom:
Author: QMalcolm
Issue: "11301"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: update adapter version messages
time: 2024-10-25T10:43:39.274723-05:00
custom:
Author: dave-connors-3
Issue: "10230"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Access DEBUG flag more consistently with the rest of the codebase in ManifestLoader
time: 2024-11-28T16:29:36.236729+01:00
custom:
Author: Threynaud
Issue: "11068"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Improve the performance characteristics of add_test_edges()
time: 2024-12-04T10:04:29.096231-05:00
custom:
Author: peterallenwebb
Issue: "10950"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Implement partial parsing for singular data test configs in yaml files
time: 2024-12-05T14:53:07.295536-05:00
custom:
Author: gshank
Issue: "10801"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix debug log messages for microbatch batch execution information
time: 2024-12-09T11:38:06.972743-06:00
custom:
Author: MichelleArk QMalcolm
Issue: "11111"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix running of extra "last" batch when there is only one batch
time: 2024-12-09T13:33:17.253326-06:00
custom:
Author: QMalcolm
Issue: "11112"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix interpretation of `PartialSuccess` to result in non-zero exit code
time: 2024-12-09T15:07:11.391313-06:00
custom:
Author: QMalcolm
Issue: "11114"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Warn about invalid usages of `concurrent_batches` config
time: 2024-12-12T11:36:11.451962-06:00
custom:
Author: QMalcolm
Issue: "11122"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Error writing generic test at run time
time: 2024-12-16T13:46:45.936573-05:00
custom:
Author: gshank
Issue: "11110"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Run check_modified_contract for state:modified
time: 2024-12-17T15:48:48.053054-05:00
custom:
Author: gshank
Issue: "11034"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix unrendered_config for tests from dbt_project.yml
time: 2024-12-18T11:26:40.270022-05:00
custom:
Author: gshank
Issue: "11146"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Make partial parsing reparse referencing nodes of newly versioned models.
time: 2025-01-02T14:05:43.629959-05:00
custom:
Author: d-cole
Issue: "8872"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Ensure warning about microbatch lacking filter inputs is always fired
time: 2025-01-07T17:37:19.373261-06:00
custom:
Author: QMalcolm
Issue: "11159"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix microbatch dbt list --output json
time: 2025-01-09T12:33:09.958795+01:00
custom:
Author: internetcoffeephone
Issue: 10556 11098

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix for custom fields in generic test config for not_null and unique tests
time: 2025-01-10T15:58:24.479245-05:00
custom:
Author: gshank
Issue: "11208"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Loosen validation on freshness to accommodate previously wrong but harmless config.
time: 2025-01-28T13:55:09.318833-08:00
custom:
Author: ChenyuLInx peterallenwebb
Issue: "11123"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Handle `--limit -1` properly in `ShowTaskDirect` so that it propagates None instead of a negative int
time: 2025-02-07T13:14:24.725503-05:00
custom:
Author: WilliamDee
Issue: None

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: _get_doc_blocks is crashing parsing if .format is called
time: 2025-02-18T13:47:45.659731Z
custom:
Author: aranke
Issue: "11310"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix microbatch execution to not block main thread nor hang
time: 2025-03-03T13:14:40.432874-06:00
custom:
Author: QMalcolm
Issue: 11243 11306

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Create a no-op exposure runner
time: 2024-12-02T16:47:15.766574Z
custom:
Author: aranke
Issue: ' '

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Improve selection performance by optimizing the select_children() and select_parents()
functions.
time: 2024-12-05T14:31:44.584216-05:00
custom:
Author: peterallenwebb
Issue: "11099"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Change exception type from DbtInternalException to UndefinedMacroError when
macro not found in 'run operation' command
time: 2025-01-07T12:39:55.234321-05:00
custom:
Author: michelleark
Issue: "11192"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Create LogNodeResult event
time: 2025-01-07T20:58:38.821036Z
custom:
Author: aranke
Issue: ' '

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fix error counts for exposures
time: 2025-01-10T20:20:57.01632Z
custom:
Author: aranke
Issue: ' '

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Misc fixes for group info in logging
time: 2025-01-17T15:22:15.497485Z
custom:
Author: aranke
Issue: '11218'

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add secondary profiles to profile.py
time: 2025-02-14T12:38:53.964266Z
custom:
Author: aranke
Issue: XPLAT-241

View File

@@ -61,7 +61,7 @@ body:
label: Environment
description: |
examples:
- **OS**: Ubuntu 20.04
- **OS**: Ubuntu 24.04
- **Python**: 3.9.12 (`python3 --version`)
- **dbt-core**: 1.1.1 (`dbt --version`)
value: |

View File

@@ -55,7 +55,7 @@ body:
label: Environment
description: |
examples:
- **OS**: Ubuntu 20.04
- **OS**: Ubuntu 24.04
- **Python**: 3.9.12 (`python3 --version`)
- **dbt-core (working version)**: 1.1.1 (`dbt --version`)
- **dbt-core (regression version)**: 1.2.0 (`dbt --version`)

View File

@@ -11,9 +11,17 @@ runs:
sudo sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
curl -fsSL https://www.postgresql.org/media/keys/ACCC4CF8.asc|sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/postgresql.gpg
sudo apt update -y
sudo apt install postgresql-16
sudo apt install postgresql-17
sudo apt-get -y install postgresql postgresql-contrib
sudo systemctl start postgresql
sudo systemctl enable postgresql
pg_isready
sudo -u postgres bash ${{ github.action_path }}/setup_db.sh
echo "Updating setup_db.sh script ownership and execute bit"
sudo chown postgres ${{ github.action_path }}/../../../test/setup_db.sh
sudo chown postgres ${{ github.action_path }}/setup_db.sh
sudo chmod +x ${{ github.action_path }}/../../../test/setup_db.sh
sudo chmod +x ${{ github.action_path }}/setup_db.sh
ls -la ${{ github.action_path }}/../../../test/setup_db.sh
ls -la ${{ github.action_path }}/setup_db.sh
echo "Running setup_db.sh"
sudo -u postgres bash ${{ github.action_path }}/../../../test/setup_db.sh

.github/workflows/artifact-reviews.yml (new file, 153 lines)
View File

@@ -0,0 +1,153 @@
# **what?**
# Enforces 2 reviews when artifact or validation files are modified.
# **why?**
# Ensure artifact changes receive proper review from designated team members. GitHub doesn't support
# requiring multiple reviews on a single PR based on which files changed, so we need to enforce this manually.
# **when?**
# This will run when PRs are opened, synchronized, reopened, edited, or when reviews
# are submitted and dismissed.
name: "Enforce Additional Reviews on Artifact and Validations Changes"
on:
pull_request_target:
types: [opened, synchronize, reopened, edited]
# retrigger check on review events
pull_request_review:
types: [submitted, edited, dismissed]
# only run this once per PR at a time
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
cancel-in-progress: false # wait for in-progress runs to complete to prevent race condition
env:
required_approvals: 2
team: "core-group"
jobs:
cleanup-old-runs:
# this job is only run once per PR at a time. Since it uses two types of triggers,
# when the pull_request trigger fails, that run stays around when the pull_request_review
# triggers a new run. This job will clean up those old runs so we only end up with a single run.
name: "Cleanup Previous Runs"
runs-on: ubuntu-latest
steps:
- name: "Dismiss previous workflow runs"
run: |
# Get all check runs for this PR's SHA
cleanup_checks=$(gh api repos/${{ github.repository }}/commits/${{ github.event.pull_request.head.sha }}/check-runs \
--jq '.check_runs[] | select(.name == "Cleanup Previous Runs")')
review_checks=$(gh api repos/${{ github.repository }}/commits/${{ github.event.pull_request.head.sha }}/check-runs \
--jq '.check_runs[] | select(.name == "Validate Additional Reviews")')
# For each check run from this workflow (except current), dismiss it
{ echo "$cleanup_checks"; echo "$review_checks"; } | jq -r '. | select(.id != ${{ github.run_id }}) | .id' | \
while read -r check_id; do
echo "Dismissing check $check_id"
gh api repos/${{ github.repository }}/check-runs/$check_id \
-X PATCH \
-F status="completed" \
-F conclusion="neutral" \
-F "output[title]=Superseded" \
-F "output[summary]=This check was superseded by a newer run"
done
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
check-reviews:
name: "Validate Additional Reviews"
needs: [cleanup-old-runs]
runs-on: ubuntu-latest
steps:
- name: "Checkout code"
uses: actions/checkout@v4
- name: "Get list of changed files"
id: changed_files
run: |
CHANGED_FILES=$(gh api repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}/files | jq -r '.[].filename')
echo "Changed files:"
echo "$CHANGED_FILES"
echo "CHANGED_FILES<<EOF" >> $GITHUB_OUTPUT
echo "$CHANGED_FILES" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: "Check if any artifact files were changed"
id: artifact_files_changed
run: |
artifact_changes=false
while IFS= read -r file; do
echo "Debug: Checking file: '$file'"
if [[ "$file" == "core/dbt/artifacts/"* ]] ; then
artifact_changes=true
break
fi
done <<< "${{ steps.changed_files.outputs.CHANGED_FILES }}"
echo "artifact_changes=$artifact_changes" >> $GITHUB_OUTPUT
- name: "Get Core Team Members"
if: ${{ steps.artifact_files_changed.outputs.artifact_changes == 'true' }}
id: core_members
run: |
gh api -H "Accept: application/vnd.github+json" \
/orgs/dbt-labs/teams/${{ env.team }}/members > core_members.json
# Extract usernames and set as multiline output
echo "membership<<EOF" >> $GITHUB_OUTPUT
jq -r '.[].login' core_members.json >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
env:
GH_TOKEN: ${{ secrets.IT_TEAM_MEMBERSHIP }}
- name: "Verify ${{ env.required_approvals }} core team approvals"
id: check_approvals
if: ${{ steps.artifact_files_changed.outputs.artifact_changes == 'true' }}
run: |
# Get all reviews
REVIEWS=$(gh api repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}/reviews)
# Count approved reviews from core team members (only most recent review per user)
CORE_APPROVALS=0
while IFS= read -r member; do
echo "member: $member"
APPROVED=$(echo "$REVIEWS" | jq --arg user "$member" '
group_by(.user.login) |
map(select(.[0].user.login == $user) |
sort_by(.submitted_at) |
last) |
map(select(.state == "APPROVED")) |
length')
CORE_APPROVALS=$((CORE_APPROVALS + APPROVED))
done <<< "${{ steps.core_members.outputs.membership }}"
echo "CORE_APPROVALS=$CORE_APPROVALS" >> $GITHUB_OUTPUT
echo $CORE_APPROVALS
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: "Notify and fail if not enough approvals"
if: ${{ steps.artifact_files_changed.outputs.artifact_changes == 'true' && steps.check_approvals.outputs.CORE_APPROVALS < fromJSON(env.required_approvals) }}
run: |
title="PR Approval Requirements Not Met"
message="Changes to artifact directory files requires at least ${{ env.required_approvals }} approvals from core team members. Current number of core team approvals: ${{ steps.check_approvals.outputs.CORE_APPROVALS }} "
echo "::error title=$title::$message"
exit 1
- name: "Notify of sufficient approvals"
if: ${{ steps.artifact_files_changed.outputs.artifact_changes == 'true' && steps.check_approvals.outputs.CORE_APPROVALS >= fromJSON(env.required_approvals) }}
run: |
title="Extra requirements met"
message="Changes to artifact directory files requires at least ${{ env.required_approvals }} approvals from core team members. Current number of core team approvals: ${{ steps.check_approvals.outputs.CORE_APPROVALS }} "
echo "::notice title=$title::$message"
- name: "Notify of no extra requirements"
if: ${{ steps.artifact_files_changed.outputs.artifact_changes != 'true' }}
run: |
title="No extra requirements"
message="No additional reviews required"
echo "::notice title=$title::$message"

View File

@@ -0,0 +1,50 @@
# **what?**
# Check if an issue is opened near or during an extended holiday period.
# If so, post an automatically-generated comment about the holiday for bug reports.
# Also provide specific information to customers of dbt Cloud.
# **why?**
# Explain why responses will be delayed during our holiday period.
# **when?**
# This will run when new issues are opened.
name: Auto-Respond to Bug Reports During Holiday Period
on:
issues:
types:
- opened
permissions:
contents: read
issues: write
jobs:
auto-response:
runs-on: ubuntu-latest
steps:
- name: Check if current date is within holiday period
id: date-check
run: |
current_date=$(date -u +"%Y-%m-%d")
start_date="2024-12-23"
end_date="2025-01-05"
if [[ "$current_date" < "$start_date" || "$current_date" > "$end_date" ]]; then
echo "outside_holiday=true" >> $GITHUB_ENV
else
echo "outside_holiday=false" >> $GITHUB_ENV
fi
- name: Post comment
if: ${{ env.outside_holiday == 'false' && contains(github.event.issue.labels.*.name, 'bug') }}
run: |
gh issue comment ${{ github.event.issue.number }} --repo ${{ github.repository }} --body "Thank you for your bug report! Our team will be out of the office for [Christmas and our Global Week of Rest](https://handbook.getdbt.com/docs/time_off#2024-us-holidays), from December 25, 2024, through January 3, 2025.
We will review your issue as soon as possible after returning.
Thank you for your understanding, and happy holidays! 🎄🎉
If you are a customer of dbt Cloud, please contact our Customer Support team via the dbt Cloud web interface or email **support@dbtlabs.com**."
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -112,7 +112,7 @@ jobs:
- name: Upload Unit Test Coverage to Codecov
if: ${{ matrix.python-version == '3.11' }}
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: unit
@@ -163,8 +163,9 @@ jobs:
fail-fast: false
matrix:
python-version: [ "3.9", "3.10", "3.11", "3.12" ]
os: [ubuntu-20.04]
os: [ubuntu-latest]
split-group: ${{ fromJson(needs.integration-metadata.outputs.split-groups) }}
# this include is where we add the mac and windows os
include: ${{ fromJson(needs.integration-metadata.outputs.include) }}
env:
TOXENV: integration
@@ -230,7 +231,7 @@ jobs:
- name: Upload Integration Test Coverage to Codecov
if: ${{ matrix.python-version == '3.11' }}
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: integration

View File

@@ -253,7 +253,7 @@ jobs:
push: 'origin origin/${{ matrix.target-branch }}'
- name: Create Pull Request
uses: peter-evans/create-pull-request@v6
uses: peter-evans/create-pull-request@v7
with:
author: 'Github Build Bot <buildbot@fishtownanalytics.com>'
base: ${{ matrix.base-branch }}

View File

@@ -9,15 +9,21 @@
# occur so we want to proactively alert to it.
#
# **when?**
# On pushes to `develop` and release branches. Manual runs are also enabled.
# Only can be run manually
name: Artifact Schema Check
on:
pull_request:
types: [ opened, reopened, labeled, unlabeled, synchronize ]
paths-ignore: [ '.changes/**', '.github/**', 'tests/**', '**.md', '**.yml' ]
# pull_request:
# types: [ opened, reopened, labeled, unlabeled, synchronize ]
# paths-ignore: [ '.changes/**', '.github/**', 'tests/**', '**.md', '**.yml' ]
workflow_dispatch:
inputs:
target_branch:
description: "The branch to check against"
type: string
default: 'main'
required: true
# no special access is needed
permissions: read-all
@@ -43,6 +49,7 @@ jobs:
uses: actions/checkout@v4
with:
path: ${{ env.DBT_REPO_DIRECTORY }}
ref: ${{ inputs.target_branch }}
- name: Check for changes in core/dbt/artifacts
# https://github.com/marketplace/actions/paths-changes-filter

View File

@@ -45,7 +45,7 @@ jobs:
# run the performance measurements on the current or default branch
test-schema:
name: Test Log Schema
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
timeout-minutes: 30
needs:
- integration-metadata

View File

@@ -32,7 +32,7 @@ This is the docs website code. It comes from the dbt-docs repository, and is gen
## Adapters
dbt uses an adapter-plugin pattern to extend support to different databases, warehouses, query engines, etc.
Note: dbt-postgres used to exist in dbt-core but is now in [its own repo](https://github.com/dbt-labs/dbt-postgres)
Note: dbt-postgres used to exist in dbt-core but is now in [a separate repo](https://github.com/dbt-labs/dbt-adapters/dbt-postgres)
Each adapter is a mix of python, Jinja2, and SQL. The adapter code also makes heavy use of Jinja2 to wrap modular chunks of SQL functionality, define default implementations, and allow plugins to override it.

View File

@@ -46,7 +46,12 @@ from dbt.artifacts.resources.v1.metric import (
MetricTimeWindow,
MetricTypeParams,
)
from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine
from dbt.artifacts.resources.v1.model import (
Model,
ModelConfig,
ModelFreshness,
TimeSpine,
)
from dbt.artifacts.resources.v1.owner import Owner
from dbt.artifacts.resources.v1.saved_query import (
Export,

View File

@@ -75,3 +75,6 @@ class BatchSize(StrEnum):
day = "day"
month = "month"
year = "year"
def plural(self) -> str:
return str(self) + "s"

View File

@@ -15,6 +15,20 @@ from dbt_semantic_interfaces.type_enums import TimeGranularity
NodeVersion = Union[str, float]
def _backcompat_doc_blocks(doc_blocks: Any) -> List[str]:
"""
Make doc_blocks backwards-compatible for scenarios where a user specifies `doc_blocks` on a model or column.
Mashumaro will raise a serialization error if the specified `doc_blocks` isn't a list of strings.
In such a scenario, this method returns an empty list to avoid a serialization error.
Further along, `_get_doc_blocks` in `manifest.py` populates the correct `doc_blocks` for the happy path.
"""
if isinstance(doc_blocks, list) and all(isinstance(x, str) for x in doc_blocks):
return doc_blocks
return []
@dataclass
class MacroDependsOn(dbtClassMixin):
macros: List[str] = field(default_factory=list)
@@ -68,6 +82,12 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin):
tags: List[str] = field(default_factory=list)
_extra: Dict[str, Any] = field(default_factory=dict)
granularity: Optional[TimeGranularity] = None
doc_blocks: List[str] = field(default_factory=list)
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None) -> dict:
dct = super().__post_serialize__(dct, context)
dct["doc_blocks"] = _backcompat_doc_blocks(dct["doc_blocks"])
return dct
@dataclass
@@ -197,13 +217,18 @@ class ParsedResource(ParsedResourceMandatory):
unrendered_config_call_dict: Dict[str, Any] = field(default_factory=dict)
relation_name: Optional[str] = None
raw_code: str = ""
doc_blocks: List[str] = field(default_factory=list)
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if context and context.get("artifact") and "config_call_dict" in dct:
del dct["config_call_dict"]
if context and context.get("artifact") and "unrendered_config_call_dict" in dct:
del dct["unrendered_config_call_dict"]
dct["doc_blocks"] = _backcompat_doc_blocks(dct["doc_blocks"])
return dct

View File

@@ -46,7 +46,15 @@ class MetricInputMeasure(dbtClassMixin):
@dataclass
class MetricTimeWindow(dbtClassMixin):
count: int
granularity: TimeGranularity
granularity: str
@property
def window_string(self) -> str: # noqa: D
return f"{self.count} {self.granularity}"
@property
def is_standard_granularity(self) -> bool: # noqa: D
return self.granularity.casefold() in {item.value.casefold() for item in TimeGranularity}
@dataclass
@@ -55,7 +63,7 @@ class MetricInput(dbtClassMixin):
filter: Optional[WhereFilterIntersection] = None
alias: Optional[str] = None
offset_window: Optional[MetricTimeWindow] = None
offset_to_grain: Optional[TimeGranularity] = None
offset_to_grain: Optional[str] = None
def as_reference(self) -> MetricReference:
return MetricReference(element_name=self.name)
@@ -83,7 +91,7 @@ class ConversionTypeParams(dbtClassMixin):
@dataclass
class CumulativeTypeParams(dbtClassMixin):
window: Optional[MetricTimeWindow] = None
grain_to_date: Optional[TimeGranularity] = None
grain_to_date: Optional[str] = None
period_agg: PeriodAggregation = PeriodAggregation.FIRST
@@ -95,7 +103,9 @@ class MetricTypeParams(dbtClassMixin):
denominator: Optional[MetricInput] = None
expr: Optional[str] = None
window: Optional[MetricTimeWindow] = None
grain_to_date: Optional[TimeGranularity] = None
grain_to_date: Optional[TimeGranularity] = (
None # legacy, use cumulative_type_params.grain_to_date
)
metrics: Optional[List[MetricInput]] = None
conversion_type_params: Optional[ConversionTypeParams] = None
cumulative_type_params: Optional[CumulativeTypeParams] = None
@@ -121,7 +131,7 @@ class Metric(GraphResource):
type_params: MetricTypeParams
filter: Optional[WhereFilterIntersection] = None
metadata: Optional[SourceFileMetadata] = None
time_granularity: Optional[TimeGranularity] = None
time_granularity: Optional[str] = None
resource_type: Literal[NodeType.Metric]
meta: Dict[str, Any] = field(default_factory=dict, metadata=MergeBehavior.Update.meta())
tags: List[str] = field(default_factory=list)

View File

@@ -1,8 +1,9 @@
import enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Literal, Optional
from dbt.artifacts.resources.types import AccessType, NodeType
from dbt.artifacts.resources.types import AccessType, NodeType, TimePeriod
from dbt.artifacts.resources.v1.components import (
CompiledResource,
DeferRelation,
@@ -11,7 +12,7 @@ from dbt.artifacts.resources.v1.components import (
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt_common.contracts.config.base import MergeBehavior
from dbt_common.contracts.constraints import ModelLevelConstraint
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, dbtClassMixin
@dataclass
@@ -34,6 +35,23 @@ class TimeSpine(dbtClassMixin):
custom_granularities: List[CustomGranularity] = field(default_factory=list)
class ModelFreshnessDependsOnOptions(enum.Enum):
all = "all"
any = "any"
@dataclass
class ModelBuildAfter(ExtensibleDbtClassMixin):
depends_on: ModelFreshnessDependsOnOptions = ModelFreshnessDependsOnOptions.any
count: int = 0
period: TimePeriod = TimePeriod.hour
@dataclass
class ModelFreshness(ExtensibleDbtClassMixin):
build_after: ModelBuildAfter = field(default_factory=ModelBuildAfter)
@dataclass
class Model(CompiledResource):
resource_type: Literal[NodeType.Model]
@@ -46,6 +64,7 @@ class Model(CompiledResource):
defer_relation: Optional[DeferRelation] = None
primary_key: List[str] = field(default_factory=list)
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)

View File

@@ -1,10 +1,10 @@
from dataclasses import dataclass
from typing import Optional
from typing import List, Optional, Union
from dbt_common.contracts.config.properties import AdditionalPropertiesAllowed
@dataclass
class Owner(AdditionalPropertiesAllowed):
email: Optional[str] = None
email: Union[str, List[str], None] = None
name: Optional[str] = None

View File

@@ -2,16 +2,18 @@ from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional
from typing import Any, Dict, List, Literal, Optional, Union
from dbt.artifacts.resources.base import GraphResource
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.resources.v1.components import DependsOn, RefArgs
from dbt.artifacts.resources.v1.config import list_str, metas
from dbt.artifacts.resources.v1.semantic_layer_components import (
SourceFileMetadata,
WhereFilterIntersection,
)
from dbt_common.contracts.config.base import BaseConfig, CompareBehavior, MergeBehavior
from dbt_common.contracts.config.metadata import ShowBehavior
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_semantic_interfaces.type_enums.export_destination_type import (
ExportDestinationType,
@@ -95,6 +97,10 @@ class SavedQuery(SavedQueryMandatory):
depends_on: DependsOn = field(default_factory=DependsOn)
created_at: float = field(default_factory=lambda: time.time())
refs: List[RefArgs] = field(default_factory=list)
tags: Union[List[str], str] = field(
default_factory=list_str,
metadata=metas(ShowBehavior.Hide, MergeBehavior.Append, CompareBehavior.Exclude),
)
@property
def metrics(self) -> List[str]:

View File

@@ -12,17 +12,21 @@ from dbt_semantic_interfaces.parsing.where_filter.where_filter_parser import (
class WhereFilter(dbtClassMixin):
where_sql_template: str
@property
def call_parameter_sets(self) -> FilterCallParameterSets:
return WhereFilterParser.parse_call_parameter_sets(self.where_sql_template)
def call_parameter_sets(
self, custom_granularity_names: Sequence[str]
) -> FilterCallParameterSets:
return WhereFilterParser.parse_call_parameter_sets(
self.where_sql_template, custom_granularity_names=custom_granularity_names
)
@dataclass
class WhereFilterIntersection(dbtClassMixin):
where_filters: List[WhereFilter]
@property
def filter_expression_parameter_sets(self) -> Sequence[Tuple[str, FilterCallParameterSets]]:
def filter_expression_parameter_sets(
self, custom_granularity_names: Sequence[str]
) -> Sequence[Tuple[str, FilterCallParameterSets]]:
raise NotImplementedError

View File

@@ -31,6 +31,14 @@ https://github.com/dbt-labs/dbt-semantic-interfaces/blob/main/dbt_semantic_inter
"""
@dataclass
class SemanticLayerElementConfig(dbtClassMixin):
meta: Dict[str, Any] = field(
default_factory=dict,
metadata=MergeBehavior.Update.meta(),
)
@dataclass
class Defaults(dbtClassMixin):
agg_time_dimension: Optional[str] = None
@@ -72,6 +80,7 @@ class Dimension(dbtClassMixin):
type_params: Optional[DimensionTypeParams] = None
expr: Optional[str] = None
metadata: Optional[SourceFileMetadata] = None
config: Optional[SemanticLayerElementConfig] = None
@property
def reference(self) -> DimensionReference:
@@ -106,6 +115,7 @@ class Entity(dbtClassMixin):
label: Optional[str] = None
role: Optional[str] = None
expr: Optional[str] = None
config: Optional[SemanticLayerElementConfig] = None
@property
def reference(self) -> EntityReference:
@@ -147,6 +157,7 @@ class Measure(dbtClassMixin):
agg_params: Optional[MeasureAggregationParameters] = None
non_additive_dimension: Optional[NonAdditiveDimension] = None
agg_time_dimension: Optional[str] = None
config: Optional[SemanticLayerElementConfig] = None
@property
def reference(self) -> MeasureReference:

View File

@@ -20,7 +20,7 @@ class SnapshotMetaColumnNames(dbtClassMixin):
class SnapshotConfig(NodeConfig):
materialized: str = "snapshot"
strategy: Optional[str] = None
unique_key: Optional[Union[str, List[str]]] = None
unique_key: Union[str, List[str], None] = None
target_schema: Optional[str] = None
target_database: Optional[str] = None
updated_at: Optional[str] = None

View File

@@ -59,6 +59,7 @@ class ParsedSourceMandatory(GraphResource, HasRelationMetadata):
class SourceDefinition(ParsedSourceMandatory):
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
loaded_at_query: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
external: Optional[ExternalTable] = None
description: str = ""
@@ -73,3 +74,4 @@ class SourceDefinition(ParsedSourceMandatory):
created_at: float = field(default_factory=lambda: time.time())
unrendered_database: Optional[str] = None
unrendered_schema: Optional[str] = None
doc_blocks: List[str] = field(default_factory=list)

View File

@@ -12,7 +12,7 @@ from dbt_common.clients.system import read_json, write_json
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.functions import get_metadata_vars
from dbt_common.exceptions import DbtInternalError, DbtRuntimeError
from dbt_common.invocation import get_invocation_id
from dbt_common.invocation import get_invocation_id, get_invocation_started_at
BASE_SCHEMAS_URL = "https://schemas.getdbt.com/"
SCHEMA_PATH = "dbt/{name}/v{version}.json"
@@ -57,6 +57,9 @@ class BaseArtifactMetadata(dbtClassMixin):
dbt_version: str = __version__
generated_at: datetime = dataclasses.field(default_factory=datetime.utcnow)
invocation_id: Optional[str] = dataclasses.field(default_factory=get_invocation_id)
invocation_started_at: Optional[datetime] = dataclasses.field(
default_factory=get_invocation_started_at
)
env: Dict[str, str] = dataclasses.field(default_factory=get_metadata_vars)
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):

View File

@@ -28,6 +28,7 @@ from dbt.artifacts.schemas.base import (
schema_version,
)
from dbt.artifacts.schemas.upgrades import upgrade_manifest_json
from dbt_common.exceptions import DbtInternalError
NodeEdgeMap = Dict[str, List[str]]
UniqueID = str
@@ -180,3 +181,13 @@ class WritableManifest(ArtifactMixin):
if manifest_schema_version < cls.dbt_schema_version.version:
data = upgrade_manifest_json(data, manifest_schema_version)
return cls.from_dict(data)
@classmethod
def validate(cls, _):
# When dbt try to load an artifact with additional optional fields
# that are not present in the schema, from_dict will work fine.
# As long as validate is not called, the schema will not be enforced.
# This is intentional, as it allows for safer schema upgrades.
raise DbtInternalError(
"The WritableManifest should never be validated directly to allow for schema upgrades."
)

View File

@@ -64,6 +64,7 @@ class NodeStatus(StrEnum):
PartialSuccess = "partial success"
Pass = "pass"
RuntimeErr = "runtime error"
NoOp = "no-op"
class RunStatus(StrEnum):
@@ -71,6 +72,7 @@ class RunStatus(StrEnum):
Error = NodeStatus.Error
Skipped = NodeStatus.Skipped
PartialSuccess = NodeStatus.PartialSuccess
NoOp = NodeStatus.NoOp
class TestStatus(StrEnum):

View File

@@ -140,6 +140,7 @@ def global_flags(func):
@p.warn_error
@p.warn_error_options
@p.write_json
@p.use_fast_test_edges
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
@@ -178,6 +179,7 @@ def cli(ctx, **kwargs):
@p.project_dir
@p.resource_type
@p.exclude_resource_type
@p.sample
@p.select
@p.selector
@p.show
@@ -554,6 +556,7 @@ def parse(ctx, **kwargs):
@p.empty
@p.event_time_start
@p.event_time_end
@p.sample
@p.select
@p.selector
@p.target_path

View File

@@ -1,6 +1,10 @@
from click import Choice, ParamType
from typing import Optional
import pytz
from click import Choice, Context, Parameter, ParamType
from dbt.config.utils import normalize_warn_error_options, parse_cli_yaml_string
from dbt.event_time.sample_window import SampleWindow
from dbt.events import ALL_EVENT_NAMES
from dbt.exceptions import OptionNotYamlDictError, ValidationError
from dbt_common.exceptions import DbtValidationError
@@ -88,3 +92,30 @@ class ChoiceTuple(Choice):
super().convert(value, param, ctx)
return value
class SampleType(ParamType):
name = "SAMPLE"
def convert(
self, value, param: Optional[Parameter], ctx: Optional[Context]
) -> Optional[SampleWindow]:
if value is None:
return None
if isinstance(value, str):
try:
# Try and identify if it's a "dict" or a "str"
if value.lstrip()[0] == "{":
param_option_name: str = param.opts[0] if param.opts else param.name # type: ignore
parsed_dict = parse_cli_yaml_string(value, param_option_name.strip("-"))
sample_window = SampleWindow.from_dict(parsed_dict)
sample_window.start = sample_window.start.replace(tzinfo=pytz.UTC)
sample_window.end = sample_window.end.replace(tzinfo=pytz.UTC)
return sample_window
else:
return SampleWindow.from_relative_string(value)
except Exception as e:
self.fail(e.__str__(), param, ctx)
else:
self.fail(f"Cannot load SAMPLE_WINDOW from type {type(value)}", param, ctx)

View File

@@ -2,11 +2,29 @@ from pathlib import Path
import click
from dbt.cli.option_types import YAML, ChoiceTuple, Package, WarnErrorOptionsType
from dbt.cli.option_types import (
YAML,
ChoiceTuple,
Package,
SampleType,
WarnErrorOptionsType,
)
from dbt.cli.options import MultiOption
from dbt.cli.resolvers import default_profiles_dir, default_project_dir
from dbt.version import get_version_information
# --- shared option specs --- #
model_decls = ("-m", "--models", "--model")
select_decls = ("-s", "--select")
select_attrs = {
"envvar": None,
"help": "Specify the nodes to include.",
"cls": MultiOption,
"multiple": True,
"type": tuple,
}
# --- The actual option definitions --- #
add_package = click.option(
"--add-package",
help="Add a package to current package spec, specify it as package-name@version. Change the source with --source flag.",
@@ -33,10 +51,10 @@ cache_selected_only = click.option(
help="At start of run, populate relational cache only for schemas containing selected nodes, or for all schemas of interest.",
)
introspect = click.option(
"--introspect/--no-introspect",
envvar="DBT_INTROSPECT",
help="Whether to scaffold introspective queries as part of compilation",
clean_project_files_only = click.option(
"--clean-project-files-only / --no-clean-project-files-only",
envvar="DBT_CLEAN_PROJECT_FILES_ONLY",
help="If disabled, dbt clean will delete all paths specified in clean-paths, even if they're outside the dbt project.",
default=True,
)
@@ -69,6 +87,13 @@ debug = click.option(
help="Display debug logging during dbt execution. Useful for debugging and making bug reports.",
)
debug_connection = click.option(
"--connection",
envvar=None,
help="Test the connection to the target database independent of dependency checks.",
is_flag=True,
)
# flag was previously named DEFER_MODE
defer = click.option(
"--defer/--no-defer",
@@ -76,6 +101,19 @@ defer = click.option(
help="If set, resolve unselected nodes by deferring to the manifest within the --state directory.",
)
defer_state = click.option(
"--defer-state",
envvar="DBT_DEFER_STATE",
help="Override the state directory for deferral only.",
type=click.Path(
dir_okay=True,
file_okay=False,
readable=True,
resolve_path=False,
path_type=Path,
),
)
deprecated_defer = click.option(
"--deprecated-defer",
envvar="DBT_DEFER_TO_STATE",
@@ -84,6 +122,44 @@ deprecated_defer = click.option(
hidden=True,
)
deprecated_favor_state = click.option(
"--deprecated-favor-state",
envvar="DBT_FAVOR_STATE_MODE",
help="Internal flag for deprecating old env var.",
)
# Renamed to --export-saved-queries
deprecated_include_saved_query = click.option(
"--include-saved-query/--no-include-saved-query",
envvar="DBT_INCLUDE_SAVED_QUERY",
help="Include saved queries in the list of resources to be selected for build command",
is_flag=True,
hidden=True,
)
deprecated_print = click.option(
"--deprecated-print/--deprecated-no-print",
envvar="DBT_NO_PRINT",
help="Internal flag for deprecating old env var.",
default=True,
hidden=True,
callback=lambda ctx, param, value: not value,
)
deprecated_state = click.option(
"--deprecated-state",
envvar="DBT_ARTIFACT_STATE_PATH",
help="Internal flag for deprecating old env var.",
hidden=True,
type=click.Path(
dir_okay=True,
file_okay=False,
readable=True,
resolve_path=True,
path_type=Path,
),
)
empty = click.option(
"--empty/--no-empty",
envvar="DBT_EMPTY",
@@ -91,6 +167,13 @@ empty = click.option(
is_flag=True,
)
empty_catalog = click.option(
"--empty-catalog",
help="If specified, generate empty catalog.json file during the `dbt docs generate` command.",
default=False,
is_flag=True,
)
event_time_end = click.option(
"--event-time-end",
envvar="DBT_EVENT_TIME_END",
@@ -116,6 +199,33 @@ exclude = click.option(
help="Specify the nodes to exclude.",
)
exclude_resource_type = click.option(
"--exclude-resource-types",
"--exclude-resource-type",
envvar="DBT_EXCLUDE_RESOURCE_TYPES",
help="Specify the types of resources that dbt will exclude",
type=ChoiceTuple(
[
"metric",
"semantic_model",
"saved_query",
"source",
"analysis",
"model",
"test",
"unit_test",
"exposure",
"snapshot",
"seed",
"default",
],
case_sensitive=False,
),
cls=MultiOption,
multiple=True,
default=(),
)
export_saved_queries = click.option(
"--export-saved-queries/--no-export-saved-queries",
envvar="DBT_EXPORT_SAVED_QUERIES",
@@ -137,12 +247,6 @@ favor_state = click.option(
help="If set, defer to the argument provided to the state flag for resolving unselected nodes, even if the node(s) exist as a database object in the current environment.",
)
deprecated_favor_state = click.option(
"--deprecated-favor-state",
envvar="DBT_FAVOR_STATE_MODE",
help="Internal flag for deprecating old env var.",
)
full_refresh = click.option(
"--full-refresh",
"-f",
@@ -167,6 +271,26 @@ indirect_selection = click.option(
default="eager",
)
inline = click.option(
"--inline",
envvar=None,
help="Pass SQL inline to dbt compile and show",
)
inline_direct = click.option(
"--inline-direct",
envvar=None,
help="Internal flag to pass SQL inline to dbt show. Do not load the entire project or apply templating.",
hidden=True,
)
introspect = click.option(
"--introspect/--no-introspect",
envvar="DBT_INTROSPECT",
help="Whether to scaffold introspective queries as part of compilation",
default=True,
)
lock = click.option(
"--lock",
envvar=None,
@@ -212,20 +336,6 @@ log_level_file = click.option(
default="debug",
)
use_colors = click.option(
"--use-colors/--no-use-colors",
envvar="DBT_USE_COLORS",
help="Specify whether log output is colorized in the console and the log file. Use --use-colors-file/--no-use-colors-file to colorize the log file differently than the console.",
default=True,
)
use_colors_file = click.option(
"--use-colors-file/--no-use-colors-file",
envvar="DBT_USE_COLORS_FILE",
help="Specify whether log file output is colorized by overriding the default value and the general --use-colors/--no-use-colors setting.",
default=True,
)
log_file_max_bytes = click.option(
"--log-file-max-bytes",
envvar="DBT_LOG_FILE_MAX_BYTES",
@@ -249,6 +359,8 @@ macro_debugging = click.option(
hidden=True,
)
models = click.option(*model_decls, **select_attrs) # type: ignore[arg-type]
# This is a less standard usage of --output; output_path below is more standard
output = click.option(
"--output",
@@ -258,22 +370,6 @@ output = click.option(
default="selector",
)
show_output_format = click.option(
"--output",
envvar=None,
help="Output format for dbt compile and dbt show",
type=click.Choice(["json", "text"], case_sensitive=False),
default="text",
)
show_limit = click.option(
"--limit",
envvar=None,
help="Limit the number of results returned by dbt show",
type=click.INT,
default=5,
)
output_keys = click.option(
"--output-keys",
envvar=None,
@@ -303,6 +399,14 @@ partial_parse = click.option(
default=True,
)
partial_parse_file_diff = click.option(
"--partial-parse-file-diff/--no-partial-parse-file-diff",
envvar="DBT_PARTIAL_PARSE_FILE_DIFF",
help="Internal flag for whether to compute a file diff during partial parsing.",
hidden=True,
default=True,
)
partial_parse_file_path = click.option(
"--partial-parse-file-path",
envvar="DBT_PARTIAL_PARSE_FILE_PATH",
@@ -312,11 +416,10 @@ partial_parse_file_path = click.option(
type=click.Path(exists=True, dir_okay=False, resolve_path=True),
)
partial_parse_file_diff = click.option(
"--partial-parse-file-diff/--no-partial-parse-file-diff",
envvar="DBT_PARTIAL_PARSE_FILE_DIFF",
help="Internal flag for whether to compute a file diff during partial parsing.",
hidden=True,
print = click.option(
"--print/--no-print",
envvar="DBT_PRINT",
help="Output all {{ print() }} macro calls.",
default=True,
)
@@ -335,22 +438,6 @@ port = click.option(
type=click.INT,
)
print = click.option(
"--print/--no-print",
envvar="DBT_PRINT",
help="Output all {{ print() }} macro calls.",
default=True,
)
deprecated_print = click.option(
"--deprecated-print/--deprecated-no-print",
envvar="DBT_NO_PRINT",
help="Internal flag for deprecating old env var.",
default=True,
hidden=True,
callback=lambda ctx, param, value: not value,
)
printer_width = click.option(
"--printer-width",
envvar="DBT_PRINTER_WIDTH",
@@ -399,6 +486,8 @@ quiet = click.option(
help="Suppress all non-error logging to stdout. Does not affect {{ print() }} macro calls.",
)
raw_select = click.option(*select_decls, **select_attrs) # type: ignore[arg-type]
record_timing_info = click.option(
"--record-timing-info",
"-r",
@@ -435,71 +524,19 @@ resource_type = click.option(
default=(),
)
exclude_resource_type = click.option(
"--exclude-resource-types",
"--exclude-resource-type",
envvar="DBT_EXCLUDE_RESOURCE_TYPES",
help="Specify the types of resources that dbt will exclude",
type=ChoiceTuple(
[
"metric",
"semantic_model",
"saved_query",
"source",
"analysis",
"model",
"test",
"unit_test",
"exposure",
"snapshot",
"seed",
"default",
],
case_sensitive=False,
),
cls=MultiOption,
multiple=True,
default=(),
)
# Renamed to --export-saved-queries
deprecated_include_saved_query = click.option(
"--include-saved-query/--no-include-saved-query",
envvar="DBT_INCLUDE_SAVED_QUERY",
help="Include saved queries in the list of resources to be selected for build command",
is_flag=True,
hidden=True,
)
model_decls = ("-m", "--models", "--model")
select_decls = ("-s", "--select")
select_attrs = {
"envvar": None,
"help": "Specify the nodes to include.",
"cls": MultiOption,
"multiple": True,
"type": tuple,
}
inline = click.option(
"--inline",
envvar=None,
help="Pass SQL inline to dbt compile and show",
)
inline_direct = click.option(
"--inline-direct",
envvar=None,
help="Internal flag to pass SQL inline to dbt show. Do not load the entire project or apply templating.",
hidden=True,
sample = click.option(
"--sample",
envvar="DBT_SAMPLE",
help="Run in sample mode with given SAMPLE_WINDOW spec, such that ref/source calls are sampled by the sample window.",
default=None,
type=SampleType(),
hidden=True, # TODO: Unhide
)
# `--select` and `--models` are analogous for most commands except `dbt list` for legacy reasons.
# Most CLI arguments should use the combined `select` option that aliases `--models` to `--select`.
# However, if you need to split out these separators (like `dbt ls`), use the `models` and `raw_select` options instead.
# See https://github.com/dbt-labs/dbt-core/pull/6774#issuecomment-1408476095 for more info.
models = click.option(*model_decls, **select_attrs) # type: ignore[arg-type]
raw_select = click.option(*select_decls, **select_attrs) # type: ignore[arg-type]
select = click.option(*select_decls, *model_decls, **select_attrs) # type: ignore[arg-type]
selector = click.option(
@@ -515,13 +552,6 @@ send_anonymous_usage_stats = click.option(
default=True,
)
clean_project_files_only = click.option(
"--clean-project-files-only / --no-clean-project-files-only",
envvar="DBT_CLEAN_PROJECT_FILES_ONLY",
help="If disabled, dbt clean will delete all paths specified in clean-paths, even if they're outside the dbt project.",
default=True,
)
show = click.option(
"--show",
envvar=None,
@@ -529,6 +559,29 @@ show = click.option(
is_flag=True,
)
show_limit = click.option(
"--limit",
envvar=None,
help="Limit the number of results returned by dbt show",
type=click.INT,
default=5,
)
show_output_format = click.option(
"--output",
envvar=None,
help="Output format for dbt compile and dbt show",
type=click.Choice(["json", "text"], case_sensitive=False),
default="text",
)
show_resource_report = click.option(
"--show-resource-report/--no-show-resource-report",
default=False,
envvar="DBT_SHOW_RESOURCE_REPORT",
hidden=True,
)
# TODO: The env var is a correction!
# The original env var was `DBT_TEST_SINGLE_THREADED`.
# This broke the existing naming convention.
@@ -550,13 +603,6 @@ skip_profile_setup = click.option(
is_flag=True,
)
empty_catalog = click.option(
"--empty-catalog",
help="If specified, generate empty catalog.json file during the `dbt docs generate` command.",
default=False,
is_flag=True,
)
source = click.option(
"--source",
envvar=None,
@@ -565,13 +611,6 @@ source = click.option(
default="hub",
)
static = click.option(
"--static",
help="Generate an additional static_index.html with manifest and catalog built-in.",
default=False,
is_flag=True,
)
state = click.option(
"--state",
envvar="DBT_STATE",
@@ -585,31 +624,11 @@ state = click.option(
),
)
defer_state = click.option(
"--defer-state",
envvar="DBT_DEFER_STATE",
help="Override the state directory for deferral only.",
type=click.Path(
dir_okay=True,
file_okay=False,
readable=True,
resolve_path=False,
path_type=Path,
),
)
deprecated_state = click.option(
"--deprecated-state",
envvar="DBT_ARTIFACT_STATE_PATH",
help="Internal flag for deprecating old env var.",
hidden=True,
type=click.Path(
dir_okay=True,
file_okay=False,
readable=True,
resolve_path=True,
path_type=Path,
),
static = click.option(
"--static",
help="Generate an additional static_index.html with manifest and catalog built-in.",
default=False,
is_flag=True,
)
static_parser = click.option(
@@ -640,20 +659,6 @@ target_path = click.option(
type=click.Path(),
)
upgrade = click.option(
"--upgrade",
envvar=None,
help="Upgrade packages to the latest version.",
is_flag=True,
)
debug_connection = click.option(
"--connection",
envvar=None,
help="Test the connection to the target database independent of dependency checks.",
is_flag=True,
)
threads = click.option(
"--threads",
envvar=None,
@@ -662,12 +667,40 @@ threads = click.option(
type=click.INT,
)
upgrade = click.option(
"--upgrade",
envvar=None,
help="Upgrade packages to the latest version.",
is_flag=True,
)
use_colors = click.option(
"--use-colors/--no-use-colors",
envvar="DBT_USE_COLORS",
help="Specify whether log output is colorized in the console and the log file. Use --use-colors-file/--no-use-colors-file to colorize the log file differently than the console.",
default=True,
)
use_colors_file = click.option(
"--use-colors-file/--no-use-colors-file",
envvar="DBT_USE_COLORS_FILE",
help="Specify whether log file output is colorized by overriding the default value and the general --use-colors/--no-use-colors setting.",
default=True,
)
use_experimental_parser = click.option(
"--use-experimental-parser/--no-use-experimental-parser",
envvar="DBT_USE_EXPERIMENTAL_PARSER",
help="Enable experimental parsing features.",
)
use_fast_test_edges = click.option(
"--use-fast-test-edges/--no-use-fast-test-edges",
envvar="DBT_USE_FAST_TEST_EDGES",
default=False,
hidden=True,
)
vars = click.option(
"--vars",
envvar=None,
@@ -728,10 +761,3 @@ write_json = click.option(
help="Whether or not to write the manifest.json and run_results.json files to the target directory",
default=True,
)
show_resource_report = click.option(
"--show-resource-report/--no-show-resource-report",
default=False,
envvar="DBT_SHOW_RESOURCE_REPORT",
hidden=True,
)

View File

@@ -1,8 +1,9 @@
import dataclasses
import json
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple
from collections import defaultdict, deque
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
import networkx as nx # type: ignore
import sqlparse
@@ -117,6 +118,16 @@ def _get_tests_for_node(manifest: Manifest, unique_id: UniqueID) -> List[UniqueI
return tests
@dataclasses.dataclass
class SeenDetails:
node_id: UniqueID
visits: int = 0
ancestors: Set[UniqueID] = dataclasses.field(default_factory=set)
awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]] = dataclasses.field(
default_factory=set
)
class Linker:
def __init__(self, data=None) -> None:
if data is None:
@@ -195,19 +206,62 @@ class Linker:
raise RuntimeError("Found a cycle: {}".format(cycle))
def add_test_edges(self, manifest: Manifest) -> None:
if not get_flags().USE_FAST_TEST_EDGES:
self.add_test_edges_1(manifest)
else:
self.add_test_edges_2(manifest)
def add_test_edges_1(self, manifest: Manifest) -> None:
"""This method adds additional edges to the DAG. For a given non-test
executable node, add an edge from an upstream test to the given node if
the set of nodes the test depends on is a subset of the upstream nodes
for the given node."""
# Given a graph:
# HISTORICAL NOTE: To understand the motivation behind this function,
# consider a node A with tests and a node B which depends (either directly
# or indirectly) on A. It would be nice if B were not executed until
# all of the tests on A are finished. After all, we don't want to
# propagate bad data. We can enforce that behavior by adding new
# dependencies (edges) from tests to nodes that should wait on them.
#
# This function implements a rough approximation of the behavior just
# described. In fact, for tests that only depend on a single node, it
# always works.
#
# Things get trickier for tests that depend on multiple nodes. In that
# case, if we are not careful, we will introduce cycles. That seems to
# be the reason this function adds dependencies from a downstream node to
# an upstream test if and only if the downstream node is already a
# descendant of all the nodes the upstream test depends on. By following
# that rule, it never makes the node dependent on new upstream nodes other
# than the tests themselves, and no cycles will be created.
#
# One drawback (Drawback 1) of the approach taken in this function is
# that it could still allow a downstream node to proceed before all
# testing is done on its ancestors, if it happens to have ancestors that
# are not also ancestors of a test with multiple dependencies.
#
# Another drawback (Drawback 2) is that the approach below adds far more
# edges than are strictly needed. After all, if we have A -> B -> C,
# there is no need to add a new edge A -> C. But this function often does.
#
# Drawback 2 is resolved in the new add_test_edges_2() implementation
# below, which is also typically much faster. Drawback 1 has been left in
# place in order to conservatively retain existing behavior, and so that
# the new implementation can be verified against this existing
# implementation by ensuring both resulting graphs have the same transitive
# reduction.
# MOTIVATING IDEA: Given a graph...
#
# model1 --> model2 --> model3
# | |
# | \/
# \/ test 2
# test1
#
# Produce the following graph:
# ...produce the following...
#
# model1 --> model2 --> model3
# | /\ | /\ /\
# | | \/ | |
@@ -247,6 +301,139 @@ class Linker:
if test_depends_on.issubset(upstream_nodes):
self.graph.add_edge(upstream_test, node_id, edge_type="parent_test")
def add_test_edges_2(self, manifest: Manifest):
graph = self.graph
new_edges = self._get_test_edges_2(graph, manifest)
for e in new_edges:
graph.add_edge(e[0], e[1], edge_type="parent_test")
@staticmethod
def _get_test_edges_2(
graph: nx.DiGraph, manifest: Manifest
) -> Iterable[Tuple[UniqueID, UniqueID]]:
# This function enforces the same execution behavior as add_test_edges,
# but executes far more quickly and adds far fewer edges. See the
# HISTORICAL NOTE above.
#
# The idea is to first scan for "single-tested" nodes (which have tests
# that depend only upon that node) and "multi-tested" nodes (which
# have tests that depend on multiple nodes). Single-tested nodes are
# handled quickly and easily.
#
# The less common but more complex case of multi-tested nodes is handled
# by a specialized function.
new_edges: List[Tuple[UniqueID, UniqueID]] = []
source_nodes: List[UniqueID] = []
executable_nodes: Set[UniqueID] = set()
multi_tested_nodes = set()
# Dictionary mapping nodes with single-dep tests to a list of those tests.
single_tested_nodes: dict[UniqueID, List[UniqueID]] = defaultdict(list)
for node_id in graph.nodes:
manifest_node = manifest.nodes.get(node_id, None)
if manifest_node is None:
continue
if next(graph.predecessors(node_id), None) is None:
source_nodes.append(node_id)
if manifest_node.resource_type != NodeType.Test:
executable_nodes.add(node_id)
else:
test_deps = manifest_node.depends_on_nodes
if len(test_deps) == 1:
single_tested_nodes[test_deps[0]].append(node_id)
elif len(test_deps) > 1:
multi_tested_nodes.update(manifest_node.depends_on_nodes)
# Now that we have all the necessary information conveniently organized,
# add new edges for single-tested nodes.
for node_id, test_ids in single_tested_nodes.items():
succs = [s for s in graph.successors(node_id) if s in executable_nodes]
for succ_id in succs:
for test_id in test_ids:
new_edges.append((test_id, succ_id))
# Get the edges for multi-tested nodes separately, if needed.
if len(multi_tested_nodes) > 0:
multi_test_edges = Linker._get_multi_test_edges(
graph, manifest, source_nodes, executable_nodes, multi_tested_nodes
)
new_edges += multi_test_edges
return new_edges
@staticmethod
def _get_multi_test_edges(
graph: nx.DiGraph,
manifest: Manifest,
source_nodes: Iterable[UniqueID],
executable_nodes: Set[UniqueID],
multi_tested_nodes,
) -> List[Tuple[UniqueID, UniqueID]]:
# Works through the graph in a breadth-first style, processing nodes from
# a ready queue which initially consists of nodes with no ancestors,
# and adding more nodes to the ready queue after all their ancestors
# have been processed. All the while, the relevant details of all nodes
# "seen" by the search so far are maintained in a SeenDetails record,
# including the ancestor set and which tests it is "awaiting" (i.e. tests of
# its ancestors). The processing step adds test edges when every dependency
# of an awaited test is an ancestor of a node that is being processed.
# Downstream nodes are then exempted from awaiting the test.
#
# Memory consumption is potentially O(n^2), with n the number of nodes in
# the graph, since the average number of ancestors and tests being awaited
# for each of the n nodes could itself be O(n). However, we only track
# ancestors that are multi-tested, which should keep things closer to O(n)
# in real-world scenarios.
new_edges: List[Tuple[UniqueID, UniqueID]] = []
ready: deque = deque(source_nodes)
details = {node_id: SeenDetails(node_id) for node_id in source_nodes}
while len(ready) > 0:
curr_details: SeenDetails = details[ready.pop()]
test_ids = _get_tests_for_node(manifest, curr_details.node_id)
new_awaits_for_succs = curr_details.awaits_tests.copy()
for test_id in test_ids:
deps: List[UniqueID] = sorted(manifest.nodes[test_id].depends_on_nodes)
if len(deps) > 1:
# Tests with only one dep were already handled.
new_awaits_for_succs.add((test_id, tuple(deps)))
for succ_id in [
s for s in graph.successors(curr_details.node_id) if s in executable_nodes
]:
suc_details = details.get(succ_id, None)
if suc_details is None:
suc_details = SeenDetails(succ_id)
details[succ_id] = suc_details
suc_details.visits += 1
suc_details.awaits_tests.update(new_awaits_for_succs)
suc_details.ancestors.update(curr_details.ancestors)
if curr_details.node_id in multi_tested_nodes:
# Only track ancestry information for the set of nodes
# we will actually check against later.
suc_details.ancestors.add(curr_details.node_id)
if suc_details.visits == graph.in_degree(succ_id):
if len(suc_details.awaits_tests) > 0:
removes = set()
for awt in suc_details.awaits_tests:
if not any(True for a in awt[1] if a not in suc_details.ancestors):
removes.add(awt)
new_edges.append((awt[0], succ_id))
suc_details.awaits_tests.difference_update(removes)
ready.appendleft(succ_id)
# We are now done with the current node and all of its ancestors.
# Discard its details to save memory.
del details[curr_details.node_id]
return new_edges
def get_graph(self, manifest: Manifest) -> Graph:
self.link_graph(manifest)
return Graph(self.graph)
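To make the intent concrete, here is a small standalone sketch (plain networkx, no Manifest involved) of the graph from the MOTIVATING IDEA comment above and the parent_test edges the fast path adds for it.

import networkx as nx

# The DAG from the comment: model1 -> model2 -> model3, with test1 on model1
# and test2 on model2 (both single-node tests).
g = nx.DiGraph()
g.add_edges_from(
    [
        ("model1", "model2"),
        ("model2", "model3"),
        ("model1", "test1"),
        ("model2", "test2"),
    ]
)

# Edges the single-tested shortcut adds: each single-node test blocks the
# executable successors of the node it tests.
new_edges = [("test1", "model2"), ("test2", "model3")]
g.add_edges_from(new_edges, edge_type="parent_test")

# The older add_test_edges_1 would also add the transitively redundant
# (test1, model3) edge -- Drawback 2 in the note above. Either way the
# result stays acyclic.
assert nx.is_directed_acyclic_graph(g)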

View File

@@ -60,6 +60,7 @@ class Profile(HasCredentials):
credentials: Credentials
profile_env_vars: Dict[str, Any]
log_cache_events: bool
secondary_profiles: Dict[str, "Profile"]
def __init__(
self,
@@ -79,6 +80,7 @@ class Profile(HasCredentials):
self.log_cache_events = (
get_flags().LOG_CACHE_EVENTS
) # never available on init, set for adapter instantiation via AdapterRequiredConfig
self.secondary_profiles = {}
def to_profile_info(self, serialize_credentials: bool = False) -> Dict[str, Any]:
"""Unlike to_project_config, this dict is not a mirror of any existing
@@ -257,6 +259,7 @@ defined in your profiles.yml file. You can find profiles.yml here:
profile_name: str,
target_override: Optional[str],
renderer: ProfileRenderer,
is_secondary: bool = False,
) -> Tuple[str, Dict[str, Any]]:
"""This is a containment zone for the hateful way we're rendering
profiles.
@@ -273,6 +276,12 @@ defined in your profiles.yml file. You can find profiles.yml here:
elif "target" in raw_profile:
# render the target if it was parsed from yaml
target_name = renderer.render_value(raw_profile["target"])
elif is_secondary and len(raw_profile.get("outputs", [])) == 1:
# if we only have one target, we can infer the target name
# currently, this is only used for secondary profiles
target_name = next(iter(raw_profile["outputs"]))
# the event name is slightly misleading, but the message indicates that we inferred the target name for a profile
fire_event(MissingProfileTarget(profile_name=profile_name, target_name=target_name))
else:
target_name = "default"
fire_event(MissingProfileTarget(profile_name=profile_name, target_name=target_name))
@@ -293,6 +302,7 @@ defined in your profiles.yml file. You can find profiles.yml here:
renderer: ProfileRenderer,
target_override: Optional[str] = None,
threads_override: Optional[int] = None,
is_secondary: bool = False,
) -> "Profile":
"""Create a profile from its raw profile information.
@@ -312,9 +322,14 @@ defined in your profiles.yml file. You can find profiles.yml here:
"""
# TODO: should it be, and the values coerced to bool?
target_name, profile_data = cls.render_profile(
raw_profile, profile_name, target_override, renderer
raw_profile, profile_name, target_override, renderer, is_secondary=is_secondary
)
if is_secondary and "secondary_profiles" in profile_data:
raise DbtProfileError(
f"Secondary profile '{profile_name}' cannot have nested secondary profiles"
)
# valid connections never include the number of threads, but it's
# stored on a per-connection level in the raw configs
threads = profile_data.pop("threads", DEFAULT_THREADS)
@@ -325,13 +340,31 @@ defined in your profiles.yml file. You can find profiles.yml here:
profile_data, profile_name, target_name
)
return cls.from_credentials(
profile = cls.from_credentials(
credentials=credentials,
profile_name=profile_name,
target_name=target_name,
threads=threads,
)
for p in profile_data.pop("secondary_profiles", []):
for secondary_profile_name, secondary_raw_profile in p.items():
if secondary_profile_name in profile.secondary_profiles:
raise DbtProfileError(
f"Secondary profile '{secondary_profile_name}' is already defined"
)
profile.secondary_profiles[secondary_profile_name] = cls.from_raw_profile_info(
secondary_raw_profile,
secondary_profile_name,
renderer,
target_override=target_override,
threads_override=threads_override,
is_secondary=True,
)
return profile
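For orientation, a rough sketch (hypothetical names and connection details) of the shape the loop above consumes: profile_data is the rendered dict for the selected target, and each entry in its secondary_profiles list maps one secondary profile name to its own raw profile, which is fed back through from_raw_profile_info with is_secondary=True.

# Hypothetical profile_data, shaped the way the secondary-profile loop expects.
profile_data = {
    "type": "postgres",
    "threads": 4,
    "secondary_profiles": [
        {
            "metrics_warehouse": {
                # A single output with no explicit "target" lets the target
                # name be inferred (see the is_secondary branch above).
                "outputs": {"prod": {"type": "postgres", "threads": 2}},
            }
        },
    ],
}
# Reusing a secondary profile name, or nesting secondary_profiles inside a
# secondary profile, raises DbtProfileError per the checks above.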
@classmethod
def from_raw_profiles(
cls,

View File

@@ -199,6 +199,9 @@ def load_raw_project(project_root: str) -> Dict[str, Any]:
if not isinstance(project_dict, dict):
raise DbtProjectError(f"{DBT_PROJECT_FILE_NAME} does not parse to a dictionary")
if "tests" in project_dict and "data_tests" not in project_dict:
project_dict["data_tests"] = project_dict.pop("tests")
return project_dict
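A minimal illustration (hypothetical project dict) of the legacy-key handling above: a top-level "tests" block is carried over to "data_tests" when the latter is absent.

project_dict = {"name": "jaffle_shop", "tests": {"+severity": "warn"}}
if "tests" in project_dict and "data_tests" not in project_dict:
    project_dict["data_tests"] = project_dict.pop("tests")
assert project_dict == {"name": "jaffle_shop", "data_tests": {"+severity": "warn"}}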

View File

@@ -1,8 +1,7 @@
import itertools
import os
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime
from dataclasses import dataclass
from pathlib import Path
from typing import (
Any,
@@ -16,8 +15,6 @@ from typing import (
Type,
)
import pytz
from dbt import tracking
from dbt.adapters.contracts.connection import (
AdapterRequiredConfig,
@@ -101,7 +98,6 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
profile_name: str
cli_vars: Dict[str, Any]
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None
invoked_at: datetime = field(default_factory=lambda: datetime.now(pytz.UTC))
def __post_init__(self):
self.validate()
@@ -188,6 +184,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
project_env_vars=project.project_env_vars,
restrict_access=project.restrict_access,
profile_env_vars=profile.profile_env_vars,
secondary_profiles=profile.secondary_profiles,
profile_name=profile.profile_name,
target_name=profile.target_name,
threads=profile.threads,

View File

@@ -28,7 +28,13 @@ from dbt.adapters.factory import (
get_adapter_package_names,
get_adapter_type_names,
)
from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs, SourceConfig
from dbt.artifacts.resources import (
NodeConfig,
NodeVersion,
RefArgs,
SeedConfig,
SourceConfig,
)
from dbt.clients.jinja import (
MacroGenerator,
MacroStack,
@@ -56,6 +62,7 @@ from dbt.contracts.graph.nodes import (
Resource,
SeedNode,
SemanticModel,
SnapshotNode,
SourceDefinition,
UnitTestNode,
)
@@ -237,23 +244,61 @@ class BaseResolver(metaclass=abc.ABCMeta):
def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
if (
(isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
and target.config.event_time
and isinstance(self.model, ModelNode)
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
and self.model.batch is not None
):
start = self.model.batch.event_time_start
end = self.model.batch.event_time_end
sample_mode = bool(
os.environ.get("DBT_EXPERIMENTAL_SAMPLE_MODE")
and getattr(self.config.args, "sample", None)
)
if start is not None or end is not None:
# TODO The number of branches here is getting rough. We should consider ways to simplify
# what is going on to make it easier to maintain
# Only do event time filtering if the base node has the necessary event time configs
if (
isinstance(target.config, (NodeConfig, SeedConfig, SourceConfig))
and target.config.event_time
and isinstance(self.model, (ModelNode, SnapshotNode))
):
# Handling of microbatch models
if (
isinstance(self.model, ModelNode)
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
and self.model.batch is not None
):
# Sample mode microbatch models
if sample_mode:
start = (
self.config.args.sample.start
if self.config.args.sample.start > self.model.batch.event_time_start
else self.model.batch.event_time_start
)
end = (
self.config.args.sample.end
if self.config.args.sample.end < self.model.batch.event_time_end
else self.model.batch.event_time_end
)
event_time_filter = EventTimeFilter(
field_name=target.config.event_time,
start=start,
end=end,
)
# Regular microbatch models
else:
event_time_filter = EventTimeFilter(
field_name=target.config.event_time,
start=self.model.batch.event_time_start,
end=self.model.batch.event_time_end,
)
# Sample mode _non_ microbatch models
elif sample_mode:
event_time_filter = EventTimeFilter(
field_name=target.config.event_time,
start=start,
end=end,
start=self.config.args.sample.start,
end=self.config.args.sample.end,
)
return event_time_filter
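The sample-mode microbatch branch above amounts to intersecting the sample window with the batch window; a standalone sketch of that arithmetic with hypothetical datetimes:

from datetime import datetime

sample_start, sample_end = datetime(2025, 1, 10), datetime(2025, 1, 20)
batch_start, batch_end = datetime(2025, 1, 1), datetime(2025, 1, 15)

# Same effect as the conditional expressions above: take the later start and
# the earlier end so the filter covers only the overlap of the two windows.
start = max(sample_start, batch_start)
end = min(sample_end, batch_end)
assert (start, end) == (datetime(2025, 1, 10), datetime(2025, 1, 15))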
@@ -880,7 +925,7 @@ T = TypeVar("T")
# Base context collection, used for parsing configs.
class ProviderContext(ManifestContext):
# subclasses are MacroContext, ModelContext, TestContext
# subclasses are MacroContext, ModelContext, TestContext, SourceContext
def __init__(
self,
model,
@@ -893,7 +938,7 @@ class ProviderContext(ManifestContext):
raise DbtInternalError(f"Invalid provider given to context: {provider}")
# mypy appeasement - we know it'll be a RuntimeConfig
self.config: RuntimeConfig
self.model: Union[Macro, ManifestNode] = model
self.model: Union[Macro, ManifestNode, SourceDefinition] = model
super().__init__(config, manifest, model.package_name)
self.sql_results: Dict[str, Optional[AttrDict]] = {}
self.context_config: Optional[ContextConfig] = context_config
@@ -1558,6 +1603,20 @@ class MacroContext(ProviderContext):
self._search_package = search_package
class SourceContext(ProviderContext):
# SourceContext is being used to render jinja SQL during execution of
# custom SQL in source freshness. It is not used for parsing.
model: SourceDefinition
@contextproperty()
def this(self) -> Optional[RelationProxy]:
return self.db_wrapper.Relation.create_from(self.config, self.model)
@contextproperty()
def source_node(self) -> SourceDefinition:
return self.model
class ModelContext(ProviderContext):
model: ManifestNode

View File

@@ -60,6 +60,7 @@ from dbt.artifacts.resources import SourceDefinition as SourceDefinitionResource
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.resources.v1.model import ModelFreshness
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.clients.jinja_static import statically_extract_has_name_this
from dbt.contracts.graph.model_config import UnitTestNodeConfig
@@ -1378,7 +1379,7 @@ class SourceDefinition(
@dataclass
class Exposure(GraphNode, ExposureResource):
class Exposure(NodeInfoMixin, GraphNode, ExposureResource):
@property
def depends_on_nodes(self):
return self.depends_on.nodes
@@ -1441,6 +1442,12 @@ class Exposure(GraphNode, ExposureResource):
def group(self):
return None
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_event_status" in dct:
del dct["_event_status"]
return dct
# ====================================
# Metric node
@@ -1524,7 +1531,7 @@ class Group(GroupResource, BaseNode):
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(omit_none=True),
"owner": {k: str(v) for k, v in self.owner.to_dict(omit_none=True).items()},
}
@@ -1641,6 +1648,9 @@ class SavedQuery(NodeInfoMixin, GraphNode, SavedQueryResource):
return True
def same_tags(self, old: "SavedQuery") -> bool:
return self.tags == old.tags
def same_contents(self, old: Optional["SavedQuery"]) -> bool:
# existing when it didn't before is a change!
# metadata/tags changes are not "changes"
@@ -1656,9 +1666,16 @@ class SavedQuery(NodeInfoMixin, GraphNode, SavedQueryResource):
and self.same_config(old)
and self.same_group(old)
and self.same_exports(old)
and self.same_tags(old)
and True
)
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_event_status" in dct:
del dct["_event_status"]
return dct
# ====================================
# Patches
@@ -1686,6 +1703,7 @@ class ParsedNodePatch(ParsedPatch):
constraints: List[Dict[str, Any]]
deprecation_date: Optional[datetime]
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None
@dataclass

View File

@@ -18,6 +18,7 @@ from dbt.artifacts.resources import (
MacroArgument,
MaturityType,
MeasureAggregationParameters,
ModelFreshness,
NodeVersion,
Owner,
Quoting,
@@ -27,8 +28,11 @@ from dbt.artifacts.resources import (
UnitTestOutputFixture,
UnitTestOverrides,
)
from dbt.artifacts.resources.v1.config import list_str, metas
from dbt.exceptions import ParsingError
from dbt.node_types import NodeType
from dbt_common.contracts.config.base import CompareBehavior, MergeBehavior
from dbt_common.contracts.config.metadata import ShowBehavior
from dbt_common.contracts.config.properties import AdditionalPropertiesMixin
from dbt_common.contracts.util import Mergeable
from dbt_common.dataclass_schema import (
@@ -221,6 +225,7 @@ class UnparsedModelUpdate(UnparsedNodeUpdate):
versions: Sequence[UnparsedVersion] = field(default_factory=list)
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None
def __post_init__(self) -> None:
if self.latest_version:
@@ -317,6 +322,7 @@ class UnparsedSourceTableDefinition(HasColumnTests, HasColumnAndTestProps):
config: Dict[str, Any] = field(default_factory=dict)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
@@ -342,6 +348,7 @@ class UnparsedSourceDefinition(dbtClassMixin):
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
@@ -379,6 +386,7 @@ class SourceTablePatch(dbtClassMixin):
docs: Optional[Docs] = None
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
@@ -422,6 +430,7 @@ class SourcePatch(dbtClassMixin):
freshness: Optional[Optional[FreshnessThreshold]] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: Optional[List[SourceTablePatch]] = None
tags: Optional[List[str]] = None
@@ -564,7 +573,7 @@ class UnparsedMetricInput(dbtClassMixin):
filter: Union[str, List[str], None] = None
alias: Optional[str] = None
offset_window: Optional[str] = None
offset_to_grain: Optional[str] = None # str is really a TimeGranularity Enum
offset_to_grain: Optional[str] = None
@dataclass
@@ -661,6 +670,7 @@ class UnparsedEntity(dbtClassMixin):
label: Optional[str] = None
role: Optional[str] = None
expr: Optional[str] = None
config: Dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -681,6 +691,7 @@ class UnparsedMeasure(dbtClassMixin):
non_additive_dimension: Optional[UnparsedNonAdditiveDimension] = None
agg_time_dimension: Optional[str] = None
create_metric: bool = False
config: Dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -698,6 +709,7 @@ class UnparsedDimension(dbtClassMixin):
is_partition: bool = False
type_params: Optional[UnparsedDimensionTypeParams] = None
expr: Optional[str] = None
config: Dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -740,6 +752,12 @@ class UnparsedSavedQuery(dbtClassMixin):
label: Optional[str] = None
exports: List[UnparsedExport] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
# Note: the order of the types is critical; it's the order that they will be checked against inputs.
# if reversed, a single-string tag like `tag: "good"` becomes ['g','o','o','d']
tags: Union[str, List[str]] = field(
default_factory=list_str,
metadata=metas(ShowBehavior.Hide, MergeBehavior.Append, CompareBehavior.Exclude),
)
def normalize_date(d: Optional[datetime.date]) -> Optional[datetime.datetime]:

View File

@@ -0,0 +1,40 @@
from datetime import datetime
from dateutil.relativedelta import relativedelta
from dbt.artifacts.resources.types import BatchSize
from dbt_common.exceptions import DbtRuntimeError
def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
"""Offsets the passed-in timestamp based on the batch_size and offset.
Note: THIS IS DIFFERENT FROM MicrobatchBuilder.offset_timestamp. That function first
`truncates` the timestamp, and then does delta addition/subtraction from there. This
function _doesn't_ truncate the timestamp and uses `relativedelta` for specific edge
case handling (months, years), which may produce different results from the delta math
done in `MicrobatchBuilder.offset_timestamp`.
Examples
2024-09-17 16:06:00 + Batchsize.hour -1 -> 2024-09-17 15:06:00
2024-09-17 16:06:00 + Batchsize.hour +1 -> 2024-09-17 17:06:00
2024-09-17 16:06:00 + Batchsize.day -1 -> 2024-09-16 16:06:00
2024-09-17 16:06:00 + Batchsize.day +1 -> 2024-09-18 16:06:00
2024-09-17 16:06:00 + Batchsize.month -1 -> 2024-08-17 16:06:00
2024-09-17 16:06:00 + Batchsize.month +1 -> 2024-10-17 16:06:00
2024-09-17 16:06:00 + Batchsize.year -1 -> 2023-09-17 16:06:00
2024-09-17 16:06:00 + Batchsize.year +1 -> 2025-09-17 16:06:00
2024-01-31 16:06:00 + Batchsize.month +1 -> 2024-02-29 16:06:00
2024-02-29 16:06:00 + Batchsize.year +1 -> 2025-02-28 16:06:00
"""
if batch_size == BatchSize.hour:
return timestamp + relativedelta(hours=offset)
elif batch_size == BatchSize.day:
return timestamp + relativedelta(days=offset)
elif batch_size == BatchSize.month:
return timestamp + relativedelta(months=offset)
elif batch_size == BatchSize.year:
return timestamp + relativedelta(years=offset)
else:
raise DbtRuntimeError(f"Unhandled batch_size '{batch_size}'")

View File

@@ -0,0 +1,60 @@
from __future__ import annotations
from datetime import datetime
import pytz
from dataclasses import dataclass
from dbt.artifacts.resources.types import BatchSize
from dbt.event_time.event_time import offset_timestamp
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.exceptions import DbtRuntimeError
@dataclass
class SampleWindow(dbtClassMixin):
start: datetime
end: datetime
def __post_serialize__(self, data, context):
# This is insane, but necessary, I apologize. Mashumaro handles the
# dictification of this class via a compile time generated `to_dict`
# method based off of the _typing_ of the class. By default `datetime`
# types are converted to strings. We don't want that; we want them to
# stay datetimes.
# Note: This is safe because the `SampleWindow` isn't part of the artifact
# and thus doesn't get written out.
new_data = super().__post_serialize__(data, context)
new_data["start"] = self.start
new_data["end"] = self.end
return new_data
@classmethod
def from_relative_string(cls, relative_string: str) -> SampleWindow:
end = datetime.now(tz=pytz.UTC)
relative_window = relative_string.split(" ")
if len(relative_window) != 2:
raise DbtRuntimeError(
f"Cannot load SAMPLE_WINDOW from '{relative_string}'. Must be of form 'DAYS_INT GRAIN_SIZE'."
)
try:
lookback = int(relative_window[0])
except Exception:
raise DbtRuntimeError(f"Unable to convert '{relative_window[0]}' to an integer.")
try:
batch_size_string = relative_window[1].lower().rstrip("s")
batch_size = BatchSize[batch_size_string]
except Exception:
grains = [size.value for size in BatchSize]
grain_plurals = [BatchSize.plural(size) for size in BatchSize]
valid_grains = grains + grain_plurals
raise DbtRuntimeError(
f"Invalid grain size '{relative_window[1]}'. Must be one of {valid_grains}."
)
start = offset_timestamp(timestamp=end, batch_size=batch_size, offset=-1 * lookback)
return cls(start=start, end=end)
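A brief usage sketch of the parser above; the import path matches the one used by option_types earlier in this comparison, and the inputs are hypothetical.

from dbt.event_time.sample_window import SampleWindow
from dbt_common.exceptions import DbtRuntimeError

window = SampleWindow.from_relative_string("2 months")
print(window.start, window.end)  # start is two calendar months before "now" (UTC)

try:
    SampleWindow.from_relative_string("3 fortnights")
except DbtRuntimeError as err:
    print(err)  # lists the valid grains and their plurals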

View File

@@ -928,6 +928,18 @@ message MicrobatchModelNoEventTimeInputsMsg {
}
// I075
message InvalidConcurrentBatchesConfig {
int32 num_models = 1;
string adapter_type = 2;
}
message InvalidConcurrentBatchesConfigMsg {
CoreEventInfo info = 1;
InvalidConcurrentBatchesConfig data = 2;
}
// M - Deps generation
@@ -1343,8 +1355,23 @@ message LogTestResultMsg {
LogTestResult data = 2;
}
// Q008
message LogNodeResult {
NodeInfo node_info = 1;
string description = 2;
string status = 3;
int32 index = 4;
int32 total = 5;
float execution_time = 6;
string msg = 7;
}
// Skipped Q008, Q009, Q010
message LogNodeResultMsg {
CoreEventInfo info = 1;
LogNodeResult data = 2;
}
// Skipped Q009, Q010
// Q011
@@ -1388,6 +1415,7 @@ message LogSnapshotResult {
float execution_time = 6;
map<string, string> cfg = 7;
string result_message = 8;
Group group = 9;
}
message LogSnapshotResultMsg {
@@ -1405,6 +1433,7 @@ message LogSeedResult {
float execution_time = 6;
string schema = 7;
string relation = 8;
Group group = 9;
}
message LogSeedResultMsg {
@@ -1578,6 +1607,7 @@ message SkippingDetails {
string node_name = 4;
int32 index = 5;
int32 total = 6;
Group group = 7;
}
message SkippingDetailsMsg {
@@ -1690,6 +1720,35 @@ message MicrobatchExecutionDebugMsg {
MicrobatchExecutionDebug data = 2;
}
// Q045
message LogStartBatch {
NodeInfo node_info = 1;
string description = 2;
int32 batch_index = 3;
int32 total_batches = 4;
}
message LogStartBatchMsg {
CoreEventInfo info = 1;
LogStartBatch data = 2;
}
// Q046
message LogBatchResult {
NodeInfo node_info = 1;
string description = 2;
string status = 3;
int32 batch_index = 4;
int32 total_batches = 5;
float execution_time = 6;
Group group = 7;
}
message LogBatchResultMsg {
CoreEventInfo info = 1;
LogBatchResult data = 2;
}
// W - Node testing
// Skipped W001
@@ -2008,6 +2067,7 @@ message LogSkipBecauseError {
int32 index = 3;
int32 total = 4;
string status = 5;
Group group = 6;
}
message LogSkipBecauseErrorMsg {

File diff suppressed because one or more lines are too long

View File

@@ -967,6 +967,16 @@ class MicrobatchModelNoEventTimeInputs(WarnLevel):
return warning_tag(msg)
class InvalidConcurrentBatchesConfig(WarnLevel):
def code(self) -> str:
return "I075"
def message(self) -> str:
maybe_plural_count_of_models = pluralize(self.num_models, "microbatch model")
description = f"Found {maybe_plural_count_of_models} with the `concurrent_batches` config set to true, but the {self.adapter_type} adapter does not support running batches concurrently. Batches will be run sequentially."
return line_wrap_message(warning_tag(description))
# =======================================================
# M - Deps generation
# =======================================================
@@ -1332,7 +1342,15 @@ class LogTestResult(DynamicLevel):
return EventLevel.INFO
# Skipped Q008, Q009, Q010
class LogNodeResult(DynamicLevel):
def code(self) -> str:
return "Q008"
def message(self) -> str:
return self.msg
# Skipped Q009, Q010
class LogStartLine(InfoLevel):
@@ -1710,6 +1728,51 @@ class MicrobatchExecutionDebug(DebugLevel):
return self.msg
class LogStartBatch(InfoLevel):
def code(self) -> str:
return "Q045"
def message(self) -> str:
msg = f"START {self.description}"
# TODO update common so that we can append "batch" in `format_fancy_output_line`
formatted = format_fancy_output_line(
msg=msg,
status="RUN",
index=self.batch_index,
total=self.total_batches,
)
return f"Batch {formatted}"
class LogBatchResult(DynamicLevel):
def code(self) -> str:
return "Q046"
def message(self) -> str:
if self.status == "error":
info = "ERROR creating"
status = red(self.status.upper())
elif self.status == "skipped":
info = "SKIP"
status = yellow(self.status.upper())
else:
info = "OK created"
status = green(self.status)
msg = f"{info} {self.description}"
# TODO update common so that we can append "batch" in `format_fancy_output_line`
formatted = format_fancy_output_line(
msg=msg,
status=status,
index=self.batch_index,
total=self.total_batches,
execution_time=self.execution_time,
)
return f"Batch {formatted}"
# =======================================================
# W - Node testing
# =======================================================
@@ -1892,7 +1955,9 @@ class StatsLine(InfoLevel):
return "Z023"
def message(self) -> str:
stats_line = "Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}"
stats_line = (
"Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} NO-OP={noop} TOTAL={total}"
)
return stats_line.format(**self.stats)

View File

@@ -67,8 +67,14 @@ class Graph:
while len(selected) > 0 and (max_depth is None or i < max_depth):
next_layer: Set[UniqueId] = set()
for node in selected:
next_layer.update(self.descendants(node, 1))
next_layer = next_layer - children # Avoid re-searching
next_layer.update(
iter(
e[1]
for e in self.graph.out_edges(node)
if e[1] not in children
and self.filter_edges_by_type(e[0], e[1], "parent_test")
)
)
children.update(next_layer)
selected = next_layer
i += 1
@@ -86,8 +92,14 @@ class Graph:
while len(selected) > 0 and (max_depth is None or i < max_depth):
next_layer: Set[UniqueId] = set()
for node in selected:
next_layer.update(self.ancestors(node, 1))
next_layer = next_layer - parents # Avoid re-searching
next_layer.update(
iter(
e[0]
for e in self.graph.in_edges(node)
if e[0] not in parents
and self.filter_edges_by_type(e[0], e[1], "parent_test")
)
)
parents.update(next_layer)
selected = next_layer
i += 1

View File

@@ -178,10 +178,12 @@ class NodeSelector(MethodManager):
elif unique_id in self.manifest.saved_queries:
saved_query = self.manifest.saved_queries[unique_id]
return saved_query.config.enabled
node = self.manifest.nodes[unique_id]
return node.config.enabled
elif unique_id in self.manifest.exposures:
exposure = self.manifest.exposures[unique_id]
return exposure.config.enabled
else:
node = self.manifest.nodes[unique_id]
return node.config.enabled
def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:

View File

@@ -680,17 +680,24 @@ class StateSelectorMethod(SelectorMethod):
def check_modified_content(
self, old: Optional[SelectorTarget], new: SelectorTarget, adapter_type: str
) -> bool:
different_contents = False
if isinstance(
new,
(SourceDefinition, Exposure, Metric, SemanticModel, UnitTestDefinition, SavedQuery),
):
# these all overwrite `same_contents`
different_contents = not new.same_contents(old) # type: ignore
else:
elif new: # because we also pull in deleted/disabled nodes, this could be None
different_contents = not new.same_contents(old, adapter_type) # type: ignore
upstream_macro_change = self.check_macros_modified(new)
return different_contents or upstream_macro_change
check_modified_contract = False
if isinstance(old, ModelNode):
func = self.check_modified_contract("same_contract", adapter_type)
check_modified_contract = func(old, new)
return different_contents or upstream_macro_change or check_modified_contract
def check_unmodified_content(
self, old: Optional[SelectorTarget], new: SelectorTarget, adapter_type: str
@@ -762,6 +769,7 @@ class StateSelectorMethod(SelectorMethod):
manifest: Manifest = self.previous_state.manifest
keyword_args = {} # initialize here to handle disabled node check below
for unique_id, node in self.all_nodes(included_nodes):
previous_node: Optional[SelectorTarget] = None
@@ -780,7 +788,6 @@ class StateSelectorMethod(SelectorMethod):
elif unique_id in manifest.saved_queries:
previous_node = SavedQuery.from_resource(manifest.saved_queries[unique_id])
keyword_args = {}
if checker.__name__ in [
"same_contract",
"check_modified_content",
@@ -792,7 +799,11 @@ class StateSelectorMethod(SelectorMethod):
yield unique_id
# checkers that can handle removed nodes
if checker.__name__ in ["check_modified_contract"]:
if checker.__name__ in [
"check_modified_contract",
"check_modified_content",
"check_unmodified_content",
]:
# ignore included_nodes, since those cannot contain removed nodes
for previous_unique_id, previous_node in manifest.nodes.items():
# detect removed (deleted, renamed, or disabled) nodes

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from multiprocessing.pool import ThreadPool
class DbtThreadPool(ThreadPool):
"""A ThreadPool that tracks whether or not it's been closed"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.closed = False
def close(self):
self.closed = True
super().close()
def is_closed(self):
return self.closed
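A minimal usage sketch of the class above: behavior is identical to multiprocessing.pool.ThreadPool, except that callers can ask whether close() has already been requested.

# Assumes DbtThreadPool as defined above is importable in the current module.
pool = DbtThreadPool(processes=4)
squares = pool.map(lambda x: x * x, range(8))
assert not pool.is_closed()

pool.close()
pool.join()
assert pool.is_closed()  # work submitted to the pool can poll this and stop early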

View File

@@ -100,7 +100,8 @@ class MicrobatchBuilder:
return batches
def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]:
@staticmethod
def build_jinja_context_for_batch(model: ModelNode, incremental_batch: bool) -> Dict[str, Any]:
"""
Create context with entries that reflect microbatch model + incremental execution state
@@ -109,9 +110,9 @@ class MicrobatchBuilder:
jinja_context: Dict[str, Any] = {}
# Microbatch model properties
jinja_context["model"] = self.model.to_dict()
jinja_context["sql"] = self.model.compiled_code
jinja_context["compiled_code"] = self.model.compiled_code
jinja_context["model"] = model.to_dict()
jinja_context["sql"] = model.compiled_code
jinja_context["compiled_code"] = model.compiled_code
# Add incremental context variables for batches running incrementally
if incremental_batch:

View File

@@ -229,92 +229,6 @@ class TestBuilder(Generic[Testable]):
test_args["column_name"] = name
return test_name, test_args
@property
def enabled(self) -> Optional[bool]:
return self.config.get("enabled")
@property
def alias(self) -> Optional[str]:
return self.config.get("alias")
@property
def severity(self) -> Optional[str]:
sev = self.config.get("severity")
if sev:
return sev.upper()
else:
return None
@property
def store_failures(self) -> Optional[bool]:
return self.config.get("store_failures")
@property
def store_failures_as(self) -> Optional[bool]:
return self.config.get("store_failures_as")
@property
def where(self) -> Optional[str]:
return self.config.get("where")
@property
def limit(self) -> Optional[int]:
return self.config.get("limit")
@property
def warn_if(self) -> Optional[str]:
return self.config.get("warn_if")
@property
def error_if(self) -> Optional[str]:
return self.config.get("error_if")
@property
def fail_calc(self) -> Optional[str]:
return self.config.get("fail_calc")
@property
def meta(self) -> Optional[dict]:
return self.config.get("meta")
@property
def database(self) -> Optional[str]:
return self.config.get("database")
@property
def schema(self) -> Optional[str]:
return self.config.get("schema")
def get_static_config(self):
config = {}
if self.alias is not None:
config["alias"] = self.alias
if self.severity is not None:
config["severity"] = self.severity
if self.enabled is not None:
config["enabled"] = self.enabled
if self.where is not None:
config["where"] = self.where
if self.limit is not None:
config["limit"] = self.limit
if self.warn_if is not None:
config["warn_if"] = self.warn_if
if self.error_if is not None:
config["error_if"] = self.error_if
if self.fail_calc is not None:
config["fail_calc"] = self.fail_calc
if self.store_failures is not None:
config["store_failures"] = self.store_failures
if self.store_failures_as is not None:
config["store_failures_as"] = self.store_failures_as
if self.meta is not None:
config["meta"] = self.meta
if self.database is not None:
config["database"] = self.database
if self.schema is not None:
config["schema"] = self.schema
return config
def tags(self) -> List[str]:
tags = self.config.get("tags", [])
if isinstance(tags, str):

View File

@@ -10,6 +10,7 @@ from itertools import chain
from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Type, Union
import msgpack
from jinja2.nodes import Call
import dbt.deprecations
import dbt.exceptions
@@ -17,6 +18,7 @@ import dbt.tracking
import dbt.utils
import dbt_common.utils
from dbt import plugins
from dbt.adapters.capability import Capability
from dbt.adapters.factory import (
get_adapter,
get_adapter_package_names,
@@ -66,6 +68,7 @@ from dbt.events.types import (
ArtifactWritten,
DeprecatedModel,
DeprecatedReference,
InvalidConcurrentBatchesConfig,
InvalidDisabledTargetInTestNode,
MicrobatchModelNoEventTimeInputs,
NodeNotFoundOrDisabled,
@@ -113,6 +116,7 @@ from dbt.parser.snapshots import SnapshotParser
from dbt.parser.sources import SourcePatcher
from dbt.parser.unit_tests import process_models_for_unit_test
from dbt.version import __version__
from dbt_common.clients.jinja import parse
from dbt_common.clients.system import make_directory, path_exists, read_json, write_file
from dbt_common.constants import SECRET_ENV_PREFIX
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin
@@ -510,6 +514,8 @@ class ManifestLoader:
self.check_for_model_deprecations()
self.check_for_spaces_in_resource_names()
self.check_for_microbatch_deprecations()
self.check_forcing_batch_concurrency()
self.check_microbatch_model_has_a_filtered_input()
return self.manifest
@@ -627,9 +633,11 @@ class ManifestLoader:
else EventLevel.WARN
)
flags = get_flags()
for node in self.manifest.nodes.values():
if " " in node.name:
if improper_resource_names == 0 or self.root_project.args.DEBUG:
if improper_resource_names == 0 or flags.DEBUG:
fire_event(
SpacesInResourceNameDeprecation(
unique_id=node.unique_id,
@@ -641,7 +649,6 @@ class ManifestLoader:
if improper_resource_names > 0:
if level == EventLevel.WARN:
flags = get_flags()
dbt.deprecations.warn(
"resource-names-with-spaces",
count_invalid_names=improper_resource_names,
@@ -1235,7 +1242,7 @@ class ManifestLoader:
self.manifest,
config.project_name,
)
_process_docs_for_node(ctx, node)
_process_docs_for_node(ctx, node, self.manifest)
for source in self.manifest.sources.values():
if source.created_at < self.started_at:
continue
@@ -1245,7 +1252,7 @@ class ManifestLoader:
self.manifest,
config.project_name,
)
_process_docs_for_source(ctx, source)
_process_docs_for_source(ctx, source, self.manifest)
for macro in self.manifest.macros.values():
if macro.created_at < self.started_at:
continue
@@ -1468,6 +1475,34 @@ class ManifestLoader:
f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})."
)
def check_forcing_batch_concurrency(self) -> None:
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
adapter = get_adapter(self.root_project)
if not adapter.supports(Capability.MicrobatchConcurrency):
models_forcing_concurrent_batches = 0
for node in self.manifest.nodes.values():
if (
hasattr(node.config, "concurrent_batches")
and node.config.concurrent_batches is True
):
models_forcing_concurrent_batches += 1
if models_forcing_concurrent_batches > 0:
warn_or_error(
InvalidConcurrentBatchesConfig(
num_models=models_forcing_concurrent_batches,
adapter_type=adapter.type(),
)
)
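The new check_forcing_batch_concurrency pass warns when models force concurrent_batches: true on an adapter that lacks the MicrobatchConcurrency capability. A minimal sketch of the same counting logic over plain dictionaries (the node shape and the adapter_supports_concurrency flag are illustrative stand-ins, not dbt's real manifest or adapter objects):

# Minimal sketch of the capability-gated concurrency check, using plain dicts
# instead of dbt's manifest/adapter objects (shapes below are illustrative).
from typing import Dict, List

def count_models_forcing_concurrency(nodes: List[Dict], adapter_supports_concurrency: bool) -> int:
    # Nothing to warn about if the adapter can actually run batches concurrently.
    if adapter_supports_concurrency:
        return 0
    # Count models that explicitly set concurrent_batches=True in their config.
    return sum(
        1 for node in nodes if node.get("config", {}).get("concurrent_batches") is True
    )

example_nodes = [
    {"name": "sessions", "config": {"concurrent_batches": True}},
    {"name": "events", "config": {}},
]
n = count_models_forcing_concurrency(example_nodes, adapter_supports_concurrency=False)
if n > 0:
    print(f"{n} model(s) request concurrent batches on an adapter that cannot honor it")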
def check_microbatch_model_has_a_filtered_input(self):
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
and node.config.incremental_strategy == "microbatch"
):
# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
@@ -1624,13 +1659,55 @@ def _check_manifest(manifest: Manifest, config: RuntimeConfig) -> None:
DocsContextCallback = Callable[[ResultNode], Dict[str, Any]]
def _get_doc_blocks(description: str, manifest: Manifest, node_package: str) -> List[str]:
ast = parse(description)
doc_blocks: List[str] = []
if not hasattr(ast, "body"):
return doc_blocks
for statement in ast.body:
for node in statement.nodes:
if (
isinstance(node, Call)
and hasattr(node, "node")
and hasattr(node, "args")
and hasattr(node.node, "name")
and node.node.name == "doc"
):
doc_args = [arg.value for arg in node.args]
if len(doc_args) == 1:
package, name = None, doc_args[0]
elif len(doc_args) == 2:
package, name = doc_args
else:
continue
if not manifest.metadata.project_name:
continue
resolved_doc = manifest.resolve_doc(
name, package, manifest.metadata.project_name, node_package
)
if resolved_doc:
doc_blocks.append(resolved_doc.unique_id)
return doc_blocks
# node and column descriptions
def _process_docs_for_node(
context: Dict[str, Any],
node: ManifestNode,
manifest: Manifest,
):
node.doc_blocks = _get_doc_blocks(node.description, manifest, node.package_name)
node.description = get_rendered(node.description, context)
for column_name, column in node.columns.items():
column.doc_blocks = _get_doc_blocks(column.description, manifest, node.package_name)
column.description = get_rendered(column.description, context)
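_get_doc_blocks statically walks the Jinja AST of a description to find doc(...) calls before rendering, so each node and column can record which doc blocks it references. A rough standalone sketch of the same AST walk using jinja2 directly (dbt's parse helper and resolve_doc lookup are not used here; this is an illustration, not dbt's implementation):

# Rough sketch: statically extract {{ doc(...) }} calls from a description string
# with jinja2's AST; literal arguments only, mirroring the logic above.
from jinja2 import Environment, nodes

def extract_doc_calls(description: str):
    ast = Environment().parse(description)
    calls = []
    for call in ast.find_all(nodes.Call):
        # Only plain doc(...) calls with constant arguments are interesting.
        if isinstance(call.node, nodes.Name) and call.node.name == "doc":
            args = [arg.value for arg in call.args if isinstance(arg, nodes.Const)]
            if len(args) == 1:
                calls.append((None, args[0]))      # doc("block_name")
            elif len(args) == 2:
                calls.append((args[0], args[1]))   # doc("package", "block_name")
    return calls

print(extract_doc_calls('{{ doc("orders") }} and {{ doc("my_pkg", "customers") }}'))
# -> [(None, 'orders'), ('my_pkg', 'customers')]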
@@ -1638,18 +1715,16 @@ def _process_docs_for_node(
def _process_docs_for_source(
context: Dict[str, Any],
source: SourceDefinition,
manifest: Manifest,
):
table_description = source.description
source_description = source.source_description
table_description = get_rendered(table_description, context)
source_description = get_rendered(source_description, context)
source.description = table_description
source.source_description = source_description
source.doc_blocks = _get_doc_blocks(source.description, manifest, source.package_name)
source.description = get_rendered(source.description, context)
source.source_description = get_rendered(source.source_description, context)
for column in source.columns.values():
column_desc = column.description
column_desc = get_rendered(column_desc, context)
column.description = column_desc
column.doc_blocks = _get_doc_blocks(column.description, manifest, source.package_name)
column.description = get_rendered(column.description, context)
# macro argument descriptions
@@ -2007,7 +2082,7 @@ def process_node(config: RuntimeConfig, manifest: Manifest, node: ManifestNode):
_process_sources_for_node(manifest, config.project_name, node)
_process_refs(manifest, config.project_name, node, config.dependencies)
ctx = generate_runtime_docs_context(config, node, manifest, config.project_name)
_process_docs_for_node(ctx, node)
_process_docs_for_node(ctx, node, manifest)
def write_semantic_manifest(manifest: Manifest, target_path: str) -> None:


@@ -1,6 +1,6 @@
import os
from copy import deepcopy
from typing import Callable, Dict, List, MutableMapping
from typing import Callable, Dict, List, MutableMapping, Union
from dbt.constants import DEFAULT_ENV_PLACEHOLDER
from dbt.contracts.files import (
@@ -10,6 +10,7 @@ from dbt.contracts.files import (
parse_file_type_to_parser,
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import AnalysisNode, ModelNode, SeedNode, SnapshotNode
from dbt.events.types import PartialParsingEnabled, PartialParsingFile
from dbt.node_types import NodeType
from dbt_common.context import get_invocation_context
@@ -725,6 +726,7 @@ class PartialParsing:
handle_change("semantic_models", self.delete_schema_semantic_model)
handle_change("unit_tests", self.delete_schema_unit_test)
handle_change("saved_queries", self.delete_schema_saved_query)
handle_change("data_tests", self.delete_schema_data_test_patch)
def _handle_element_change(
self, schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, dict_key: str, delete
@@ -819,7 +821,7 @@ class PartialParsing:
# For model, seed, snapshot, analysis schema dictionary keys,
# delete the patches and tests from the patch
def delete_schema_mssa_links(self, schema_file, dict_key, elem):
def delete_schema_mssa_links(self, schema_file, dict_key, elem) -> None:
# find elem node unique_id in node_patches
prefix = key_to_prefix[dict_key]
elem_unique_ids = []
@@ -840,11 +842,12 @@ class PartialParsing:
elem_unique_id in self.saved_manifest.nodes
or elem_unique_id in self.saved_manifest.disabled
):
nodes: List[Union[ModelNode, SeedNode, SnapshotNode, AnalysisNode]] = []
if elem_unique_id in self.saved_manifest.nodes:
nodes = [self.saved_manifest.nodes.pop(elem_unique_id)]
nodes = [self.saved_manifest.nodes.pop(elem_unique_id)] # type: ignore[list-item]
else:
# The value of disabled items is a list of nodes
nodes = self.saved_manifest.disabled.pop(elem_unique_id)
nodes = self.saved_manifest.disabled.pop(elem_unique_id) # type: ignore[assignment]
# need to add the node source_file to pp_files
for node in nodes:
file_id = node.file_id
@@ -857,9 +860,9 @@ class PartialParsing:
# if the node's group has changed - need to reparse all referencing nodes to ensure valid ref access
if node.group != elem.get("group"):
self.schedule_referencing_nodes_for_parsing(node.unique_id)
# If the latest version has changed or a version has been removed we need to
# reparse referencing nodes.
if node.is_versioned:
# If the latest version has changed, a version has been removed, or a version has been added,
# we need to reparse referencing nodes.
if node.is_versioned or elem.get("versions"):
self.schedule_referencing_nodes_for_parsing(node.unique_id)
# remove from patches
schema_file.node_patches.remove(elem_unique_id)
@@ -919,6 +922,23 @@ class PartialParsing:
self.saved_files[macro_file_id] = deepcopy(self.new_files[macro_file_id])
self.add_to_pp_files(self.saved_files[macro_file_id])
def delete_schema_data_test_patch(self, schema_file, data_test):
data_test_unique_id = None
for unique_id in schema_file.node_patches:
if not unique_id.startswith("test"):
continue
parts = unique_id.split(".")
elem_name = parts[2]
if elem_name == data_test["name"]:
data_test_unique_id = unique_id
break
if data_test_unique_id and data_test_unique_id in self.saved_manifest.nodes:
singular_data_test = self.saved_manifest.nodes.pop(data_test_unique_id)
file_id = singular_data_test.file_id
if file_id in self.new_files:
self.saved_files[file_id] = deepcopy(self.new_files[file_id])
self.add_to_pp_files(self.saved_files[file_id])
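delete_schema_data_test_patch matches the patched singular data test by name, relying on the dotted unique_id layout (test.<package>.<name>). A tiny sketch of that matching step under the same assumption (the ids below are made up):

# Tiny sketch of matching a singular data test patch by name via its unique_id;
# assumes the test.<package>.<name> layout, and the ids here are made up.
def find_data_test_unique_id(node_patches, data_test_name):
    for unique_id in node_patches:
        if not unique_id.startswith("test"):
            continue
        parts = unique_id.split(".")
        if len(parts) >= 3 and parts[2] == data_test_name:
            return unique_id
    return None

patches = ["model.my_project.orders", "test.my_project.assert_positive_total"]
print(find_data_test_unique_id(patches, "assert_positive_total"))
# -> test.my_project.assert_positive_total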
# exposures are created only from schema files, so just delete
# the exposure or the disabled exposure.
def delete_schema_exposure(self, schema_file, exposure_dict):


@@ -286,7 +286,7 @@ class SchemaGenericTestParser(SimpleParser):
# to the context in rendering processing
node.depends_on.add_macro(macro_unique_id)
if macro_unique_id in ["macro.dbt.test_not_null", "macro.dbt.test_unique"]:
config_call_dict = builder.get_static_config()
config_call_dict = builder.config
config._config_call_dict = config_call_dict
# This sets the config from dbt_project
self.update_parsed_node_config(node, config)


@@ -37,11 +37,21 @@ class SchemaYamlRenderer(BaseRenderer):
"tests" and "data_tests" are both currently supported but "tests" has been deprecated
"""
# top level descriptions and data_tests
if len(keypath) >= 1 and keypath[0] in ("tests", "data_tests", "description"):
if len(keypath) >= 1 and keypath[0] in (
"tests",
"data_tests",
"description",
"loaded_at_query",
):
return True
# columns descriptions and data_tests
if len(keypath) == 2 and keypath[1] in ("tests", "data_tests", "description"):
if len(keypath) == 2 and keypath[1] in (
"tests",
"data_tests",
"description",
"loaded_at_query",
):
return True
# pre- and post-hooks
@@ -69,9 +79,8 @@ class SchemaYamlRenderer(BaseRenderer):
def should_render_keypath(self, keypath: Keypath) -> bool:
if len(keypath) < 1:
return True
if self.key == "sources":
if keypath[0] == "description":
if keypath[0] in ("description", "loaded_at_query"):
return False
if keypath[0] == "tables":
if self._is_norender_key(keypath[2:]):


@@ -1,3 +1,4 @@
from collections.abc import Sequence
from typing import Any, Dict, List, Optional, Union
from dbt.artifacts.resources import (
@@ -21,6 +22,7 @@ from dbt.artifacts.resources import (
WhereFilter,
WhereFilterIntersection,
)
from dbt.artifacts.resources.v1.semantic_model import SemanticLayerElementConfig
from dbt.clients.jinja import get_rendered
from dbt.context.context_config import (
BaseContextConfigGenerator,
@@ -228,19 +230,11 @@ class MetricParser(YamlReader):
def _get_period_agg(self, unparsed_period_agg: str) -> PeriodAggregation:
return PeriodAggregation(unparsed_period_agg)
def _get_optional_grain_to_date(
self, unparsed_grain_to_date: Optional[str]
) -> Optional[TimeGranularity]:
if not unparsed_grain_to_date:
return None
return TimeGranularity(unparsed_grain_to_date)
def _get_optional_time_window(
self, unparsed_window: Optional[str]
) -> Optional[MetricTimeWindow]:
if unparsed_window is not None:
parts = unparsed_window.split(" ")
parts = unparsed_window.lower().split(" ")
if len(parts) != 2:
raise YamlParseDictError(
self.yaml.path,
@@ -252,16 +246,11 @@ class MetricParser(YamlReader):
granularity = parts[1]
# once we drop python 3.8 this could just be `granularity = parts[0].removesuffix('s')
if granularity.endswith("s"):
# months -> month
if granularity.endswith("s") and granularity[:-1] in [
item.value for item in TimeGranularity
]:
# Can only remove the `s` if it's a standard grain, months -> month
granularity = granularity[:-1]
if granularity not in [item.value for item in TimeGranularity]:
raise YamlParseDictError(
self.yaml.path,
"window",
{"window": unparsed_window},
f"Invalid time granularity {granularity} in cumulative/conversion metric window string: ({unparsed_window})",
)
count = parts[0]
if not count.isdigit():
@@ -274,7 +263,7 @@ class MetricParser(YamlReader):
return MetricTimeWindow(
count=int(count),
granularity=TimeGranularity(granularity),
granularity=granularity,
)
else:
return None
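The reworked _get_optional_time_window lowercases the window string and only strips a trailing "s" when the result is a standard grain, so custom grains such as "fiscal_quarters" pass through untouched. A self-contained sketch of that parsing rule, with a plain set standing in for dbt's TimeGranularity enum:

# Sketch of parsing a "<count> <granularity>" window string; only standard grains
# are de-pluralized (the set below stands in for dbt's TimeGranularity enum).
STANDARD_GRAINS = {"day", "week", "month", "quarter", "year"}

def parse_window(window: str):
    parts = window.lower().split(" ")
    if len(parts) != 2:
        raise ValueError(f"Invalid window string: {window!r}")
    count, granularity = parts
    # "months" -> "month", but leave custom grains like "fiscal_quarters" alone.
    if granularity.endswith("s") and granularity[:-1] in STANDARD_GRAINS:
        granularity = granularity[:-1]
    if not count.isdigit():
        raise ValueError(f"Invalid count {count!r} in window string: {window!r}")
    return int(count), granularity

print(parse_window("3 Months"))           # -> (3, 'month')
print(parse_window("2 fiscal_quarters"))  # -> (2, 'fiscal_quarters')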
@@ -283,16 +272,12 @@ class MetricParser(YamlReader):
if isinstance(unparsed, str):
return MetricInput(name=unparsed)
else:
offset_to_grain: Optional[TimeGranularity] = None
if unparsed.offset_to_grain is not None:
offset_to_grain = TimeGranularity(unparsed.offset_to_grain)
return MetricInput(
name=unparsed.name,
filter=parse_where_filter(unparsed.filter),
alias=unparsed.alias,
offset_window=self._get_optional_time_window(unparsed.offset_window),
offset_to_grain=offset_to_grain,
offset_to_grain=unparsed.offset_to_grain,
)
def _get_optional_metric_input(
@@ -354,9 +339,7 @@ class MetricParser(YamlReader):
window=self._get_optional_time_window(
unparsed_type_params.cumulative_type_params.window
),
grain_to_date=self._get_optional_grain_to_date(
unparsed_type_params.cumulative_type_params.grain_to_date
),
grain_to_date=unparsed_type_params.cumulative_type_params.grain_to_date,
period_agg=self._get_period_agg(
unparsed_type_params.cumulative_type_params.period_agg
),
@@ -369,6 +352,10 @@ class MetricParser(YamlReader):
grain_to_date: Optional[TimeGranularity] = None
if type_params.grain_to_date is not None:
# This should've been changed to a string (to support custom grain), but since this
# is a legacy field waiting to be deprecated, we will not support custom grain here
# in order to force customers off of using this field. The field to use should be
# `cumulative_type_params.grain_to_date`
grain_to_date = TimeGranularity(type_params.grain_to_date)
return MetricTypeParams(
@@ -435,9 +422,7 @@ class MetricParser(YamlReader):
label=unparsed.label,
type=MetricType(unparsed.type),
type_params=self._get_metric_type_params(unparsed),
time_granularity=(
TimeGranularity(unparsed.time_granularity) if unparsed.time_granularity else None
),
time_granularity=unparsed.time_granularity,
filter=parse_where_filter(unparsed.filter),
meta=unparsed.meta,
tags=unparsed.tags,
@@ -553,6 +538,7 @@ class SemanticModelParser(YamlReader):
type_params=self._get_dimension_type_params(unparsed=unparsed.type_params),
expr=unparsed.expr,
metadata=None, # TODO: requires a fair bit of parsing context
config=SemanticLayerElementConfig(meta=unparsed.config.get("meta", {})),
)
)
return dimensions
@@ -568,6 +554,7 @@ class SemanticModelParser(YamlReader):
label=unparsed.label,
role=unparsed.role,
expr=unparsed.expr,
config=SemanticLayerElementConfig(meta=unparsed.config.get("meta", {})),
)
)
@@ -600,6 +587,7 @@ class SemanticModelParser(YamlReader):
unparsed.non_additive_dimension
),
agg_time_dimension=unparsed.agg_time_dimension,
config=SemanticLayerElementConfig(meta=unparsed.config.get("meta", {})),
)
)
return measures
@@ -655,6 +643,10 @@ class SemanticModelParser(YamlReader):
fqn = self.schema_parser.get_fqn_prefix(path)
fqn.append(unparsed.name)
entities = self._get_entities(unparsed.entities)
measures = self._get_measures(unparsed.measures)
dimensions = self._get_dimensions(unparsed.dimensions)
config = self._generate_semantic_model_config(
target=unparsed,
fqn=fqn,
@@ -662,6 +654,19 @@ class SemanticModelParser(YamlReader):
rendered=True,
)
# Combine configs according to the behavior documented here https://docs.getdbt.com/reference/configs-and-properties#combining-configs
elements: Sequence[Union[Dimension, Entity, Measure]] = [
*dimensions,
*entities,
*measures,
]
for element in elements:
if config is not None:
if element.config is None:
element.config = SemanticLayerElementConfig(meta=config.meta)
else:
element.config.meta = {**config.get("meta", {}), **element.config.meta}
config = config.finalize_and_validate()
unrendered_config = self._generate_semantic_model_config(
@@ -683,9 +688,9 @@ class SemanticModelParser(YamlReader):
path=path,
resource_type=NodeType.SemanticModel,
unique_id=unique_id,
entities=self._get_entities(unparsed.entities),
measures=self._get_measures(unparsed.measures),
dimensions=self._get_dimensions(unparsed.dimensions),
entities=entities,
measures=measures,
dimensions=dimensions,
defaults=unparsed.defaults,
primary_entity=unparsed.primary_entity,
config=config,
@@ -816,6 +821,18 @@ class SavedQueryParser(YamlReader):
rendered=False,
)
# The parser handles plain strings just fine, but we need to be able
# to join two lists, remove duplicates, and sort, so we have to wrap things here.
def wrap_tags(s: Union[List[str], str]) -> List[str]:
if s is None:
return []
return [s] if isinstance(s, str) else s
config_tags = wrap_tags(config.get("tags"))
unparsed_tags = wrap_tags(unparsed.tags)
tags = list(set([*unparsed_tags, *config_tags]))
tags.sort()
parsed = SavedQuery(
description=unparsed.description,
label=unparsed.label,
@@ -831,6 +848,7 @@ class SavedQueryParser(YamlReader):
config=config,
unrendered_config=unrendered_config,
group=config.group,
tags=tags,
)
for export in parsed.exports:


@@ -1,11 +1,17 @@
import datetime
import pathlib
import time
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar
from dbt.artifacts.resources import RefArgs
from dbt.artifacts.resources.v1.model import CustomGranularity, TimeSpine
from dbt.artifacts.resources.v1.model import (
CustomGranularity,
ModelBuildAfter,
ModelFreshness,
TimeSpine,
)
from dbt.clients.jinja_static import statically_parse_ref_or_source
from dbt.clients.yaml_helper import load_yaml_text
from dbt.config import RuntimeConfig
@@ -289,9 +295,15 @@ class SchemaParser(SimpleParser[YamlBlock, ModelNode]):
parser = SnapshotParser(self.project, self.manifest, self.root_project)
fqn = parser.get_fqn_prefix(block.path.relative_path)
fqn.append(snapshot["name"])
compiled_path = str(
pathlib.PurePath("").joinpath(
block.path.relative_path, snapshot["name"] + ".sql"
)
)
snapshot_node = parser._create_parsetime_node(
block,
self.get_compiled_path(block),
compiled_path,
parser.initial_config(fqn),
fqn,
snapshot["name"],
@@ -715,6 +727,7 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
# code consistency.
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None
if isinstance(block.target, UnparsedModelUpdate):
deprecation_date = block.target.deprecation_date
time_spine = (
@@ -731,6 +744,17 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
if block.target.time_spine
else None
)
freshness = (
ModelFreshness(
build_after=ModelBuildAfter(
count=block.target.freshness.build_after.count,
period=block.target.freshness.build_after.period,
depends_on=block.target.freshness.build_after.depends_on,
),
)
if block.target.freshness
else None
)
patch = ParsedNodePatch(
name=block.target.name,
original_file_path=block.target.original_file_path,
@@ -747,6 +771,7 @@ class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarg
constraints=block.target.constraints,
deprecation_date=deprecation_date,
time_spine=time_spine,
freshness=freshness,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
@@ -1036,6 +1061,7 @@ class ModelPatchParser(NodePatchParser[UnparsedModelUpdate]):
# These two will have to be reapplied after config is built for versioned models
self.patch_constraints(node, patch.constraints)
self.patch_time_spine(node, patch.time_spine)
node.freshness = patch.freshness
node.build_contract_checksum()
def patch_constraints(self, node: ModelNode, constraints: List[Dict[str, Any]]) -> None:
@@ -1199,8 +1225,6 @@ class SingularTestPatchParser(PatchParser[UnparsedSingularTestUpdate, ParsedSing
node.patch_path = patch.file_id
node.description = patch.description
node.created_at = time.time()
node.meta = patch.meta
node.docs = patch.docs
class MacroPatchParser(PatchParser[UnparsedMacroUpdate, ParsedMacroPatch]):


@@ -26,6 +26,7 @@ from dbt.contracts.graph.unparsed import (
UnparsedSourceTableDefinition,
)
from dbt.events.types import FreshnessConfigProblem, UnusedTables
from dbt.exceptions import ParsingError
from dbt.node_types import NodeType
from dbt.parser.common import ParserRef
from dbt.parser.schema_generic_tests import SchemaGenericTestParser
@@ -131,11 +132,28 @@ class SourcePatcher:
# We need to be able to tell the difference between explicitly setting the loaded_at_field to None/null
# and when it's simply not set. This allows a user to override the source-level loaded_at_field so that a
# specific table can default to metadata-based freshness.
if table.loaded_at_field_present and table.loaded_at_query:
raise ParsingError(
"Cannot specify both loaded_at_field and loaded_at_query at table level."
)
if source.loaded_at_field and source.loaded_at_query:
raise ParsingError(
"Cannot specify both loaded_at_field and loaded_at_query at source level."
)
if table.loaded_at_field_present or table.loaded_at_field is not None:
loaded_at_field = table.loaded_at_field
else:
loaded_at_field = source.loaded_at_field # may be None, that's okay
loaded_at_query: Optional[str]
if table.loaded_at_query is not None:
loaded_at_query = table.loaded_at_query
else:
if table.loaded_at_field_present:
loaded_at_query = None
else:
loaded_at_query = source.loaded_at_query
freshness = merge_freshness(source.freshness, table.freshness)
quoting = source.quoting.merged(table.quoting)
# path = block.path.original_file_path
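The patcher now resolves loaded_at_field and loaded_at_query with table-level settings winning, treats an explicitly null table-level loaded_at_field as "set", and rejects configs that declare both options at the same level. A compact sketch of that precedence, with plain arguments standing in for the source and table objects:

# Compact sketch of the table-vs-source precedence for loaded_at_field and
# loaded_at_query; plain arguments stand in for dbt's source/table objects.
from typing import Optional, Tuple

def resolve_loaded_at(
    table_field: Optional[str],
    table_field_present: bool,  # True even when the table sets loaded_at_field: null
    table_query: Optional[str],
    source_field: Optional[str],
    source_query: Optional[str],
) -> Tuple[Optional[str], Optional[str]]:
    if table_field_present and table_query:
        raise ValueError("Cannot specify both loaded_at_field and loaded_at_query at table level.")
    if source_field and source_query:
        raise ValueError("Cannot specify both loaded_at_field and loaded_at_query at source level.")
    # Table-level loaded_at_field wins whenever it is present, even if it is null.
    if table_field_present or table_field is not None:
        loaded_at_field = table_field
    else:
        loaded_at_field = source_field  # may be None, that's okay
    # Table-level query wins; an explicit (even null) table-level field suppresses
    # falling back to the source-level query.
    if table_query is not None:
        loaded_at_query = table_query
    elif table_field_present:
        loaded_at_query = None
    else:
        loaded_at_query = source_query
    return loaded_at_field, loaded_at_query

# Table explicitly nulls loaded_at_field -> metadata freshness, no query inherited.
print(resolve_loaded_at(None, True, None, "updated_at", None))  # -> (None, None)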
@@ -185,6 +203,7 @@ class SourcePatcher:
meta=meta,
loader=source.loader,
loaded_at_field=loaded_at_field,
loaded_at_query=loaded_at_query,
freshness=freshness,
quoting=quoting,
resource_type=NodeType.Source,


@@ -0,0 +1,2 @@
from .exposure_runner import ExposureRunner
from .saved_query_runner import SavedQueryRunner


@@ -0,0 +1,7 @@
from dbt.runners.no_op_runner import NoOpRunner
class ExposureRunner(NoOpRunner):
@property
def description(self) -> str:
return f"exposure {self.node.name}"


@@ -0,0 +1,45 @@
import threading
from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.task.base import BaseRunner
from dbt_common.events.functions import fire_event
class NoOpRunner(BaseRunner):
@property
def description(self) -> str:
raise NotImplementedError("description not implemented")
def before_execute(self) -> None:
pass
def compile(self, manifest: Manifest):
return self.node
def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
)
)
def execute(self, compiled_node, manifest):
# no-op
return RunResult(
node=compiled_node,
status=RunStatus.NoOp,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="NO-OP",
adapter_response={},
failures=0,
batch_results=None,
agate_table=None,
)


@@ -0,0 +1,7 @@
from dbt.runners.no_op_runner import NoOpRunner
class SavedQueryRunner(NoOpRunner):
@property
def description(self) -> str:
return f"saved query {self.node.name}"


@@ -41,6 +41,7 @@ from dbt.events.types import (
)
from dbt.flags import get_flags
from dbt.graph import Graph
from dbt.task import group_lookup
from dbt.task.printer import print_run_result_error
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
@@ -424,6 +425,8 @@ class BaseRunner(metaclass=ABCMeta):
# if this model was skipped due to an upstream ephemeral model
# failure, print a special 'error skip' message.
# Include skip_cause NodeStatus
group = group_lookup.get(self.node.unique_id)
if self._skip_caused_by_ephemeral_failure():
fire_event(
LogSkipBecauseError(
@@ -432,6 +435,7 @@ class BaseRunner(metaclass=ABCMeta):
index=self.node_index,
total=self.num_nodes,
status=self.skip_cause.status,
group=group,
)
)
# skip_cause here should be the run_result from the ephemeral model
@@ -459,6 +463,7 @@ class BaseRunner(metaclass=ABCMeta):
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
group=group,
)
)


@@ -1,18 +1,17 @@
import threading
from typing import Dict, List, Optional, Set, Type
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.results import NodeStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.exceptions import DbtInternalError
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.runners import ExposureRunner as exposure_runner
from dbt.runners import SavedQueryRunner as saved_query_runner
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.run import MicrobatchModelRunner
from dbt_common.events.functions import fire_event
from .run import ModelRunner as run_model_runner
from .run import RunTask
@@ -21,48 +20,10 @@ from .snapshot import SnapshotRunner as snapshot_model_runner
from .test import TestRunner as test_runner
class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self) -> str:
return f"saved query {self.node.name}"
def before_execute(self) -> None:
pass
def compile(self, manifest: Manifest):
return self.node
def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
)
)
def execute(self, compiled_node, manifest):
# no-op
return RunResult(
node=compiled_node,
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="NO-OP",
adapter_response={},
failures=0,
batch_results=None,
agate_table=None,
)
class BuildTask(RunTask):
"""The Build task processes all assets of a given process and attempts to
'build' them in an opinionated fashion. Every resource type outlined in
RUNNER_MAP will be processed by the mapped runner class.
RUNNER_MAP will be processed by the mapped runners class.
I.E. a resource of type Model is handled by the ModelRunner which is
imported as run_model_runner."""
@@ -80,7 +41,8 @@ class BuildTask(RunTask):
NodeType.Seed: seed_runner,
NodeType.Test: test_runner,
NodeType.Unit: test_runner,
NodeType.SavedQuery: SavedQueryRunner,
NodeType.SavedQuery: saved_query_runner,
NodeType.Exposure: exposure_runner,
}
ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()})
@@ -169,7 +131,8 @@ class BuildTask(RunTask):
runner.do_skip(cause=cause)
if isinstance(runner, MicrobatchModelRunner):
return self.handle_microbatch_model(runner, pool)
runner.set_parent_task(self)
runner.set_pool(pool)
return self.call_runner(runner)
@@ -184,10 +147,11 @@ class BuildTask(RunTask):
runner.do_skip(cause=cause)
if isinstance(runner, MicrobatchModelRunner):
callback(self.handle_microbatch_model(runner, pool))
else:
args = [runner]
self._submit(pool, args, callback)
runner.set_parent_task(self)
runner.set_pool(pool)
args = [runner]
self._submit(pool, args, callback)
# Make a map of model unique_ids to selected unit test unique_ids,
# for processing before the model.


@@ -15,6 +15,8 @@ from dbt.artifacts.schemas.freshness import (
PartialSourceFreshnessResult,
SourceFreshnessResult,
)
from dbt.clients import jinja
from dbt.context.providers import RuntimeProvider, SourceContext
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, SourceDefinition
from dbt.contracts.results import RunStatus
@@ -114,7 +116,22 @@ class FreshnessRunner(BaseRunner):
adapter_response: Optional[AdapterResponse] = None
freshness: Optional[FreshnessResponse] = None
if compiled_node.loaded_at_field is not None:
if compiled_node.loaded_at_query is not None:
# within the context the user has access to `this`, `source_node` (`model` points to the same thing), etc.
compiled_code = jinja.get_rendered(
compiled_node.loaded_at_query,
SourceContext(
compiled_node, self.config, manifest, RuntimeProvider(), None
).to_dict(),
compiled_node,
)
adapter_response, freshness = self.adapter.calculate_freshness_from_custom_sql(
relation,
compiled_code,
macro_resolver=manifest,
)
status = compiled_node.freshness.status(freshness["age"])
elif compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
@@ -146,7 +163,6 @@ class FreshnessRunner(BaseRunner):
raise DbtRuntimeError(
f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks."
)
# adapter_response was not returned in previous versions, so this will be None
# we cannot call to_dict() on NoneType
if adapter_response:
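When loaded_at_query is configured, the freshness runner renders it as a Jinja template against the source context so references like {{ this }} resolve before the SQL reaches the adapter. A stripped-down illustration of that rendering step using plain jinja2 and a made-up context (dbt's get_rendered and SourceContext expose far more variables):

# Stripped-down illustration of rendering a loaded_at_query template before it is
# handed to the adapter; plain jinja2 and a made-up context stand in for dbt's
# get_rendered / SourceContext machinery.
from jinja2 import Template

loaded_at_query = "select max(loaded_at) from {{ this }} where loaded_at > '{{ cutoff }}'"
context = {"this": "analytics.raw.orders", "cutoff": "2025-03-01"}

compiled_code = Template(loaded_at_query).render(**context)
print(compiled_code)
# -> select max(loaded_at) from analytics.raw.orders where loaded_at > '2025-03-01'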


@@ -17,6 +17,7 @@ from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import resource_types_from_args
from dbt.task.runnable import GraphRunnableTask
from dbt.utils import JSONEncoder
from dbt_common.events.contextvars import task_contextvars
from dbt_common.events.functions import fire_event, warn_or_error
from dbt_common.events.types import PrintEvent
@@ -142,7 +143,8 @@ class ListTask(GraphRunnableTask):
if self.args.output_keys
else k in self.ALLOWED_KEYS
)
}
},
cls=JSONEncoder,
)
def generate_paths(self) -> Iterator[str]:
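The list task now passes an encoder class to json.dumps via cls= so values the stock encoder rejects serialize cleanly. A generic illustration of that hook with a hypothetical encoder handling dates (not dbt's actual JSONEncoder):

# Generic illustration of json.dumps(..., cls=...) with a hypothetical encoder;
# dbt's own JSONEncoder covers more types than this sketch.
import datetime
import json

class ExampleEncoder(json.JSONEncoder):
    def default(self, o):
        # Fall back to ISO strings for date/datetime values the stock encoder rejects.
        if isinstance(o, (datetime.date, datetime.datetime)):
            return o.isoformat()
        return super().default(o)

row = {"name": "orders", "created_at": datetime.date(2025, 3, 4)}
print(json.dumps(row, cls=ExampleEncoder))
# -> {"name": "orders", "created_at": "2025-03-04"}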

Some files were not shown because too many files have changed in this diff.