Compare commits

...

1 Commits

Author SHA1 Message Date
Jeremy Cohen
14c64737cd Rough in naive implementation for 5005 2024-09-12 13:41:36 +02:00
2 changed files with 42 additions and 2 deletions

View File

@@ -635,8 +635,13 @@ def clone(ctx, **kwargs):
@global_flags @global_flags
@click.argument("macro") @click.argument("macro")
@p.args @p.args
@p.exclude
@p.exclude_resource_type
@p.profiles_dir @p.profiles_dir
@p.project_dir @p.project_dir
@p.resource_type
@p.select
@p.selector
@p.target_path @p.target_path
@p.threads @p.threads
@p.vars @p.vars

View File

@@ -15,8 +15,10 @@ from dbt.events.types import (
RunningOperationCaughtError, RunningOperationCaughtError,
RunningOperationUncaughtError, RunningOperationUncaughtError,
) )
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType from dbt.node_types import NodeType
from dbt.task.base import ConfiguredTask from dbt.task.base import resource_types_from_args
from dbt.task.runnable import GraphRunnableTask
from dbt_common.events.functions import fire_event from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtInternalError from dbt_common.exceptions import DbtInternalError
@@ -27,7 +29,36 @@ if TYPE_CHECKING:
import agate import agate
class RunOperationTask(ConfiguredTask): class RunOperationTask(GraphRunnableTask):
ALL_RESOURCE_VALUES = frozenset(
(
NodeType.Model,
NodeType.Snapshot,
NodeType.Seed,
NodeType.Test,
NodeType.Source,
NodeType.Exposure,
NodeType.Metric,
NodeType.SavedQuery,
NodeType.SemanticModel,
NodeType.Unit,
NodeType.Analysis,
)
)
def get_node_selector(self):
if self.manifest is None or self.graph is None:
raise DbtInternalError("manifest and graph must be set to get perform node selection")
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=resource_types_from_args(
self.args, set(self.ALL_RESOURCE_VALUES), set(self.ALL_RESOURCE_VALUES)
),
include_empty_nodes=True,
)
def _get_macro_parts(self): def _get_macro_parts(self):
macro_name = self.args.macro macro_name = self.args.macro
if "." in macro_name: if "." in macro_name:
@@ -53,6 +84,10 @@ class RunOperationTask(ConfiguredTask):
def run(self) -> RunResultsArtifact: def run(self) -> RunResultsArtifact:
start = datetime.utcnow() start = datetime.utcnow()
self.compile_manifest() self.compile_manifest()
if self.manifest is None or self.graph is None:
raise DbtInternalError("_runtime_initialize never loaded the graph!")
self.job_queue = self.get_graph_queue()
success = True success = True