Compare commits

...

3 Commits

Author SHA1 Message Date
Ian Knox
fadff9819d merge conflict 2022-03-30 10:30:01 -05:00
Ian Knox
931e21ac77 WIP 2022-03-29 11:51:42 -05:00
Ian Knox
8d3b3f8e21 tmp commit 2022-03-16 17:28:10 -05:00
7 changed files with 1246 additions and 11 deletions

View File

@@ -2,3 +2,4 @@
from .profile import Profile, read_user_config # noqa from .profile import Profile, read_user_config # noqa
from .project import Project, IsFQNResource # noqa from .project import Project, IsFQNResource # noqa
from .runtime import RuntimeConfig, UnsetProfileConfig # noqa from .runtime import RuntimeConfig, UnsetProfileConfig # noqa
#from .task import TaskConfig # noqa

1032
core/dbt/config/args.py Normal file

File diff suppressed because it is too large Load Diff

63
core/dbt/config/task.py Normal file
View File

@@ -0,0 +1,63 @@
from argparse import Namespace
from ast import Str
from dataclasses import dataclass, field
from os import PathLike
from sys import argv
from typing import List, Union
import dbt.flags as flags
from .args import parse_args
from .profile import Profile, read_user_config
from .project import Project
from .renderer import DbtProjectYamlRenderer, ProfileRenderer
from .runtime import RuntimeConfig, UnsetProfileConfig
@dataclass
class TaskConfig:
args: Union[Namespace, List]
config: Union[RuntimeConfig, UnsetProfileConfig] = field(init=False)
def __post_init__(self):
# Get the args from the cli and "parse" them
#TODO replace parse_args() with Click functionality
self.args = parse_args(self.args) if self.args else parse_args(argv[1:])
# Update flags
# TODO: Flags are just env vars, replace with Click functionality
user_config = read_user_config(flags.PROFILES_DIR)
flags.set_from_args(self.args, user_config)
# Generate a profile renderer
_profile_renderer = ProfileRenderer()
# Generate a profile
_profile = Profile.render_from_args(
self.args,
_profile_renderer,
"forced_deps_test"
)
# Generate a project renderer
_project_renderer = DbtProjectYamlRenderer(_profile)
# Generate a Project
_project = Project.from_project_root(
"/Users/iknox/Projects/dbt_projects/forced_deps/",
_project_renderer,
verify_version=bool(flags.VERSION_CHECK),
)
# Generate dependencies (often None)
_dependencies = None
# Generate a RuntimeConfig (this is only for certain types of tasks)
self.config = RuntimeConfig.from_parts(
_project,
_profile,
self.args,
_dependencies,
)
breakpoint()

View File

@@ -548,7 +548,8 @@ class LogManager(logbook.NestedSetup):
log_manager = LogManager() log_manager = LogManager()
if dbt.flags.DEBUG:
log_manager.set_debug()
def log_cache_events(flag): def log_cache_events(flag):
"""Set the cache logger to propagate its messages based on the given flag.""" """Set the cache logger to propagate its messages based on the given flag."""

139
core/dbt/new_main.py Normal file
View File

@@ -0,0 +1,139 @@
import sys
import traceback
from contextlib import ExitStack, contextmanager
import dbt.tracking
import dbt.version
from dbt.adapters.factory import cleanup_connections, reset_adapters
from dbt.config import TaskConfig
from dbt.events.functions import fire_event, setup_event_logger
from dbt.events.types import (MainEncounteredError, MainKeyboardInterrupt,
MainReportArgs, MainReportVersion,
MainStackTrace, MainTrackingUserState)
from dbt.exceptions import (FailedToConnectException, InternalException,
NotImplementedException)
from dbt.logger import log_cache_events, log_manager
from dbt.profiler import profiler
from dbt.utils import ExitCodes, args_to_dict
import warnings
import click
@click.group()
def main():
pass
# docs,source,init,clean,debug,deps,list,ls,build,snapshot,run,compile,parse,test,seed,run-operation
@main.command()
def docs():
click.echo('Docs go here')
@main.command()
def source():
click.echo('Source go here')
@main.command()
def init():
click.echo('init go here')
@main.command()
def clean():
click.echo('clean go here')
def main_complex(args=None):
# Generate a task config
task_config = TaskConfig(args)
# Select a task
# N.B. The task selection logic is tightly coupled to the arg parsing logic.
# This task selection method is temporary until CT-208 is complete
task_class = task_config.args.cls
# Instantiate the task
task = task_class(
task_config.args,
task_config.config
)
# Set up logging
# N.B. Logbook warnings are ignored from the CLI so we don't have to fork it to support
# python 3.10.
warnings.filterwarnings("ignore", category=DeprecationWarning, module="logbook")
log_cache_events(getattr(task_config.args, "log_cache_events", False))
log_path = getattr(task.config, "log_path", None)
log_manager.set_path(log_path)
task.set_log_format()
setup_event_logger(
log_path or "logs",
task.pre_init_hook(task_config.args)
)
# Prepare task run contexts
task_run_contexts = [
log_manager.applicationbound(),
#adapter_management(),
#track_run(task),
]
#if task_config.args.record_timing_info:
#task_run_contexts.append(
# profiler(
# outfile=task_config.args.record_timing_info
# )
#)
# Run the task in an ExitStack of contexts
with ExitStack() as stack:
for context in task_run_contexts:
stack.enter_context(context)
# Fire task start events
#fire_event(MainReportVersion(v=str(dbt.version.installed)))
#fire_event(MainReportArgs(args=args_to_dict(task_config.args)))
#fire_event(MainTrackingUserState(user_state=dbt.tracking.active_user.state()))
try:
task_succeeded = task.run()
exit_code = ExitCodes.Success.value if task_succeeded else ExitCodes.ModelError.value
# Handle keyboard inturrupts
except KeyboardInterrupt:
fire_event(MainKeyboardInterrupt())
exit_code = ExitCodes.UnhandledError.value
# Handle system exits
except SystemExit as e:
exit_code = e.code
# Handle other exceptions
except BaseException as e:
fire_event(MainEncounteredError(e=str(e)))
fire_event(MainStackTrace(stack_trace=traceback.format_exc()))
exit_code = ExitCodes.UnhandledError.value
finally:
# Exit with approriate code
sys.exit(exit_code)
#main = main_complex
@contextmanager
def adapter_management():
reset_adapters()
try:
yield
finally:
cleanup_connections()
@contextmanager
def track_run(task):
dbt.tracking.initialize_from_flags()
dbt.tracking.track_invocation_start(config=task.config, args=task.args)
try:
yield
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="ok")
except (NotImplementedException, FailedToConnectException) as e:
fire_event(MainEncounteredError(e=str(e)))
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
except Exception:
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
raise
finally:
dbt.tracking.flush()

View File

@@ -226,7 +226,8 @@ class ManifestLoader:
# Save performance info # Save performance info
loader._perf_info.load_all_elapsed = time.perf_counter() - start_load_all loader._perf_info.load_all_elapsed = time.perf_counter() - start_load_all
loader.track_project_load() if dbt.tracking.active_user is not None:
loader.track_project_load()
return manifest return manifest

View File

@@ -4,16 +4,14 @@ from pstats import Stats
@contextmanager @contextmanager
def profiler(enable, outfile): def profiler(outfile):
try: try:
if enable: profiler = Profile()
profiler = Profile() profiler.enable()
profiler.enable()
yield yield
finally: finally:
if enable: profiler.disable()
profiler.disable() stats = Stats(profiler)
stats = Stats(profiler) stats.sort_stats("tottime")
stats.sort_stats("tottime") stats.dump_stats(outfile)
stats.dump_stats(outfile)