forked from repo-mirrors/dbt-core
Compare commits
9 Commits
validate-p ... mallet-hac
| Author | SHA1 | Date |
|---|---|---|
|  | 0210988613 |  |
|  | 19ddfd7894 |  |
|  | 3fe50426a4 |  |
|  | 0f15df64ba |  |
|  | 8e84d29e94 |  |
|  | cf73c5696a |  |
|  | d722b3eb46 |  |
|  | 5c85356b0a |  |
|  | 44a740327a |  |
core/dbt/graph/graph_node.py (new file, 10 lines)
@@ -0,0 +1,10 @@

```python
from dataclasses import dataclass


@dataclass
class GraphNode:
    name: str
    in_degree: int
    status: str
    update_date: int
    start_date: int
```
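For context, a minimal sketch (not part of the commit) of how this dataclass is used further down: `RedisGraphQueue` flattens it into a property dict with `dataclasses.asdict` before attaching it to a RedisGraph node. The import path and the example name are assumptions for illustration.

```python
from dataclasses import asdict
from datetime import datetime

# assumes the fork is installed so the module resolves as dbt.graph.graph_node
from dbt.graph.graph_node import GraphNode

now = int(datetime.now().timestamp())
node = GraphNode(
    name="model.my_project.orders",  # illustrative unique_id, not from the commit
    in_degree=0,
    status="created",
    update_date=now,
    start_date=now,
)

# asdict() produces the flat property mapping stored on the RedisGraph node
print(asdict(node))
```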
core/dbt/graph/networkx_graph.py (new file, 86 lines)
@@ -0,0 +1,86 @@

```python
from typing import Set, Iterable, Iterator, Optional, NewType, Union
from itertools import product
import networkx as nx  # type: ignore

from dbt.exceptions import InternalException, NotImplementedException
from .graph import Graph

UniqueId = NewType("UniqueId", str)


class NetworkXGraph(Graph):
    """A wrapper around the networkx graph that understands SelectionCriteria
    and how they interact with the graph.
    """

    graph: nx.DiGraph

    def __init__(self, graph: Union[Graph, nx.DiGraph]):
        if isinstance(graph, nx.DiGraph):
            self.graph = graph
        else:
            # Need to implement
            raise NotImplementedException("Doh!")

    def nodes(self) -> Set[UniqueId]:
        return set(self.graph.nodes())

    def edges(self):
        return self.graph.edges()

    def __iter__(self) -> Iterator[UniqueId]:
        return iter(self.nodes())

    def ancestors(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]:
        """Returns all nodes having a path to `node` in `graph`"""
        if not self.graph.has_node(node):
            raise InternalException(f"Node {node} not found in the graph!")
        return {
            child
            for _, child in nx.bfs_edges(self.graph, node, reverse=True, depth_limit=max_depth)
        }

    def in_degree(self):
        return self.graph.in_degree()

    def descendants(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]:
        """Returns all nodes reachable from `node` in `graph`"""
        if not self.graph.has_node(node):
            raise InternalException(f"Node {node} not found in the graph!")
        return {child for _, child in nx.bfs_edges(self.graph, node, depth_limit=max_depth)}

    def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph":
        """Create and return a new graph that is a shallow copy of the graph,
        but with only the nodes in include_nodes. Transitive edges across
        removed nodes are preserved as explicit new edges.
        """

        new_graph = self.graph.copy()
        include_nodes = set(selected)

        for node in self:
            if node not in include_nodes:
                source_nodes = [x for x, _ in new_graph.in_edges(node)]
                target_nodes = [x for _, x in new_graph.out_edges(node)]

                new_edges = product(source_nodes, target_nodes)
                non_cyclic_new_edges = [
                    (source, target) for source, target in new_edges if source != target
                ]  # removes cyclic refs

                new_graph.add_edges_from(non_cyclic_new_edges)
                new_graph.remove_node(node)

        for node in include_nodes:
            if node not in new_graph:
                raise ValueError(
                    "Couldn't find model '{}' -- does it exist or is it disabled?".format(node)
                )

        return Graph(new_graph)

    def subgraph(self, nodes: Iterable[UniqueId]) -> "Graph":
        return Graph(self.graph.subgraph(nodes))

    def remove_node(self, node_id: UniqueId) -> None:
        return self.graph.remove_node(node_id)
```
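To see what the transitive-edge logic in `get_subset_graph` does, here is a standalone sketch using plain networkx (no dbt imports; the node names are made up): removing the middle node of a chain leaves an explicit edge between its neighbours.

```python
from itertools import product

import networkx as nx

# a -> b -> c; keep only {a, c}
graph = nx.DiGraph([("a", "b"), ("b", "c")])
include_nodes = {"a", "c"}

new_graph = graph.copy()
for node in list(graph.nodes()):
    if node not in include_nodes:
        sources = [x for x, _ in new_graph.in_edges(node)]
        targets = [x for _, x in new_graph.out_edges(node)]
        # bridge predecessors to successors, skipping self-loops
        new_graph.add_edges_from((s, t) for s, t in product(sources, targets) if s != t)
        new_graph.remove_node(node)

print(list(new_graph.edges()))  # [('a', 'c')] -- the transitive edge is preserved
```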
core/dbt/graph/networkx_queue.py (new file, 96 lines)
@@ -0,0 +1,96 @@

```python
import networkx as nx  # type: ignore

from typing import Dict, Set, List, Generator

from .graph import UniqueId
from .queue import GraphQueue
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure, ParsedMetric
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType


class NetworkxGraphQueue(GraphQueue):
    """A fancy queue that is backed by the dependency graph.
    Note: this will mutate input!
    This queue is thread-safe for `mark_done` calls, though you must ensure
    that separate threads do not call `.empty()` or `__len__()` and `.get()` at
    the same time, as there is an unlocked race!
    """

    def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]) -> None:
        # store the 'score' of each node as a number. Lower is higher priority.
        self._scores = self._get_scores(graph)
        self.graph = graph
        super().__init__(graph, manifest, selected)

    def _include_in_cost(self, node_id: UniqueId) -> bool:
        node = self.manifest.expect(node_id)
        if node.resource_type != NodeType.Model:
            return False
        # must be a Model - tell mypy this won't be a Source or Exposure or Metric
        assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure, ParsedMetric))
        if node.is_ephemeral:
            return False
        return True

    @staticmethod
    def _grouped_topological_sort(
        graph: nx.DiGraph,
    ) -> Generator[List[str], None, None]:
        """Topological sort of given graph that groups ties.
        Adapted from `nx.topological_sort`, this function returns a topo sort of a graph however
        instead of arbitrarily ordering ties in the sort order, ties are grouped together in
        lists.
        Args:
            graph: The graph to be sorted.
        Returns:
            A generator that yields lists of nodes, one list per graph depth level.
        """
        indegree_map = {v: d for v, d in graph.in_degree() if d > 0}
        zero_indegree = [v for v, d in graph.in_degree() if d == 0]

        while zero_indegree:
            yield zero_indegree
            new_zero_indegree = []
            for v in zero_indegree:
                for _, child in graph.edges(v):
                    indegree_map[child] -= 1
                    if not indegree_map[child]:
                        new_zero_indegree.append(child)
            zero_indegree = new_zero_indegree

    def _get_scores(self, graph: nx.DiGraph) -> Dict[str, int]:
        """Scoring nodes for processing order.
        Scores are calculated by the graph depth level. Lowest score (0) should be processed first.
        Args:
            graph: The graph to be scored.
        Returns:
            A dictionary consisting of `node name`:`score` pairs.
        """
        # split graph by connected subgraphs
        subgraphs = (graph.subgraph(x) for x in nx.connected_components(nx.Graph(graph)))

        # score all nodes in all subgraphs
        scores = {}
        for subgraph in subgraphs:
            grouped_nodes = self._grouped_topological_sort(subgraph)
            for level, group in enumerate(grouped_nodes):
                for node in group:
                    scores[node] = level

        return scores

    def find_new_additions(self) -> None:
        """Find any nodes in the graph that need to be added to the internal
        queue and add them.
        """
        for node, in_degree in self.graph.in_degree():
            if not self._already_known(node) and in_degree == 0:
                self.inner.put((self._scores[node], node))
                self.queued.add(node)

    def remove_node_from_graph(self, node_id):
        self.graph.remove_node(node_id)

    def get_node_num(self):
        return len(self.graph.nodes())
```
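A standalone sketch of the level-based scoring implemented by `_grouped_topological_sort` and `_get_scores` above, run on a small made-up DAG (pure networkx, no dbt objects):

```python
import networkx as nx


def grouped_topological_sort(graph: nx.DiGraph):
    """Yield the nodes level by level: each list holds the nodes whose
    in-degree reaches zero once the previous levels are removed."""
    indegree_map = {v: d for v, d in graph.in_degree() if d > 0}
    zero_indegree = [v for v, d in graph.in_degree() if d == 0]
    while zero_indegree:
        yield zero_indegree
        new_zero_indegree = []
        for v in zero_indegree:
            for _, child in graph.edges(v):
                indegree_map[child] -= 1
                if not indegree_map[child]:
                    new_zero_indegree.append(child)
        zero_indegree = new_zero_indegree


# two staging models feed two marts; the score is simply the depth level
dag = nx.DiGraph([("stg_a", "mart_x"), ("stg_b", "mart_x"), ("stg_b", "mart_y")])

scores = {}
for level, group in enumerate(grouped_topological_sort(dag)):
    for node in group:
        scores[node] = level

print(scores)  # {'stg_a': 0, 'stg_b': 0, 'mart_x': 1, 'mart_y': 1}
```

Lower scores are queued first, which is why `find_new_additions` pushes `(self._scores[node], node)` tuples onto the priority queue.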
@@ -1,17 +1,15 @@

```
import networkx as nx  # type: ignore
from abc import ABC, abstractmethod
import threading

from queue import PriorityQueue
from typing import Dict, Set, List, Generator, Optional
from typing import Set, Optional

from .graph import UniqueId
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure, ParsedMetric
from .graph import Graph, UniqueId
from dbt.contracts.graph.compiled import GraphMemberNode
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType


class GraphQueue:
class GraphQueue(ABC):
    """A fancy queue that is backed by the dependency graph.
    Note: this will mutate input!
```

@@ -20,8 +18,7 @@ class GraphQueue:

```
    the same time, as there is an unlocked race!
    """

    def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]):
        self.graph = graph
    def __init__(self, graph: Graph, manifest: Manifest, selected: Set[UniqueId]):
        self.manifest = manifest
        self._selected = selected
        # store the queue as a priority queue.
```

@@ -33,88 +30,34 @@ class GraphQueue:

```
        self.queued: Set[UniqueId] = set()
        # this lock controls most things
        self.lock = threading.Lock()
        # store the 'score' of each node as a number. Lower is higher priority.
        self._scores = self._get_scores(self.graph)
        # populate the initial queue
        self._find_new_additions()
        self.find_new_additions()
        # awaits after task end
        self.some_task_done = threading.Condition(self.lock)

    def _mark_in_progress(self, node_id: UniqueId) -> None:
        """Mark the node as 'in progress'.
        Callers must hold the lock.
        :param str node_id: The node ID to mark as in progress.
        """
        self.queued.remove(node_id)
        self.in_progress.add(node_id)

    def join(self) -> None:
        """Join the queue. Blocks until all tasks are marked as done.
        Make sure not to call this before the queue reports that it is empty.
        """
        self.inner.join()

    def get_selected_nodes(self) -> Set[UniqueId]:
        return self._selected.copy()

    def _include_in_cost(self, node_id: UniqueId) -> bool:
        node = self.manifest.expect(node_id)
        if node.resource_type != NodeType.Model:
            return False
        # must be a Model - tell mypy this won't be a Source or Exposure or Metric
        assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure, ParsedMetric))
        if node.is_ephemeral:
            return False
        return True

    @staticmethod
    def _grouped_topological_sort(
        graph: nx.DiGraph,
    ) -> Generator[List[str], None, None]:
        """Topological sort of given graph that groups ties.

        Adapted from `nx.topological_sort`, this function returns a topo sort of a graph however
        instead of arbitrarily ordering ties in the sort order, ties are grouped together in
        lists.

        Args:
            graph: The graph to be sorted.

        Returns:
            A generator that yields lists of nodes, one list per graph depth level.
        """
        indegree_map = {v: d for v, d in graph.in_degree() if d > 0}
        zero_indegree = [v for v, d in graph.in_degree() if d == 0]

        while zero_indegree:
            yield zero_indegree
            new_zero_indegree = []
            for v in zero_indegree:
                for _, child in graph.edges(v):
                    indegree_map[child] -= 1
                    if not indegree_map[child]:
                        new_zero_indegree.append(child)
            zero_indegree = new_zero_indegree

    def _get_scores(self, graph: nx.DiGraph) -> Dict[str, int]:
        """Scoring nodes for processing order.

        Scores are calculated by the graph depth level. Lowest score (0) should be processed first.

        Args:
            graph: The graph to be scored.

        Returns:
            A dictionary consisting of `node name`:`score` pairs.
        """
        # split graph by connected subgraphs
        subgraphs = (graph.subgraph(x) for x in nx.connected_components(nx.Graph(graph)))

        # score all nodes in all subgraphs
        scores = {}
        for subgraph in subgraphs:
            grouped_nodes = self._grouped_topological_sort(subgraph)
            for level, group in enumerate(grouped_nodes):
                for node in group:
                    scores[node] = level

        return scores

    def get(self, block: bool = True, timeout: Optional[float] = None) -> GraphMemberNode:
        """Get a node off the inner priority queue. By default, this blocks.

        This takes the lock, but only for part of it.

        :param block: If True, block until the inner queue has data
        :param timeout: If set, block for timeout seconds waiting for data.
        :return: The node as present in the manifest.

        See `queue.PriorityQueue` for more information on `get()` behavior and
        exceptions.
        """
```

@@ -131,7 +74,7 @@ class GraphQueue:

```
        This takes the lock.
        """
        with self.lock:
            return len(self.graph) - len(self.in_progress)
            return self.get_node_num() - len(self.in_progress)

    def empty(self) -> bool:
        """The graph queue is 'empty' if it all remaining nodes in the graph
```

@@ -152,46 +95,18 @@ class GraphQueue:

```
        """
        return node in self.in_progress or node in self.queued

    def _find_new_additions(self) -> None:
        """Find any nodes in the graph that need to be added to the internal
        queue and add them.
        """
        for node, in_degree in self.graph.in_degree():
            if not self._already_known(node) and in_degree == 0:
                self.inner.put((self._scores[node], node))
                self.queued.add(node)

    def mark_done(self, node_id: UniqueId) -> None:
        """Given a node's unique ID, mark it as done.

        This method takes the lock.

        :param str node_id: The node ID to mark as complete.
        """
        with self.lock:
            self.in_progress.remove(node_id)
            self.graph.remove_node(node_id)
            self._find_new_additions()
            self.remove_node_from_graph(node_id)
            self.find_new_additions()
            self.inner.task_done()
            self.some_task_done.notify_all()

    def _mark_in_progress(self, node_id: UniqueId) -> None:
        """Mark the node as 'in progress'.

        Callers must hold the lock.

        :param str node_id: The node ID to mark as in progress.
        """
        self.queued.remove(node_id)
        self.in_progress.add(node_id)

    def join(self) -> None:
        """Join the queue. Blocks until all tasks are marked as done.

        Make sure not to call this before the queue reports that it is empty.
        """
        self.inner.join()

    def wait_until_something_was_done(self) -> int:
        """Block until a task is done, then return the number of unfinished
        tasks.
```

@@ -199,3 +114,15 @@ class GraphQueue:

```
        with self.lock:
            self.some_task_done.wait()
            return self.inner.unfinished_tasks

    @abstractmethod
    def find_new_additions(self):
        pass

    @abstractmethod
    def remove_node_from_graph(self, node_id):
        pass

    @abstractmethod
    def get_node_num(self):
        pass
```
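The hunks above turn `GraphQueue` into an abstract base class: the locking and priority-queue logic stays in the base, while each backend supplies `find_new_additions`, `remove_node_from_graph`, and `get_node_num`. A condensed, self-contained sketch of that contract (the class names here are illustrative, not the dbt ones):

```python
from abc import ABC, abstractmethod

import networkx as nx


class QueueBackend(ABC):
    """Stand-in for the refactored GraphQueue contract."""

    @abstractmethod
    def find_new_additions(self) -> None:
        """Queue every node whose in-degree has dropped to zero."""

    @abstractmethod
    def remove_node_from_graph(self, node_id: str) -> None:
        """Drop a finished node so its children become eligible."""

    @abstractmethod
    def get_node_num(self) -> int:
        """Number of nodes the backend still tracks."""


class InMemoryBackend(QueueBackend):
    def __init__(self, edges):
        self.graph = nx.DiGraph(edges)

    def find_new_additions(self) -> None:
        print("ready:", [n for n, d in self.graph.in_degree() if d == 0])

    def remove_node_from_graph(self, node_id: str) -> None:
        self.graph.remove_node(node_id)

    def get_node_num(self) -> int:
        return len(self.graph.nodes())


backend = InMemoryBackend([("stg_a", "mart_x")])
backend.find_new_additions()             # ready: ['stg_a']
backend.remove_node_from_graph("stg_a")
backend.find_new_additions()             # ready: ['mart_x']
print(backend.get_node_num())            # 1
```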
core/dbt/graph/redis_queue.py (new file, 96 lines)
@@ -0,0 +1,96 @@

```python
from dataclasses import asdict
from datetime import datetime
import networkx as nx  # type: ignore
import os

from typing import Set
import redis
import time
from .graph_node import GraphNode
from redisgraph import Node, Edge, Graph

from .graph import UniqueId
from .queue import GraphQueue

from dbt.contracts.graph.manifest import Manifest


class RedisGraphQueue(GraphQueue):
    # TODO: this skips ephemeral models

    def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]) -> None:
        # create entry on run_results,
        # create new graph in redis from graph

        previous_run_id = os.getenv("PREVIOUS_RUN_ID")
        self.r = redis.Redis(host="localhost", port=6379)
        if previous_run_id:
            self.name = int(previous_run_id)
            self.redis_graph = Graph(self.name, self.r)
            # mark current node
            query = """MATCH (p:node {status: 'queued'}) SET p.status = 'created'"""
            self.redis_graph.query(query)
            query = """MATCH (p:node {status: 'created'}) RETURN p.name"""
            result = self.redis_graph.query(query)
            self.node_count = len(result.result_set)
        else:
            self.name = int(time.time())
            self.redis_graph = Graph(self.name, self.r)
            nodes = {}
            for node_name, in_degree in graph.in_degree():
                start_time = int(datetime.now().timestamp())
                properties = GraphNode(
                    name=node_name,
                    in_degree=in_degree,
                    status="created",
                    update_date=start_time,
                    start_date=start_time,
                )
                curr_node = Node(
                    label="node",
                    properties=asdict(properties),
                )
                self.redis_graph.add_node(curr_node)
                nodes[node_name] = curr_node
            for child_name, father_name in graph.edges():
                edge = Edge(nodes[child_name], "is_child_of", nodes[father_name])
                self.redis_graph.add_edge(edge)
            self.redis_graph.commit()
            self.node_count = len(graph.nodes)
        super().__init__(graph, manifest, selected)

    def find_new_additions(self) -> None:
        """Find any nodes in the graph that need to be added to the internal
        queue and add them.
        """
        query = """MATCH (p:node {in_degree :0, status: 'created'})
        RETURN p.name"""
        result = self.redis_graph.query(query)
        for name in result.result_set:
            node = name[0]
            if not self._already_known(node):
                self.inner.put((0, node))
                self.queued.add(node)
        query = """MATCH (p:node {in_degree :0, status: 'created'})
        set p.status = 'queued'"""
        self.redis_graph.query(query)

    def remove_node_from_graph(self, node_id):
        # find all child nodes of the current node
        self.node_count -= 1
        query = f"""MATCH (p:node {{name: '{node_id}'}})- [v:is_child_of] ->(c:node)
        RETURN c.name, c.in_degree"""
        result = self.redis_graph.query(query)
        # reduce their in_degree
        for model_name, in_degree in result.result_set:
            query = (
                f"""MATCH (p:node {{name: '{model_name}'}}) SET p.in_degree = {in_degree - 1}"""
            )
            self.redis_graph.query(query)
        # mark the current node as done
        current_time = int(datetime.now().timestamp())
        query = f"""MATCH (p:node {{name: '{node_id}'}}) SET p.status = 'done', p.update_date = {current_time}"""
        self.redis_graph.query(query)

    def get_node_num(self):
        return self.node_count
```
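A minimal sketch of the RedisGraph round trip the class above depends on: create a node, commit, then query by status. It assumes a local Redis server with the RedisGraph module loaded, and it only uses the `redisgraph` calls already present in the commit; the graph key and the node name are illustrative.

```python
import time

import redis
from redisgraph import Graph, Node

r = redis.Redis(host="localhost", port=6379)
g = Graph(int(time.time()), r)  # the run id doubles as the graph key, as above

g.add_node(
    Node(
        label="node",
        properties={
            "name": "model.my_project.orders",  # illustrative name
            "in_degree": 0,
            "status": "created",
        },
    )
)
g.commit()

# nodes with no unfinished parents are ready to be queued
result = g.query("MATCH (p:node {in_degree: 0, status: 'created'}) RETURN p.name")
print(result.result_set)  # e.g. [['model.my_project.orders']]

g.query("MATCH (p:node {status: 'created'}) SET p.status = 'queued'")
```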
@@ -1,7 +1,10 @@

```
from typing import Set, List, Optional, Tuple
import os

from .graph import Graph, UniqueId
from .queue import GraphQueue
from .networkx_queue import NetworkxGraphQueue
from .redis_queue import RedisGraphQueue
from .selector_methods import MethodManager
from .selector_spec import SelectionCriteria, SelectionSpec, IndirectSelection
```

@@ -275,7 +278,10 @@ class NodeSelector(MethodManager):

```
        selected_resources.set_selected_resources(selected_nodes)
        new_graph = self.full_graph.get_subset_graph(selected_nodes)
        # should we give a way here for consumers to mutate the graph?
        return GraphQueue(new_graph.graph, self.manifest, selected_nodes)
        if os.getenv("DBT_BACKEND") == "REDIS":
            return RedisGraphQueue(new_graph.graph, self.manifest, selected_nodes)
        else:
            return NetworkxGraphQueue(new_graph.graph, self.manifest, selected_nodes)


class ResourceTypeSelector(NodeSelector):
```
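The selector change above picks the queue implementation from the `DBT_BACKEND` environment variable. Shown below is that dispatch in isolation (a hedged sketch: the class names come from the commit, the helper function is made up):

```python
import os


def build_graph_queue(graph, manifest, selected):
    """Return the queue backend chosen via DBT_BACKEND (default: networkx)."""
    if os.getenv("DBT_BACKEND") == "REDIS":
        # Redis-backed queue: graph state lives outside the process,
        # which is what makes resuming from a previous run id possible.
        from dbt.graph.redis_queue import RedisGraphQueue
        return RedisGraphQueue(graph, manifest, selected)

    from dbt.graph.networkx_queue import NetworkxGraphQueue
    return NetworkxGraphQueue(graph, manifest, selected)
```

Setting `DBT_BACKEND=REDIS` in the environment therefore switches the whole run to the Redis-backed queue without any other configuration change.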
@@ -50,10 +50,7 @@ def _match_to_int(match: Dict[str, str], key: str) -> Optional[int]:

```
SelectionSpec = Union[
    "SelectionCriteria",
    "SelectionIntersection",
    "SelectionDifference",
    "SelectionUnion",
    "SelectionCriteria", "SelectionIntersection", "SelectionDifference", "SelectionUnion"
]
```
@@ -298,6 +298,14 @@ def _build_base_subparser():

```
        string, eg. '{my_variable: my_value}'
        """,
    )
    base_subparser.add_argument(
        "--prev-run-id",
        type=str,
        default="",
        help="""
        resume from previous run id
        """,
    )

    # if set, log all cache events. This is extremely verbose!
    base_subparser.add_argument(
```

@@ -1185,6 +1193,10 @@ def parse_args(args, cls=DBTArgumentParser):

```
        sys.exit(1)

    parsed = p.parse_args(args)
    if parsed.prev_run_id:
        os.environ["PREVIOUS_RUN_ID"] = parsed.prev_run_id
    else:
        os.environ["PREVIOUS_RUN_ID"] = ""

    # profiles_dir is set before subcommands and after, so normalize
    if hasattr(parsed, "sub_profiles_dir"):
```
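The two hunks above add a `--prev-run-id` flag and hand its value to the rest of the code through the `PREVIOUS_RUN_ID` environment variable, which `RedisGraphQueue.__init__` reads to decide whether to reopen an existing graph. A self-contained sketch of that handoff (argparse only; the run id is illustrative):

```python
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("--prev-run-id", type=str, default="", help="resume from previous run id")

args = parser.parse_args(["--prev-run-id", "1700000000"])  # e.g. a previous graph key

# mirror of the parse_args change: stash the flag where RedisGraphQueue looks for it
os.environ["PREVIOUS_RUN_ID"] = args.prev_run_id if args.prev_run_id else ""

print(os.getenv("PREVIOUS_RUN_ID"))  # 1700000000
```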
@@ -176,8 +176,8 @@ class GraphRunnableTask(ManifestTask):

```
                raise InternalException(
                    f"Node selection returned {uid}, expected a node or a source"
                )

        self.num_nodes = len([n for n in self._flattened_nodes if not n.is_ephemeral_model])
        self.num_nodes = self.job_queue.get_node_num()
        # self.num_nodes = len([n for n in self._flattened_nodes if not n.is_ephemeral_model])

    def raise_on_first_error(self):
        return False
```

@@ -290,6 +290,9 @@ class GraphRunnableTask(ManifestTask):

```
            if self.job_queue is None:
                raise InternalException("Got to run_queue callback with no job queue set")
            self.job_queue.mark_done(result.node.unique_id)
            import time

            time.sleep(3)

        while not self.job_queue.empty():
            node = self.job_queue.get()
```