mirror of https://github.com/dbt-labs/dbt-core, synced 2025-12-21 05:21:27 +00:00
Factor Out Repeated Logic in the PartialParsing Class (#7952)
* CT-2711: Add a remove_tests() call to delete_schema_source() so that its call sites are uniform with other node-deletion call sites. This will enable further code factorization.
* CT-2711: Factor a repeated code section (mostly) out of PartialParsing.handle_schema_file_changes().
* CT-2711: Factor a repeated code section out of schedule_nodes_for_parsing().
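All three changes apply the same move: blocks that differed only in a dictionary key ("sources", "exposures", "metrics", "groups") and a type-specific delete method collapse into one helper that takes the key as a str and the deleter as a Callable. The following is a minimal, self-contained sketch of that pattern using toy names (Registry and its methods), not dbt's actual classes:

    from typing import Callable, Dict, List

    class Registry:
        """Toy stand-in for PartialParsing: one list of element names per resource type."""

        def __init__(self) -> None:
            self.data: Dict[str, List[str]] = {"sources": ["raw_events"], "metrics": ["revenue"]}
            self.log: List[str] = []

        def delete_source(self, name: str) -> None:
            self.log.append(f"deleted source {name}")

        def delete_metric(self, name: str) -> None:
            self.log.append(f"deleted metric {name}")

        def handle_change(self, dict_key: str, delete: Callable[[str], None]) -> None:
            # One body replaces several near-identical per-type blocks: the dict
            # key and the delete callback were the only things that varied, so
            # they become parameters.
            for name in self.data.get(dict_key, []):
                delete(name)

    registry = Registry()
    registry.handle_change("sources", registry.delete_source)
    registry.handle_change("metrics", registry.delete_metric)
    print(registry.log)  # ['deleted source raw_events', 'deleted metric revenue']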
@@ -1,11 +1,12 @@
 import os
 from copy import deepcopy
-from typing import MutableMapping, Dict, List
+from typing import MutableMapping, Dict, List, Callable
 from dbt.contracts.graph.manifest import Manifest
 from dbt.contracts.files import (
     AnySourceFile,
     ParseFileType,
     parse_file_type_to_parser,
+    SchemaSourceFile,
 )
 from dbt.events.functions import fire_event
 from dbt.events.base_types import EventLevel
@@ -403,41 +404,19 @@ class PartialParsing:
                     self.add_to_pp_files(self.saved_files[file_id])
             elif unique_id in self.saved_manifest.sources:
                 source = self.saved_manifest.sources[unique_id]
-                file_id = source.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    sources = []
-                    if "sources" in schema_file.dict_from_yaml:
-                        sources = schema_file.dict_from_yaml["sources"]
-                    source_element = self.get_schema_element(sources, source.source_name)
-                    if source_element:
-                        self.delete_schema_source(schema_file, source_element)
-                        self.remove_tests(schema_file, "sources", source_element["name"])
-                        self.merge_patch(schema_file, "sources", source_element)
+                self._schedule_for_parsing(
+                    "sources", source, source.source_name, self.delete_schema_source
+                )
             elif unique_id in self.saved_manifest.exposures:
                 exposure = self.saved_manifest.exposures[unique_id]
-                file_id = exposure.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    exposures = []
-                    if "exposures" in schema_file.dict_from_yaml:
-                        exposures = schema_file.dict_from_yaml["exposures"]
-                    exposure_element = self.get_schema_element(exposures, exposure.name)
-                    if exposure_element:
-                        self.delete_schema_exposure(schema_file, exposure_element)
-                        self.merge_patch(schema_file, "exposures", exposure_element)
+                self._schedule_for_parsing(
+                    "exposures", exposure, exposure.name, self.delete_schema_exposure
+                )
             elif unique_id in self.saved_manifest.metrics:
                 metric = self.saved_manifest.metrics[unique_id]
-                file_id = metric.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    metrics = []
-                    if "metrics" in schema_file.dict_from_yaml:
-                        metrics = schema_file.dict_from_yaml["metrics"]
-                    metric_element = self.get_schema_element(metrics, metric.name)
-                    if metric_element:
-                        self.delete_schema_metric(schema_file, metric_element)
-                        self.merge_patch(schema_file, "metrics", metric_element)
+                self._schedule_for_parsing(
+                    "metrics", metric, metric.name, self.delete_schema_metric
+                )
             elif unique_id in self.saved_manifest.macros:
                 macro = self.saved_manifest.macros[unique_id]
                 file_id = macro.file_id
@@ -447,6 +426,19 @@ class PartialParsing:
                     self.saved_files[file_id] = deepcopy(self.new_files[file_id])
                     self.add_to_pp_files(self.saved_files[file_id])
 
+    def _schedule_for_parsing(self, dict_key: str, element, name, delete: Callable) -> None:
+        file_id = element.file_id
+        if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
+            schema_file = self.saved_files[file_id]
+            elements = []
+            assert isinstance(schema_file, SchemaSourceFile)
+            if dict_key in schema_file.dict_from_yaml:
+                elements = schema_file.dict_from_yaml[dict_key]
+            schema_element = self.get_schema_element(elements, name)
+            if schema_element:
+                delete(schema_file, schema_element)
+                self.merge_patch(schema_file, dict_key, schema_element)
+
     def delete_macro_file(self, source_file, follow_references=False):
         self.check_for_special_deleted_macros(source_file)
         self.handle_macro_file_links(source_file, follow_references)
@@ -538,7 +530,6 @@ class PartialParsing:
                                     # This is a source patch; need to re-parse orig source
                                     self.remove_source_override_target(patch)
                                 self.delete_schema_source(schema_file, patch)
-                                self.remove_tests(schema_file, "sources", patch["name"])
                                 self.merge_patch(schema_file, "sources", patch)
                 else:
                     file_id = node.file_id
@@ -639,14 +630,12 @@ class PartialParsing:
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
                     self.remove_source_override_target(source)
                 self.delete_schema_source(schema_file, source)
-                self.remove_tests(schema_file, dict_key, source["name"])
                 self.merge_patch(schema_file, dict_key, source)
         if source_diff["deleted"]:
             for source in source_diff["deleted"]:
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
                     self.remove_source_override_target(source)
                 self.delete_schema_source(schema_file, source)
-                self.remove_tests(schema_file, dict_key, source["name"])
         if source_diff["added"]:
             for source in source_diff["added"]:
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
@@ -662,99 +651,40 @@ class PartialParsing:
                     if "overrides" in source:
                         self.remove_source_override_target(source)
                     self.delete_schema_source(schema_file, source)
-                    self.remove_tests(schema_file, dict_key, source["name"])
                     self.merge_patch(schema_file, dict_key, source)
 
-        # macros
-        dict_key = "macros"
-        macro_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
-        if macro_diff["changed"]:
-            for macro in macro_diff["changed"]:
-                self.delete_schema_macro_patch(schema_file, macro)
-                self.merge_patch(schema_file, dict_key, macro)
-        if macro_diff["deleted"]:
-            for macro in macro_diff["deleted"]:
-                self.delete_schema_macro_patch(schema_file, macro)
-        if macro_diff["added"]:
-            for macro in macro_diff["added"]:
-                self.merge_patch(schema_file, dict_key, macro)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in macro_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_macro_patch(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
+        def handle_change(key: str, delete: Callable):
+            self._handle_element_change(
+                schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, key, delete
+            )
 
-        # exposures
-        dict_key = "exposures"
-        exposure_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
-        if exposure_diff["changed"]:
-            for exposure in exposure_diff["changed"]:
-                self.delete_schema_exposure(schema_file, exposure)
-                self.merge_patch(schema_file, dict_key, exposure)
-        if exposure_diff["deleted"]:
-            for exposure in exposure_diff["deleted"]:
-                self.delete_schema_exposure(schema_file, exposure)
-        if exposure_diff["added"]:
-            for exposure in exposure_diff["added"]:
-                self.merge_patch(schema_file, dict_key, exposure)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in exposure_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_exposure(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
+        handle_change("macros", self.delete_schema_macro_patch)
+        handle_change("exposures", self.delete_schema_exposure)
+        handle_change("metrics", self.delete_schema_metric)
+        handle_change("groups", self.delete_schema_group)
 
-        # metrics
-        dict_key = "metrics"
-        metric_diff = self.get_diff_for("metrics", saved_yaml_dict, new_yaml_dict)
-        if metric_diff["changed"]:
-            for metric in metric_diff["changed"]:
-                self.delete_schema_metric(schema_file, metric)
-                self.merge_patch(schema_file, dict_key, metric)
-        if metric_diff["deleted"]:
-            for metric in metric_diff["deleted"]:
-                self.delete_schema_metric(schema_file, metric)
-        if metric_diff["added"]:
-            for metric in metric_diff["added"]:
-                self.merge_patch(schema_file, dict_key, metric)
+    def _handle_element_change(
+        self, schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, dict_key: str, delete
+    ):
+        element_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
+        if element_diff["changed"]:
+            for element in element_diff["changed"]:
+                delete(schema_file, element)
+                self.merge_patch(schema_file, dict_key, element)
+        if element_diff["deleted"]:
+            for element in element_diff["deleted"]:
+                delete(schema_file, element)
+        if element_diff["added"]:
+            for element in element_diff["added"]:
+                self.merge_patch(schema_file, dict_key, element)
         # Handle schema file updates due to env_var changes
         if dict_key in env_var_changes and dict_key in new_yaml_dict:
             for name in env_var_changes[dict_key]:
-                if name in metric_diff["changed_or_deleted_names"]:
+                if name in element_diff["changed_or_deleted_names"]:
                     continue
                 elem = self.get_schema_element(new_yaml_dict[dict_key], name)
                 if elem:
-                    self.delete_schema_metric(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
-
-        # groups
-        dict_key = "groups"
-        group_diff = self.get_diff_for("groups", saved_yaml_dict, new_yaml_dict)
-        if group_diff["changed"]:
-            for group in group_diff["changed"]:
-                self.delete_schema_group(schema_file, group)
-                self.merge_patch(schema_file, dict_key, group)
-        if group_diff["deleted"]:
-            for group in group_diff["deleted"]:
-                self.delete_schema_group(schema_file, group)
-        if group_diff["added"]:
-            for group in group_diff["added"]:
-                self.merge_patch(schema_file, dict_key, group)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in group_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_group(schema_file, elem)
+                    delete(schema_file, elem)
                     self.merge_patch(schema_file, dict_key, elem)
 
     # Take a "section" of the schema file yaml dictionary from saved and new schema files
@@ -887,6 +817,8 @@ class PartialParsing:
                     schema_file.sources.remove(unique_id)
                     self.schedule_referencing_nodes_for_parsing(unique_id)
 
+        self.remove_tests(schema_file, "sources", source_name)
+
     def delete_schema_macro_patch(self, schema_file, macro):
         # This is just macro patches that need to be reapplied
         macro_unique_id = None
@@ -970,7 +902,6 @@ class PartialParsing:
         (orig_file, orig_source) = self.get_source_override_file_and_dict(source_dict)
         if orig_source:
             self.delete_schema_source(orig_file, orig_source)
-            self.remove_tests(orig_file, "sources", orig_source["name"])
             self.merge_patch(orig_file, "sources", orig_source)
             self.add_to_pp_files(orig_file)
 
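A note on the technique for readers new to it: delete_schema_source, delete_schema_exposure, and the other deleters are passed as bound methods, so self travels with them and both new helpers can call delete(schema_file, element) without knowing which resource type they are handling. A tiny illustration of bound methods in general, with hypothetical names unrelated to dbt's code:

    class Greeter:
        def greet(self, name: str) -> str:
            return f"hi {name}"

    greeter = Greeter()
    fn = greeter.greet  # a bound method: `self` is already attached
    assert fn("dbt") == "hi dbt"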