Compare commits

...

9 Commits

Author     SHA1        Message                                   Date
Chenyu Li  0210988613  better node number                        2022-10-28 11:35:01 -07:00
Chenyu Li  19ddfd7894  format                                    2022-10-28 11:00:15 -07:00
Chenyu Li  3fe50426a4  add variable for restart from a run       2022-10-28 11:00:13 -07:00
Colin      0f15df64ba  use graph node object                     2022-10-28 10:08:19 -07:00
Chenyu Li  8e84d29e94  seconf iteration of abstract away graph   2022-10-27 16:01:51 -07:00
Chenyu Li  cf73c5696a  first iteration of abstract away graph    2022-10-27 12:20:50 -07:00
Colin      d722b3eb46  update abstraction logic                  2022-10-27 10:42:33 -07:00
Colin      5c85356b0a  graph_node dataclass                      2022-10-27 10:21:12 -07:00
Colin      44a740327a  initial                                   2022-10-27 10:19:00 -07:00
9 changed files with 348 additions and 115 deletions

View File

@@ -0,0 +1,10 @@
from dataclasses import dataclass


@dataclass
class GraphNode:
    name: str
    in_degree: int
    status: str
    update_date: int
    start_date: int
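
For orientation, a quick sketch of how this dataclass is used later in the diff as the property bag for a RedisGraph node (see redis_queue.py below); the node name and timestamp are placeholder values:

from dataclasses import asdict
from datetime import datetime

now = int(datetime.now().timestamp())
node = GraphNode(name="model.my_project.my_model", in_degree=0,
                 status="created", update_date=now, start_date=now)
asdict(node)  # plain dict, ready to pass as the `properties` of a redisgraph Node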

View File

@@ -0,0 +1,86 @@
from typing import Set, Iterable, Iterator, Optional, NewType, Union
from itertools import product

import networkx as nx  # type: ignore

from dbt.exceptions import InternalException, NotImplementedException

from .graph import Graph

UniqueId = NewType("UniqueId", str)


class NetworkXGraph(Graph):
    """A wrapper around the networkx graph that understands SelectionCriteria
    and how they interact with the graph.
    """

    graph: nx.DiGraph

    def __init__(self, graph: Union[Graph, nx.DiGraph]):
        if isinstance(graph, nx.DiGraph):
            self.graph = graph
        else:
            # TODO: building a NetworkXGraph from another Graph is not supported yet
            raise NotImplementedException("Cannot build a NetworkXGraph from a Graph instance")

    def nodes(self) -> Set[UniqueId]:
        return set(self.graph.nodes())

    def edges(self):
        return self.graph.edges()

    def __iter__(self) -> Iterator[UniqueId]:
        return iter(self.nodes())

    def ancestors(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]:
        """Returns all nodes having a path to `node` in `graph`"""
        if not self.graph.has_node(node):
            raise InternalException(f"Node {node} not found in the graph!")
        return {
            child
            for _, child in nx.bfs_edges(self.graph, node, reverse=True, depth_limit=max_depth)
        }

    def in_degree(self):
        return self.graph.in_degree()

    def descendants(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]:
        """Returns all nodes reachable from `node` in `graph`"""
        if not self.graph.has_node(node):
            raise InternalException(f"Node {node} not found in the graph!")
        return {child for _, child in nx.bfs_edges(self.graph, node, depth_limit=max_depth)}

    def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph":
        """Create and return a new graph that is a shallow copy of the graph,
        but with only the selected nodes. Transitive edges across
        removed nodes are preserved as explicit new edges.
        """
        new_graph = self.graph.copy()
        include_nodes = set(selected)

        for node in self:
            if node not in include_nodes:
                source_nodes = [x for x, _ in new_graph.in_edges(node)]
                target_nodes = [x for _, x in new_graph.out_edges(node)]
                new_edges = product(source_nodes, target_nodes)
                non_cyclic_new_edges = [
                    (source, target) for source, target in new_edges if source != target
                ]  # removes cyclic refs
                new_graph.add_edges_from(non_cyclic_new_edges)
                new_graph.remove_node(node)

        for node in include_nodes:
            if node not in new_graph:
                raise ValueError(
                    "Couldn't find model '{}' -- does it exist or is it disabled?".format(node)
                )

        return Graph(new_graph)

    def subgraph(self, nodes: Iterable[UniqueId]) -> "Graph":
        return Graph(self.graph.subgraph(nodes))

    def remove_node(self, node_id: UniqueId) -> None:
        return self.graph.remove_node(node_id)
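
A minimal sketch of the wrapper on a toy graph, just to show the traversals it delegates to networkx; the node names are placeholders, not real dbt unique ids:

import networkx as nx

g = nx.DiGraph()
g.add_edges_from([("a", "b"), ("b", "c")])  # edges run parent -> child: a -> b -> c
graph = NetworkXGraph(g)
graph.descendants("a")  # {"b", "c"}: everything reachable from "a"
graph.ancestors("c")    # {"a", "b"}: everything with a path to "c"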

View File

@@ -0,0 +1,96 @@
import networkx as nx  # type: ignore

from typing import Dict, Set, List, Generator

from .graph import UniqueId
from .queue import GraphQueue

from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure, ParsedMetric
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType


class NetworkxGraphQueue(GraphQueue):
    """A fancy queue that is backed by the dependency graph.

    Note: this will mutate input!

    This queue is thread-safe for `mark_done` calls, though you must ensure
    that separate threads do not call `.empty()` or `__len__()` and `.get()` at
    the same time, as there is an unlocked race!
    """

    def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]) -> None:
        # store the 'score' of each node as a number. Lower is higher priority.
        self._scores = self._get_scores(graph)
        self.graph = graph
        super().__init__(graph, manifest, selected)

    def _include_in_cost(self, node_id: UniqueId) -> bool:
        node = self.manifest.expect(node_id)
        if node.resource_type != NodeType.Model:
            return False
        # must be a Model - tell mypy this won't be a Source or Exposure or Metric
        assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure, ParsedMetric))
        if node.is_ephemeral:
            return False
        return True

    @staticmethod
    def _grouped_topological_sort(
        graph: nx.DiGraph,
    ) -> Generator[List[str], None, None]:
        """Topological sort of the given graph that groups ties.

        Adapted from `nx.topological_sort`: instead of arbitrarily ordering ties in the sort
        order, tied nodes are grouped together in lists.

        Args:
            graph: The graph to be sorted.

        Returns:
            A generator that yields lists of nodes, one list per graph depth level.
        """
        indegree_map = {v: d for v, d in graph.in_degree() if d > 0}
        zero_indegree = [v for v, d in graph.in_degree() if d == 0]

        while zero_indegree:
            yield zero_indegree
            new_zero_indegree = []
            for v in zero_indegree:
                for _, child in graph.edges(v):
                    indegree_map[child] -= 1
                    if not indegree_map[child]:
                        new_zero_indegree.append(child)
            zero_indegree = new_zero_indegree

    def _get_scores(self, graph: nx.DiGraph) -> Dict[str, int]:
        """Scoring nodes for processing order.

        Scores are calculated by the graph depth level. Lowest score (0) should be processed first.

        Args:
            graph: The graph to be scored.

        Returns:
            A dictionary consisting of `node name`:`score` pairs.
        """
        # split graph by connected subgraphs
        subgraphs = (graph.subgraph(x) for x in nx.connected_components(nx.Graph(graph)))

        # score all nodes in all subgraphs
        scores = {}
        for subgraph in subgraphs:
            grouped_nodes = self._grouped_topological_sort(subgraph)
            for level, group in enumerate(grouped_nodes):
                for node in group:
                    scores[node] = level
        return scores

    def find_new_additions(self) -> None:
        """Find any nodes in the graph that need to be added to the internal
        queue and add them.
        """
        for node, in_degree in self.graph.in_degree():
            if not self._already_known(node) and in_degree == 0:
                self.inner.put((self._scores[node], node))
                self.queued.add(node)

    def remove_node_from_graph(self, node_id):
        self.graph.remove_node(node_id)

    def get_node_num(self):
        return len(self.graph.nodes())
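
For intuition, here is a standalone sketch of the level-by-level ordering that `_grouped_topological_sort` and `_get_scores` implement: the first batch is every node with no parents, the next batch is whatever finishing that batch unlocks, and so on. The helper repeats the same loop outside the class purely for illustration, with made-up model names:

import networkx as nx

def grouped_topo(graph: nx.DiGraph):
    # same loop as NetworkxGraphQueue._grouped_topological_sort
    indegree_map = {v: d for v, d in graph.in_degree() if d > 0}
    zero_indegree = [v for v, d in graph.in_degree() if d == 0]
    while zero_indegree:
        yield zero_indegree
        new_zero_indegree = []
        for v in zero_indegree:
            for _, child in graph.edges(v):
                indegree_map[child] -= 1
                if not indegree_map[child]:
                    new_zero_indegree.append(child)
        zero_indegree = new_zero_indegree

g = nx.DiGraph([("stg_a", "mart"), ("stg_b", "mart")])
list(grouped_topo(g))  # [['stg_a', 'stg_b'], ['mart']] -> scores: stg_a=0, stg_b=0, mart=1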

View File

@@ -1,17 +1,15 @@
import networkx as nx  # type: ignore
from abc import ABC, abstractmethod
import threading
from queue import PriorityQueue
from typing import Dict, Set, List, Generator, Optional
from typing import Set, Optional
from .graph import UniqueId
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure, ParsedMetric
from .graph import Graph, UniqueId
from dbt.contracts.graph.compiled import GraphMemberNode
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType
class GraphQueue:
class GraphQueue(ABC):
    """A fancy queue that is backed by the dependency graph.
    Note: this will mutate input!
@@ -20,8 +18,7 @@ class GraphQueue:
    the same time, as there is an unlocked race!
    """
    def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]):
        self.graph = graph
    def __init__(self, graph: Graph, manifest: Manifest, selected: Set[UniqueId]):
        self.manifest = manifest
        self._selected = selected
        # store the queue as a priority queue.
@@ -33,88 +30,34 @@ class GraphQueue:
        self.queued: Set[UniqueId] = set()
        # this lock controls most things
        self.lock = threading.Lock()
        # store the 'score' of each node as a number. Lower is higher priority.
        self._scores = self._get_scores(self.graph)
        # populate the initial queue
        self._find_new_additions()
        self.find_new_additions()
        # awaits after task end
        self.some_task_done = threading.Condition(self.lock)
    def _mark_in_progress(self, node_id: UniqueId) -> None:
        """Mark the node as 'in progress'.
        Callers must hold the lock.
        :param str node_id: The node ID to mark as in progress.
        """
        self.queued.remove(node_id)
        self.in_progress.add(node_id)
    def join(self) -> None:
        """Join the queue. Blocks until all tasks are marked as done.
        Make sure not to call this before the queue reports that it is empty.
        """
        self.inner.join()
    def get_selected_nodes(self) -> Set[UniqueId]:
        return self._selected.copy()
    def _include_in_cost(self, node_id: UniqueId) -> bool:
        node = self.manifest.expect(node_id)
        if node.resource_type != NodeType.Model:
            return False
        # must be a Model - tell mypy this won't be a Source or Exposure or Metric
        assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure, ParsedMetric))
        if node.is_ephemeral:
            return False
        return True
    @staticmethod
    def _grouped_topological_sort(
        graph: nx.DiGraph,
    ) -> Generator[List[str], None, None]:
        """Topological sort of given graph that groups ties.
        Adapted from `nx.topological_sort`, this function returns a topo sort of a graph however
        instead of arbitrarily ordering ties in the sort order, ties are grouped together in
        lists.
        Args:
            graph: The graph to be sorted.
        Returns:
            A generator that yields lists of nodes, one list per graph depth level.
        """
        indegree_map = {v: d for v, d in graph.in_degree() if d > 0}
        zero_indegree = [v for v, d in graph.in_degree() if d == 0]
        while zero_indegree:
            yield zero_indegree
            new_zero_indegree = []
            for v in zero_indegree:
                for _, child in graph.edges(v):
                    indegree_map[child] -= 1
                    if not indegree_map[child]:
                        new_zero_indegree.append(child)
            zero_indegree = new_zero_indegree
    def _get_scores(self, graph: nx.DiGraph) -> Dict[str, int]:
        """Scoring nodes for processing order.
        Scores are calculated by the graph depth level. Lowest score (0) should be processed first.
        Args:
            graph: The graph to be scored.
        Returns:
            A dictionary consisting of `node name`:`score` pairs.
        """
        # split graph by connected subgraphs
        subgraphs = (graph.subgraph(x) for x in nx.connected_components(nx.Graph(graph)))
        # score all nodes in all subgraphs
        scores = {}
        for subgraph in subgraphs:
            grouped_nodes = self._grouped_topological_sort(subgraph)
            for level, group in enumerate(grouped_nodes):
                for node in group:
                    scores[node] = level
        return scores
    def get(self, block: bool = True, timeout: Optional[float] = None) -> GraphMemberNode:
        """Get a node off the inner priority queue. By default, this blocks.
        This takes the lock, but only for part of it.
        :param block: If True, block until the inner queue has data
        :param timeout: If set, block for timeout seconds waiting for data.
        :return: The node as present in the manifest.
        See `queue.PriorityQueue` for more information on `get()` behavior and
        exceptions.
        """
@@ -131,7 +74,7 @@ class GraphQueue:
        This takes the lock.
        """
        with self.lock:
            return len(self.graph) - len(self.in_progress)
            return self.get_node_num() - len(self.in_progress)
    def empty(self) -> bool:
        """The graph queue is 'empty' if all remaining nodes in the graph
@@ -152,46 +95,18 @@ class GraphQueue:
        """
        return node in self.in_progress or node in self.queued
    def _find_new_additions(self) -> None:
        """Find any nodes in the graph that need to be added to the internal
        queue and add them.
        """
        for node, in_degree in self.graph.in_degree():
            if not self._already_known(node) and in_degree == 0:
                self.inner.put((self._scores[node], node))
                self.queued.add(node)
    def mark_done(self, node_id: UniqueId) -> None:
        """Given a node's unique ID, mark it as done.
        This method takes the lock.
        :param str node_id: The node ID to mark as complete.
        """
        with self.lock:
            self.in_progress.remove(node_id)
            self.graph.remove_node(node_id)
            self._find_new_additions()
            self.remove_node_from_graph(node_id)
            self.find_new_additions()
            self.inner.task_done()
            self.some_task_done.notify_all()
    def _mark_in_progress(self, node_id: UniqueId) -> None:
        """Mark the node as 'in progress'.
        Callers must hold the lock.
        :param str node_id: The node ID to mark as in progress.
        """
        self.queued.remove(node_id)
        self.in_progress.add(node_id)
    def join(self) -> None:
        """Join the queue. Blocks until all tasks are marked as done.
        Make sure not to call this before the queue reports that it is empty.
        """
        self.inner.join()
    def wait_until_something_was_done(self) -> int:
        """Block until a task is done, then return the number of unfinished
        tasks.
@@ -199,3 +114,15 @@ class GraphQueue:
        with self.lock:
            self.some_task_done.wait()
            return self.inner.unfinished_tasks
    @abstractmethod
    def find_new_additions(self):
        pass
    @abstractmethod
    def remove_node_from_graph(self, node_id):
        pass
    @abstractmethod
    def get_node_num(self):
        pass
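
With `GraphQueue` now an abstract base class, a new backend only has to provide the three graph-facing hooks. A minimal skeleton under that contract (the class name and the plain in-degree dict are hypothetical stand-ins for a real backend) could look like:

class InMemoryGraphQueue(GraphQueue):
    """Hypothetical backend sketch: tracks readiness with a plain in-degree dict."""

    def __init__(self, graph, manifest, selected):
        self._in_degree = dict(graph.in_degree())
        self._edges = list(graph.edges())
        super().__init__(graph, manifest, selected)  # the base __init__ calls find_new_additions()

    def find_new_additions(self):
        for node, degree in self._in_degree.items():
            if degree == 0 and not self._already_known(node):
                self.inner.put((0, node))
                self.queued.add(node)

    def remove_node_from_graph(self, node_id):
        for parent, child in self._edges:
            if parent == node_id:
                self._in_degree[child] -= 1  # the finished node no longer blocks its children
        del self._in_degree[node_id]

    def get_node_num(self):
        return len(self._in_degree)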

View File

@@ -0,0 +1,96 @@
from dataclasses import asdict
from datetime import datetime
import networkx as nx  # type: ignore
import os
from typing import Set
import redis
import time

from .graph_node import GraphNode
from redisgraph import Node, Edge, Graph

from .graph import UniqueId
from .queue import GraphQueue

from dbt.contracts.graph.manifest import Manifest


class RedisGraphQueue(GraphQueue):
    # TODO: this currently skips ephemeral models

    def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]) -> None:
        # either reopen the graph of a previous run from Redis, or build a new
        # RedisGraph from the networkx graph
        previous_run_id = os.getenv("PREVIOUS_RUN_ID")
        self.r = redis.Redis(host="localhost", port=6379)
        if previous_run_id:
            self.name = int(previous_run_id)
            self.redis_graph = Graph(self.name, self.r)
            # reset nodes that were queued but never finished back to 'created'
            # so find_new_additions picks them up again
            query = """MATCH (p:node {status: 'queued'}) SET p.status = 'created'"""
            self.redis_graph.query(query)
            query = """MATCH (p:node {status: 'created'}) RETURN p.name"""
            result = self.redis_graph.query(query)
            self.node_count = len(result.result_set)
        else:
            self.name = int(time.time())
            self.redis_graph = Graph(self.name, self.r)
            nodes = {}
            for node_name, in_degree in graph.in_degree():
                start_time = int(datetime.now().timestamp())
                properties = GraphNode(
                    name=node_name,
                    in_degree=in_degree,
                    status="created",
                    update_date=start_time,
                    start_date=start_time,
                )
                curr_node = Node(
                    label="node",
                    properties=asdict(properties),
                )
                self.redis_graph.add_node(curr_node)
                nodes[node_name] = curr_node
            # networkx edges run parent -> child; the relation label is kept as
            # "is_child_of" so it matches the queries in remove_node_from_graph
            for parent_name, child_name in graph.edges():
                edge = Edge(nodes[parent_name], "is_child_of", nodes[child_name])
                self.redis_graph.add_edge(edge)
            self.redis_graph.commit()
            self.node_count = len(graph.nodes)
        super().__init__(graph, manifest, selected)

    def find_new_additions(self) -> None:
        """Find any nodes in the graph that need to be added to the internal
        queue and add them.
        """
        query = """MATCH (p:node {in_degree: 0, status: 'created'})
        RETURN p.name"""
        result = self.redis_graph.query(query)
        for name in result.result_set:
            node = name[0]
            if not self._already_known(node):
                self.inner.put((0, node))
                self.queued.add(node)
        query = """MATCH (p:node {in_degree: 0, status: 'created'})
        SET p.status = 'queued'"""
        self.redis_graph.query(query)

    def remove_node_from_graph(self, node_id):
        # find all children of the node that just finished
        self.node_count -= 1
        query = f"""MATCH (p:node {{name: '{node_id}'}})-[v:is_child_of]->(c:node)
        RETURN c.name, c.in_degree"""
        result = self.redis_graph.query(query)
        # decrement their in-degree now that this parent is done
        for model_name, in_degree in result.result_set:
            query = (
                f"""MATCH (p:node {{name: '{model_name}'}}) SET p.in_degree = {in_degree - 1}"""
            )
            self.redis_graph.query(query)
        # mark the finished node as done and record when it completed
        current_time = int(datetime.now().timestamp())
        query = f"""MATCH (p:node {{name: '{node_id}'}}) SET p.status = 'done', p.update_date = {current_time}"""
        self.redis_graph.query(query)

    def get_node_num(self):
        return self.node_count
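
The queries above are openCypher run against the RedisGraph module through redisgraph-py. A small standalone sketch of the same pattern, assuming a local Redis with RedisGraph loaded on port 6379; the graph and node names are placeholders:

import redis
from redisgraph import Node, Graph

r = redis.Redis(host="localhost", port=6379)
g = Graph("example_run", r)
g.add_node(Node(label="node", properties={"name": "model_a", "in_degree": 0, "status": "created"}))
g.commit()

# the same shape of query the queue uses to find runnable nodes
ready = g.query("MATCH (p:node {in_degree: 0, status: 'created'}) RETURN p.name")
print([row[0] for row in ready.result_set])  # ['model_a']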

View File

@@ -1,7 +1,10 @@
from typing import Set, List, Optional, Tuple
import os
from .graph import Graph, UniqueId
from .queue import GraphQueue
from .networkx_queue import NetworkxGraphQueue
from .redis_queue import RedisGraphQueue
from .selector_methods import MethodManager
from .selector_spec import SelectionCriteria, SelectionSpec, IndirectSelection
@@ -275,7 +278,10 @@ class NodeSelector(MethodManager):
        selected_resources.set_selected_resources(selected_nodes)
        new_graph = self.full_graph.get_subset_graph(selected_nodes)
        # should we give a way here for consumers to mutate the graph?
        return GraphQueue(new_graph.graph, self.manifest, selected_nodes)
        if os.getenv("DBT_BACKEND") == "REDIS":
            return RedisGraphQueue(new_graph.graph, self.manifest, selected_nodes)
        else:
            return NetworkxGraphQueue(new_graph.graph, self.manifest, selected_nodes)
class ResourceTypeSelector(NodeSelector):
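
Both switches in this change are driven by environment variables rather than new APIs. A hedged sketch of how a run would be routed onto the Redis backend; the values are examples only:

import os

os.environ["DBT_BACKEND"] = "REDIS"  # the selector above then builds a RedisGraphQueue
os.environ["PREVIOUS_RUN_ID"] = ""   # empty: RedisGraphQueue starts a fresh graph keyed by int(time.time())
# Setting PREVIOUS_RUN_ID (or passing the new --prev-run-id flag from the main.py change below)
# instead reopens the RedisGraph of an earlier run and re-queues nodes still marked 'queued'.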

View File

@@ -50,10 +50,7 @@ def _match_to_int(match: Dict[str, str], key: str) -> Optional[int]:
SelectionSpec = Union[
    "SelectionCriteria",
    "SelectionIntersection",
    "SelectionDifference",
    "SelectionUnion",
    "SelectionCriteria", "SelectionIntersection", "SelectionDifference", "SelectionUnion"
]

View File

@@ -298,6 +298,14 @@ def _build_base_subparser():
        string, eg. '{my_variable: my_value}'
        """,
    )
    base_subparser.add_argument(
        "--prev-run-id",
        type=str,
        default="",
        help="""
        Resume from a previous run id.
        """,
    )
    # if set, log all cache events. This is extremely verbose!
    base_subparser.add_argument(
@@ -1185,6 +1193,10 @@ def parse_args(args, cls=DBTArgumentParser):
        sys.exit(1)
    parsed = p.parse_args(args)
    if parsed.prev_run_id:
        os.environ["PREVIOUS_RUN_ID"] = parsed.prev_run_id
    else:
        os.environ["PREVIOUS_RUN_ID"] = ""
    # profiles_dir is set before subcommands and after, so normalize
    if hasattr(parsed, "sub_profiles_dir"):

View File

@@ -176,8 +176,8 @@ class GraphRunnableTask(ManifestTask):
                raise InternalException(
                    f"Node selection returned {uid}, expected a node or a source"
                )
        self.num_nodes = len([n for n in self._flattened_nodes if not n.is_ephemeral_model])
        self.num_nodes = self.job_queue.get_node_num()
        # self.num_nodes = len([n for n in self._flattened_nodes if not n.is_ephemeral_model])
    def raise_on_first_error(self):
        return False
@@ -290,6 +290,9 @@ class GraphRunnableTask(ManifestTask):
        if self.job_queue is None:
            raise InternalException("Got to run_queue callback with no job queue set")
        self.job_queue.mark_done(result.node.unique_id)
        import time
        time.sleep(3)
        while not self.job_queue.empty():
            node = self.job_queue.get()