Factor Out Repeated Logic in the PartialParsing Class (#7952)

* CT-2711: Add a remove_tests() call to delete_schema_source() so that its call sites become uniform with the other node-deletion call sites. This enables further factoring.

* CT-2711: Factor a (mostly) repeated code section out of PartialParsing.handle_schema_file_changes()

* CT-2711: Factor a repeated code section out of schedule_nodes_for_parsing()
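
All three changes apply the same refactoring pattern: several near-identical blocks that differ only in a dictionary key, an element name, and a type-specific delete method collapse into a single helper that takes the delete method as a Callable. A minimal, self-contained sketch of the idea (illustrative names, not the dbt source):

from typing import Callable, Dict, List

def schedule_for_parsing(
    dict_from_yaml: Dict[str, List[dict]],
    dict_key: str,
    name: str,
    delete: Callable[[dict], None],
) -> None:
    # One helper replaces a copy-pasted block per element type; the only
    # varying behavior (how to delete) is injected by the caller.
    elements = dict_from_yaml.get(dict_key, [])
    element = next((e for e in elements if e.get("name") == name), None)
    if element is not None:
        delete(element)

yaml_dict = {"sources": [{"name": "raw_orders"}], "metrics": [{"name": "mrr"}]}
deleted: List[dict] = []
schedule_for_parsing(yaml_dict, "sources", "raw_orders", deleted.append)
schedule_for_parsing(yaml_dict, "metrics", "mrr", deleted.append)
assert deleted == [{"name": "raw_orders"}, {"name": "mrr"}]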
Author:    Peter Webb
Date:      2023-06-26 15:20:50 -04:00
Committer: GitHub
Parent:    f9d4e9e03d
Commit:    b37e5b5198


@@ -1,11 +1,12 @@
 import os
 from copy import deepcopy
-from typing import MutableMapping, Dict, List
+from typing import MutableMapping, Dict, List, Callable
 from dbt.contracts.graph.manifest import Manifest
 from dbt.contracts.files import (
     AnySourceFile,
     ParseFileType,
     parse_file_type_to_parser,
+    SchemaSourceFile,
 )
 from dbt.events.functions import fire_event
 from dbt.events.base_types import EventLevel
@@ -403,41 +404,19 @@ class PartialParsing:
                 self.add_to_pp_files(self.saved_files[file_id])
             elif unique_id in self.saved_manifest.sources:
                 source = self.saved_manifest.sources[unique_id]
-                file_id = source.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    sources = []
-                    if "sources" in schema_file.dict_from_yaml:
-                        sources = schema_file.dict_from_yaml["sources"]
-                    source_element = self.get_schema_element(sources, source.source_name)
-                    if source_element:
-                        self.delete_schema_source(schema_file, source_element)
-                        self.remove_tests(schema_file, "sources", source_element["name"])
-                        self.merge_patch(schema_file, "sources", source_element)
+                self._schedule_for_parsing(
+                    "sources", source, source.source_name, self.delete_schema_source
+                )
             elif unique_id in self.saved_manifest.exposures:
                 exposure = self.saved_manifest.exposures[unique_id]
-                file_id = exposure.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    exposures = []
-                    if "exposures" in schema_file.dict_from_yaml:
-                        exposures = schema_file.dict_from_yaml["exposures"]
-                    exposure_element = self.get_schema_element(exposures, exposure.name)
-                    if exposure_element:
-                        self.delete_schema_exposure(schema_file, exposure_element)
-                        self.merge_patch(schema_file, "exposures", exposure_element)
+                self._schedule_for_parsing(
+                    "exposures", exposure, exposure.name, self.delete_schema_exposure
+                )
             elif unique_id in self.saved_manifest.metrics:
                 metric = self.saved_manifest.metrics[unique_id]
-                file_id = metric.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    metrics = []
-                    if "metrics" in schema_file.dict_from_yaml:
-                        metrics = schema_file.dict_from_yaml["metrics"]
-                    metric_element = self.get_schema_element(metrics, metric.name)
-                    if metric_element:
-                        self.delete_schema_metric(schema_file, metric_element)
-                        self.merge_patch(schema_file, "metrics", metric_element)
+                self._schedule_for_parsing(
+                    "metrics", metric, metric.name, self.delete_schema_metric
+                )
             elif unique_id in self.saved_manifest.macros:
                 macro = self.saved_manifest.macros[unique_id]
                 file_id = macro.file_id
@@ -447,6 +426,19 @@ class PartialParsing:
                 self.saved_files[file_id] = deepcopy(self.new_files[file_id])
                 self.add_to_pp_files(self.saved_files[file_id])
 
+    def _schedule_for_parsing(self, dict_key: str, element, name, delete: Callable) -> None:
+        file_id = element.file_id
+        if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
+            schema_file = self.saved_files[file_id]
+            elements = []
+            assert isinstance(schema_file, SchemaSourceFile)
+            if dict_key in schema_file.dict_from_yaml:
+                elements = schema_file.dict_from_yaml[dict_key]
+            schema_element = self.get_schema_element(elements, name)
+            if schema_element:
+                delete(schema_file, schema_element)
+                self.merge_patch(schema_file, dict_key, schema_element)
+
     def delete_macro_file(self, source_file, follow_references=False):
         self.check_for_special_deleted_macros(source_file)
         self.handle_macro_file_links(source_file, follow_references)
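
The assert isinstance(schema_file, SchemaSourceFile) in the new helper (paired with the SchemaSourceFile import added above) is presumably there to narrow the AnySourceFile union for the type checker, since dict_from_yaml only exists on schema files. A generic sketch of that narrowing idiom, with illustrative classes rather than dbt's:

from typing import Union

class SourceFile:
    pass

class SchemaSourceFile(SourceFile):
    def __init__(self) -> None:
        self.dict_from_yaml: dict = {"sources": []}

def read_yaml(f: Union[SourceFile, SchemaSourceFile]) -> dict:
    # The assert narrows f's static type to SchemaSourceFile, so the
    # attribute access below type-checks; at runtime it also guards
    # against being handed the wrong kind of file.
    assert isinstance(f, SchemaSourceFile)
    return f.dict_from_yaml

assert read_yaml(SchemaSourceFile()) == {"sources": []}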
@@ -538,7 +530,6 @@ class PartialParsing:
                     # This is a source patch; need to re-parse orig source
                     self.remove_source_override_target(patch)
                 self.delete_schema_source(schema_file, patch)
-                self.remove_tests(schema_file, "sources", patch["name"])
                 self.merge_patch(schema_file, "sources", patch)
             else:
                 file_id = node.file_id
@@ -639,14 +630,12 @@ class PartialParsing:
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
                     self.remove_source_override_target(source)
                 self.delete_schema_source(schema_file, source)
-                self.remove_tests(schema_file, dict_key, source["name"])
                 self.merge_patch(schema_file, dict_key, source)
         if source_diff["deleted"]:
             for source in source_diff["deleted"]:
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
                     self.remove_source_override_target(source)
                 self.delete_schema_source(schema_file, source)
-                self.remove_tests(schema_file, dict_key, source["name"])
         if source_diff["added"]:
             for source in source_diff["added"]:
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
@@ -662,99 +651,40 @@ class PartialParsing:
                 if "overrides" in source:
                     self.remove_source_override_target(source)
                 self.delete_schema_source(schema_file, source)
-                self.remove_tests(schema_file, dict_key, source["name"])
                 self.merge_patch(schema_file, dict_key, source)
 
-        # macros
-        dict_key = "macros"
-        macro_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
-        if macro_diff["changed"]:
-            for macro in macro_diff["changed"]:
-                self.delete_schema_macro_patch(schema_file, macro)
-                self.merge_patch(schema_file, dict_key, macro)
-        if macro_diff["deleted"]:
-            for macro in macro_diff["deleted"]:
-                self.delete_schema_macro_patch(schema_file, macro)
-        if macro_diff["added"]:
-            for macro in macro_diff["added"]:
-                self.merge_patch(schema_file, dict_key, macro)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in macro_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_macro_patch(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
-
-        # exposures
-        dict_key = "exposures"
-        exposure_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
-        if exposure_diff["changed"]:
-            for exposure in exposure_diff["changed"]:
-                self.delete_schema_exposure(schema_file, exposure)
-                self.merge_patch(schema_file, dict_key, exposure)
-        if exposure_diff["deleted"]:
-            for exposure in exposure_diff["deleted"]:
-                self.delete_schema_exposure(schema_file, exposure)
-        if exposure_diff["added"]:
-            for exposure in exposure_diff["added"]:
-                self.merge_patch(schema_file, dict_key, exposure)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in exposure_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_exposure(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
-
-        # metrics
-        dict_key = "metrics"
-        metric_diff = self.get_diff_for("metrics", saved_yaml_dict, new_yaml_dict)
-        if metric_diff["changed"]:
-            for metric in metric_diff["changed"]:
-                self.delete_schema_metric(schema_file, metric)
-                self.merge_patch(schema_file, dict_key, metric)
-        if metric_diff["deleted"]:
-            for metric in metric_diff["deleted"]:
-                self.delete_schema_metric(schema_file, metric)
-        if metric_diff["added"]:
-            for metric in metric_diff["added"]:
-                self.merge_patch(schema_file, dict_key, metric)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in metric_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_metric(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
-
-        # groups
-        dict_key = "groups"
-        group_diff = self.get_diff_for("groups", saved_yaml_dict, new_yaml_dict)
-        if group_diff["changed"]:
-            for group in group_diff["changed"]:
-                self.delete_schema_group(schema_file, group)
-                self.merge_patch(schema_file, dict_key, group)
-        if group_diff["deleted"]:
-            for group in group_diff["deleted"]:
-                self.delete_schema_group(schema_file, group)
-        if group_diff["added"]:
-            for group in group_diff["added"]:
-                self.merge_patch(schema_file, dict_key, group)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in group_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_group(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
+        def handle_change(key: str, delete: Callable):
+            self._handle_element_change(
+                schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, key, delete
+            )
+
+        handle_change("macros", self.delete_schema_macro_patch)
+        handle_change("exposures", self.delete_schema_exposure)
+        handle_change("metrics", self.delete_schema_metric)
+        handle_change("groups", self.delete_schema_group)
+
+    def _handle_element_change(
+        self, schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, dict_key: str, delete
+    ):
+        element_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
+        if element_diff["changed"]:
+            for element in element_diff["changed"]:
+                delete(schema_file, element)
+                self.merge_patch(schema_file, dict_key, element)
+        if element_diff["deleted"]:
+            for element in element_diff["deleted"]:
+                delete(schema_file, element)
+        if element_diff["added"]:
+            for element in element_diff["added"]:
+                self.merge_patch(schema_file, dict_key, element)
+        # Handle schema file updates due to env_var changes
+        if dict_key in env_var_changes and dict_key in new_yaml_dict:
+            for name in env_var_changes[dict_key]:
+                if name in element_diff["changed_or_deleted_names"]:
+                    continue
+                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
+                if elem:
+                    delete(schema_file, elem)
+                    self.merge_patch(schema_file, dict_key, elem)
 
     # Take a "section" of the schema file yaml dictionary from saved and new schema files
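
The nested handle_change wrapper pins the four arguments shared by every call, so each element type reduces to a key plus a delete callback. The same effect can be had with functools.partial; a minimal sketch with illustrative names:

from functools import partial
from typing import Callable, Dict, List

def handle_element_change(
    state: Dict[str, List[dict]], dict_key: str, delete: Callable[[str, dict], None]
) -> None:
    # Shared loop body; only the key and the delete callback vary per call.
    for element in state.get(dict_key, []):
        delete(dict_key, element)

state = {"macros": [{"name": "m1"}], "groups": [{"name": "g1"}]}
log: List[tuple] = []

# partial pins the shared argument, just as the nested handle_change in the
# diff pins schema_file, saved_yaml_dict, new_yaml_dict, and env_var_changes.
handle_change = partial(handle_element_change, state)
handle_change("macros", lambda k, e: log.append((k, e["name"])))
handle_change("groups", lambda k, e: log.append((k, e["name"])))
assert log == [("macros", "m1"), ("groups", "g1")]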
@@ -887,6 +817,8 @@ class PartialParsing:
                     schema_file.sources.remove(unique_id)
                     self.schedule_referencing_nodes_for_parsing(unique_id)
+        self.remove_tests(schema_file, "sources", source_name)
 
     def delete_schema_macro_patch(self, schema_file, macro):
         # This is just macro patches that need to be reapplied
         macro_unique_id = None
@@ -970,7 +902,6 @@ class PartialParsing:
         (orig_file, orig_source) = self.get_source_override_file_and_dict(source_dict)
         if orig_source:
             self.delete_schema_source(orig_file, orig_source)
-            self.remove_tests(orig_file, "sources", orig_source["name"])
             self.merge_patch(orig_file, "sources", orig_source)
             self.add_to_pp_files(orig_file)
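
With remove_tests() folded into delete_schema_source(), the call sites that previously had to pair the two calls can no longer drift apart. A sketch of that consolidation, again with illustrative names:

from typing import Dict, List, Set

class Catalog:
    def __init__(self) -> None:
        self.sources: Set[str] = {"raw_orders"}
        self.tests: Dict[str, List[str]] = {"raw_orders": ["not_null"]}

    def delete_source(self, name: str) -> None:
        self.sources.discard(name)
        # Cleanup that each caller previously had to remember now runs
        # unconditionally as part of the deletion itself.
        self.tests.pop(name, None)

catalog = Catalog()
catalog.delete_source("raw_orders")
assert not catalog.sources and not catalog.tests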