mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
Factor Out Repeated Logic in the PartialParsing Class (#7952)
* CT-2711: Add remove_tests() call to delete_schema_source() so that call sites are more uniform with other node deletion call sites. This will enable further code factorization. * CT-2711: Factor repeated code section (mostly) out of PartialParsing.handle_schema_file_changes() * CT-2711: Factor a repeated code section out of schedule_nodes_for_parsing()
This commit is contained in:
@@ -1,11 +1,12 @@
|
||||
import os
|
||||
from copy import deepcopy
|
||||
from typing import MutableMapping, Dict, List
|
||||
from typing import MutableMapping, Dict, List, Callable
|
||||
from dbt.contracts.graph.manifest import Manifest
|
||||
from dbt.contracts.files import (
|
||||
AnySourceFile,
|
||||
ParseFileType,
|
||||
parse_file_type_to_parser,
|
||||
SchemaSourceFile,
|
||||
)
|
||||
from dbt.events.functions import fire_event
|
||||
from dbt.events.base_types import EventLevel
|
||||
@@ -403,41 +404,19 @@ class PartialParsing:
|
||||
self.add_to_pp_files(self.saved_files[file_id])
|
||||
elif unique_id in self.saved_manifest.sources:
|
||||
source = self.saved_manifest.sources[unique_id]
|
||||
file_id = source.file_id
|
||||
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
|
||||
schema_file = self.saved_files[file_id]
|
||||
sources = []
|
||||
if "sources" in schema_file.dict_from_yaml:
|
||||
sources = schema_file.dict_from_yaml["sources"]
|
||||
source_element = self.get_schema_element(sources, source.source_name)
|
||||
if source_element:
|
||||
self.delete_schema_source(schema_file, source_element)
|
||||
self.remove_tests(schema_file, "sources", source_element["name"])
|
||||
self.merge_patch(schema_file, "sources", source_element)
|
||||
self._schedule_for_parsing(
|
||||
"sources", source, source.source_name, self.delete_schema_source
|
||||
)
|
||||
elif unique_id in self.saved_manifest.exposures:
|
||||
exposure = self.saved_manifest.exposures[unique_id]
|
||||
file_id = exposure.file_id
|
||||
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
|
||||
schema_file = self.saved_files[file_id]
|
||||
exposures = []
|
||||
if "exposures" in schema_file.dict_from_yaml:
|
||||
exposures = schema_file.dict_from_yaml["exposures"]
|
||||
exposure_element = self.get_schema_element(exposures, exposure.name)
|
||||
if exposure_element:
|
||||
self.delete_schema_exposure(schema_file, exposure_element)
|
||||
self.merge_patch(schema_file, "exposures", exposure_element)
|
||||
self._schedule_for_parsing(
|
||||
"exposures", exposure, exposure.name, self.delete_schema_exposure
|
||||
)
|
||||
elif unique_id in self.saved_manifest.metrics:
|
||||
metric = self.saved_manifest.metrics[unique_id]
|
||||
file_id = metric.file_id
|
||||
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
|
||||
schema_file = self.saved_files[file_id]
|
||||
metrics = []
|
||||
if "metrics" in schema_file.dict_from_yaml:
|
||||
metrics = schema_file.dict_from_yaml["metrics"]
|
||||
metric_element = self.get_schema_element(metrics, metric.name)
|
||||
if metric_element:
|
||||
self.delete_schema_metric(schema_file, metric_element)
|
||||
self.merge_patch(schema_file, "metrics", metric_element)
|
||||
self._schedule_for_parsing(
|
||||
"metrics", metric, metric.name, self.delete_schema_metric
|
||||
)
|
||||
elif unique_id in self.saved_manifest.macros:
|
||||
macro = self.saved_manifest.macros[unique_id]
|
||||
file_id = macro.file_id
|
||||
@@ -447,6 +426,19 @@ class PartialParsing:
|
||||
self.saved_files[file_id] = deepcopy(self.new_files[file_id])
|
||||
self.add_to_pp_files(self.saved_files[file_id])
|
||||
|
||||
def _schedule_for_parsing(self, dict_key: str, element, name, delete: Callable) -> None:
|
||||
file_id = element.file_id
|
||||
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
|
||||
schema_file = self.saved_files[file_id]
|
||||
elements = []
|
||||
assert isinstance(schema_file, SchemaSourceFile)
|
||||
if dict_key in schema_file.dict_from_yaml:
|
||||
elements = schema_file.dict_from_yaml[dict_key]
|
||||
schema_element = self.get_schema_element(elements, name)
|
||||
if schema_element:
|
||||
delete(schema_file, schema_element)
|
||||
self.merge_patch(schema_file, dict_key, schema_element)
|
||||
|
||||
def delete_macro_file(self, source_file, follow_references=False):
|
||||
self.check_for_special_deleted_macros(source_file)
|
||||
self.handle_macro_file_links(source_file, follow_references)
|
||||
@@ -538,7 +530,6 @@ class PartialParsing:
|
||||
# This is a source patch; need to re-parse orig source
|
||||
self.remove_source_override_target(patch)
|
||||
self.delete_schema_source(schema_file, patch)
|
||||
self.remove_tests(schema_file, "sources", patch["name"])
|
||||
self.merge_patch(schema_file, "sources", patch)
|
||||
else:
|
||||
file_id = node.file_id
|
||||
@@ -639,14 +630,12 @@ class PartialParsing:
|
||||
if "overrides" in source: # This is a source patch; need to re-parse orig source
|
||||
self.remove_source_override_target(source)
|
||||
self.delete_schema_source(schema_file, source)
|
||||
self.remove_tests(schema_file, dict_key, source["name"])
|
||||
self.merge_patch(schema_file, dict_key, source)
|
||||
if source_diff["deleted"]:
|
||||
for source in source_diff["deleted"]:
|
||||
if "overrides" in source: # This is a source patch; need to re-parse orig source
|
||||
self.remove_source_override_target(source)
|
||||
self.delete_schema_source(schema_file, source)
|
||||
self.remove_tests(schema_file, dict_key, source["name"])
|
||||
if source_diff["added"]:
|
||||
for source in source_diff["added"]:
|
||||
if "overrides" in source: # This is a source patch; need to re-parse orig source
|
||||
@@ -662,99 +651,40 @@ class PartialParsing:
|
||||
if "overrides" in source:
|
||||
self.remove_source_override_target(source)
|
||||
self.delete_schema_source(schema_file, source)
|
||||
self.remove_tests(schema_file, dict_key, source["name"])
|
||||
self.merge_patch(schema_file, dict_key, source)
|
||||
|
||||
# macros
|
||||
dict_key = "macros"
|
||||
macro_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
|
||||
if macro_diff["changed"]:
|
||||
for macro in macro_diff["changed"]:
|
||||
self.delete_schema_macro_patch(schema_file, macro)
|
||||
self.merge_patch(schema_file, dict_key, macro)
|
||||
if macro_diff["deleted"]:
|
||||
for macro in macro_diff["deleted"]:
|
||||
self.delete_schema_macro_patch(schema_file, macro)
|
||||
if macro_diff["added"]:
|
||||
for macro in macro_diff["added"]:
|
||||
self.merge_patch(schema_file, dict_key, macro)
|
||||
# Handle schema file updates due to env_var changes
|
||||
if dict_key in env_var_changes and dict_key in new_yaml_dict:
|
||||
for name in env_var_changes[dict_key]:
|
||||
if name in macro_diff["changed_or_deleted_names"]:
|
||||
continue
|
||||
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
|
||||
if elem:
|
||||
self.delete_schema_macro_patch(schema_file, elem)
|
||||
self.merge_patch(schema_file, dict_key, elem)
|
||||
def handle_change(key: str, delete: Callable):
|
||||
self._handle_element_change(
|
||||
schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, key, delete
|
||||
)
|
||||
|
||||
# exposures
|
||||
dict_key = "exposures"
|
||||
exposure_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
|
||||
if exposure_diff["changed"]:
|
||||
for exposure in exposure_diff["changed"]:
|
||||
self.delete_schema_exposure(schema_file, exposure)
|
||||
self.merge_patch(schema_file, dict_key, exposure)
|
||||
if exposure_diff["deleted"]:
|
||||
for exposure in exposure_diff["deleted"]:
|
||||
self.delete_schema_exposure(schema_file, exposure)
|
||||
if exposure_diff["added"]:
|
||||
for exposure in exposure_diff["added"]:
|
||||
self.merge_patch(schema_file, dict_key, exposure)
|
||||
# Handle schema file updates due to env_var changes
|
||||
if dict_key in env_var_changes and dict_key in new_yaml_dict:
|
||||
for name in env_var_changes[dict_key]:
|
||||
if name in exposure_diff["changed_or_deleted_names"]:
|
||||
continue
|
||||
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
|
||||
if elem:
|
||||
self.delete_schema_exposure(schema_file, elem)
|
||||
self.merge_patch(schema_file, dict_key, elem)
|
||||
handle_change("macros", self.delete_schema_macro_patch)
|
||||
handle_change("exposures", self.delete_schema_exposure)
|
||||
handle_change("metrics", self.delete_schema_metric)
|
||||
handle_change("groups", self.delete_schema_group)
|
||||
|
||||
# metrics
|
||||
dict_key = "metrics"
|
||||
metric_diff = self.get_diff_for("metrics", saved_yaml_dict, new_yaml_dict)
|
||||
if metric_diff["changed"]:
|
||||
for metric in metric_diff["changed"]:
|
||||
self.delete_schema_metric(schema_file, metric)
|
||||
self.merge_patch(schema_file, dict_key, metric)
|
||||
if metric_diff["deleted"]:
|
||||
for metric in metric_diff["deleted"]:
|
||||
self.delete_schema_metric(schema_file, metric)
|
||||
if metric_diff["added"]:
|
||||
for metric in metric_diff["added"]:
|
||||
self.merge_patch(schema_file, dict_key, metric)
|
||||
def _handle_element_change(
|
||||
self, schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, dict_key: str, delete
|
||||
):
|
||||
element_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
|
||||
if element_diff["changed"]:
|
||||
for element in element_diff["changed"]:
|
||||
delete(schema_file, element)
|
||||
self.merge_patch(schema_file, dict_key, element)
|
||||
if element_diff["deleted"]:
|
||||
for element in element_diff["deleted"]:
|
||||
delete(schema_file, element)
|
||||
if element_diff["added"]:
|
||||
for element in element_diff["added"]:
|
||||
self.merge_patch(schema_file, dict_key, element)
|
||||
# Handle schema file updates due to env_var changes
|
||||
if dict_key in env_var_changes and dict_key in new_yaml_dict:
|
||||
for name in env_var_changes[dict_key]:
|
||||
if name in metric_diff["changed_or_deleted_names"]:
|
||||
if name in element_diff["changed_or_deleted_names"]:
|
||||
continue
|
||||
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
|
||||
if elem:
|
||||
self.delete_schema_metric(schema_file, elem)
|
||||
self.merge_patch(schema_file, dict_key, elem)
|
||||
|
||||
# groups
|
||||
dict_key = "groups"
|
||||
group_diff = self.get_diff_for("groups", saved_yaml_dict, new_yaml_dict)
|
||||
if group_diff["changed"]:
|
||||
for group in group_diff["changed"]:
|
||||
self.delete_schema_group(schema_file, group)
|
||||
self.merge_patch(schema_file, dict_key, group)
|
||||
if group_diff["deleted"]:
|
||||
for group in group_diff["deleted"]:
|
||||
self.delete_schema_group(schema_file, group)
|
||||
if group_diff["added"]:
|
||||
for group in group_diff["added"]:
|
||||
self.merge_patch(schema_file, dict_key, group)
|
||||
# Handle schema file updates due to env_var changes
|
||||
if dict_key in env_var_changes and dict_key in new_yaml_dict:
|
||||
for name in env_var_changes[dict_key]:
|
||||
if name in group_diff["changed_or_deleted_names"]:
|
||||
continue
|
||||
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
|
||||
if elem:
|
||||
self.delete_schema_group(schema_file, elem)
|
||||
delete(schema_file, elem)
|
||||
self.merge_patch(schema_file, dict_key, elem)
|
||||
|
||||
# Take a "section" of the schema file yaml dictionary from saved and new schema files
|
||||
@@ -887,6 +817,8 @@ class PartialParsing:
|
||||
schema_file.sources.remove(unique_id)
|
||||
self.schedule_referencing_nodes_for_parsing(unique_id)
|
||||
|
||||
self.remove_tests(schema_file, "sources", source_name)
|
||||
|
||||
def delete_schema_macro_patch(self, schema_file, macro):
|
||||
# This is just macro patches that need to be reapplied
|
||||
macro_unique_id = None
|
||||
@@ -970,7 +902,6 @@ class PartialParsing:
|
||||
(orig_file, orig_source) = self.get_source_override_file_and_dict(source_dict)
|
||||
if orig_source:
|
||||
self.delete_schema_source(orig_file, orig_source)
|
||||
self.remove_tests(orig_file, "sources", orig_source["name"])
|
||||
self.merge_patch(orig_file, "sources", orig_source)
|
||||
self.add_to_pp_files(orig_file)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user