Compare commits

...

8 Commits

Author SHA1 Message Date
Ian Knox
c4969c1030 linter/mypy fixes 2021-08-16 16:08:17 -05:00
Ian Knox
0f2227f033 all file logic removed from dbt.clients.system (except find_matching) 2021-08-16 16:08:17 -05:00
Ian Knox
8507a5b7b7 Merge pull request #3693 from dbt-labs/local-fs-sa-skateboard
Basic local FS functionality
2021-08-13 17:06:57 -05:00
Ian Knox
2b961c5543 Merge branch 'feature/local-fs-storage-adapter' into local-fs-sa-skateboard 2021-08-13 17:06:32 -05:00
Ian Knox
db2ef4d084 Update core/dbt/clients/storage.py
Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com>
2021-08-05 09:42:27 -05:00
Ian Knox
ed3dbbdf97 Update core/dbt/adapters/internal_storage/local_filesystem.py
Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com>
2021-08-05 09:42:18 -05:00
Ian Knox
a1af19af68 Local FS storage adapter, pass 2- better logic, lots of edge cases fixed, type hinting 2021-08-04 08:29:12 -05:00
Ian Knox
ad8cbfcd22 * system_client logic -> local_filesystem adapter
* basic storage_adapter logic for dynamic loading
2021-07-30 13:34:57 -05:00
26 changed files with 365 additions and 425 deletions

View File

@@ -0,0 +1 @@
Move the warehouse adapters to a subfolder of adapters (adapters/warehouse).

View File

@@ -0,0 +1,215 @@
from pathlib import Path
from shutil import rmtree
from stat import S_IRUSR, S_IWUSR
from sys import platform
from typing import Union
from typing_extensions import Literal
from dbt.logger import GLOBAL_LOGGER as logger
# windows platforms as returned by sys.platform
# via https://stackoverflow.com/a/13874620
WINDOWS_PLATFORMS = ("win32", "cygwin", "msys")
# load ctypes for windows platforms
if platform in WINDOWS_PLATFORMS:
from ctypes import WinDLL, c_bool
else:
WinDLL = None
c_bool = None
def ready_check() -> bool:
    """Report whether the adapter is ready for use.

    Returns:
        `True` if the resource is ready to be used.
        `False` if the resource is not ready to be used.

    Raises:
        TBD - TODO: How best to report back errors here?
        It should never fail for a filesystem (unless we're using something very exotic),
        but for databases this should be our primary source of troubleshooting information.
    """
    # the local filesystem is always assumed to be available
    is_ready = True
    return is_ready
def read(
    path: str,
    strip: bool = True,
) -> str:
    """Reads the content of a file on the filesystem.

    Args:
        path: Full path of file to be read.
        strip: Whether or not to strip leading/trailing whitespace.

    Returns:
        Content of the file, decoded as utf-8.
    """
    # build a concrete path object under a new name: re-annotating the `path`
    # parameter (`path: Path = ...`) is flagged as an error by mypy
    file_path = Path(path)
    # read the file in as a string (raises FileNotFoundError if missing)
    file_content = file_path.read_text(encoding="utf-8")
    # conditionally strip whitespace
    return file_content.strip() if strip else file_content
def write(
    path: str,
    content: Union[str, bytes, None],
    overwrite: bool = True,
) -> bool:
    """Writes the given content out to a resource on the filesystem.

    Since there are many different ways to approach filesystem operations, it's best to
    enumerate the rules(tm):
    If the path to the file/dir doesn't exist, it will be created.
    If content is `None` a directory will be created instead of a file.
    If content is not a `str` or `bytes` the string representation of the object will be written.
    If the content is `str` it will be encoded as utf-8.
    Overwrites are only supported for files and are toggled by the overwrite parameter.
    All logical cases outside those outlined above will result in failure.

    Args:
        path: Full path of resource to be written.
        content: Data to be written.
        overwrite: Whether or not to overwrite if a file already exists at this path.

    Returns:
        `True` for success, `False` otherwise.
    """
    # build a concrete path object under a new name: re-annotating the `path`
    # parameter (`path: Path = ...`) is flagged as an error by mypy
    target = Path(path)
    # handle overwrite errors for files and directories
    if target.exists() and (overwrite is False or target.is_dir() is True):
        logger.debug(f"{target} already exists")
        return False
    # handle trying to write file content to a path that is a directory
    # (defensive: an existing directory is already rejected by the check above)
    if target.is_dir() and content is not None:
        logger.debug(f"{target} is a directory, but file content was specified")
        return False
    # create a directory if the content is `None`
    if content is None:
        target.mkdir(parents=True, exist_ok=True)
    # create an empty file if the content is an empty string
    elif content == "" and target.exists() is False:
        target.touch()
    # write out to a file
    else:
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            if isinstance(content, bytes):
                target.write_bytes(content)
            else:
                target.write_text(str(content), encoding="utf-8")
        except Exception as e:
            # TODO: The block below was c/p'd directly from the old system client.
            # We should examine if this actually makes sense to log file write failures and
            # try to keep going.
            if platform in WINDOWS_PLATFORMS:
                # sometimes we get a winerror of 3 which means the path was
                # definitely too long, but other times we don't and it means the
                # path was just probably too long. This is probably based on the
                # windows/python version.
                if getattr(e, "winerror", 0) == 3:
                    reason = "Path was too long"
                else:
                    reason = "Path was possibly too long"
                # all our hard work and the path was still too long. Log and
                # continue. N.B: the length must be taken on the string form;
                # `Path` objects do not support `len()`.
                logger.debug(
                    f"Could not write to path {target}({len(str(target))} characters): "
                    f"{reason}\nexception: {e}"
                )
            else:
                raise
    return True
def delete(path: str) -> bool:
"""Deletes the resource at the given path
Args:
path: Full path of resource to be deleted
Returns:
`True` for success, `False` otherwise.
"""
# create concrete path object
path: Path = Path(path)
# ensure resource to be deleted exists
if not path.exists():
return False
# remove files
if path.is_file():
path.unlink()
# remove directories recursively, surprisingly obnoxious to do in a cross-platform safe manner
if path.is_dir():
if platform in WINDOWS_PLATFORMS:
# error handling for permissions on windows platforms
def on_error(func, path, _):
path.chmod(path, S_IWUSR | S_IRUSR)
func(path)
rmtree(path, onerror=on_error)
else:
rmtree(path)
return True
def info(path: str) -> Union[dict, Literal[False]]:
    """Provides information about what is found at the given path.

    If info is called on a directory the size will be `None`
    If info is called on a resource that does not exist the response will be `False`

    N.B: despite my best efforts, getting a reliable cross-platform file creation time is
    absurdly difficult.
    See these links for information if we ever decide we have to have this feature:
    * https://bugs.python.org/issue39533
    * https://github.com/ipfs-shipyard/py-datastore/blob/e566d40a8ca81d8628147e255fe7830b5f928a43/datastore/filesystem/util/statx.py # noqa: E501
    * https://github.com/ckarageorgkaneen/pystatx

    Args:
        path: Full path being queried.

    Returns:
        On success: A dict containing information about what is found at the given path.
        On failure: `False`
    """
    # build a concrete path object under a new name: re-annotating the `path`
    # parameter (`path: Path = ...`) is flagged as an error by mypy
    target = Path(path)
    # return `False` if the resource doesn't exist
    if not target.exists():
        return False
    # stat once and reuse the result (the original issued two stat() calls)
    stat_result = target.stat()
    # calculate file size (`None` for dirs)
    size = None if target.is_dir() else stat_result.st_size
    # return info on the resource
    return {"size": size, "modified_at": stat_result.st_mtime}

View File

@@ -1,7 +1,8 @@
import re
import os.path
from dbt.clients.system import run_cmd, rmdir
from dbt.clients.system import run_cmd
from dbt.clients.storage import adapter as SA
from dbt.logger import GLOBAL_LOGGER as logger
import dbt.exceptions
from packaging import version
@@ -42,7 +43,7 @@ def clone(repo, cwd, dirname=None, remove_git_dir=False, revision=None, subdirec
run_cmd(os.path.join(cwd, dirname or ''), ['git', 'sparse-checkout', 'set', subdirectory])
if remove_git_dir:
rmdir(os.path.join(dirname, '.git'))
SA.rmdir(os.path.join(dirname, '.git'))
return result

View File

@@ -0,0 +1,27 @@
from importlib import import_module
from os import getenv
from dbt.logger import GLOBAL_LOGGER as logger
from .storage_adapter_type import StorageAdapterType
from typing import cast

# TODO:
#   ensure configured adapter has the correct module signature
#   provide better not-ready error messaging

# get configured storage adapter module path (defaults to the local filesystem)
_module_to_load = getenv(
    "DBT_STORAGE_ADAPTER", "dbt.adapters.internal_storage.local_filesystem"
)

# load module if it exists
try:
    _adapter = cast(StorageAdapterType, import_module(_module_to_load))
    logger.debug(f"Storage adapter {_module_to_load} loaded")
except ModuleNotFoundError:
    logger.error(f"Storage adapter {_module_to_load} not found")
    # without a loaded module there is nothing usable to export; previously
    # execution fell through and crashed below with a NameError on `_adapter`
    raise

# run ready check; fail loudly instead of silently leaving `adapter` unbound
# (which surfaced later as a confusing ImportError for every consumer)
if not _adapter.ready_check():
    logger.error(f"Storage Adapter{_module_to_load} not ready")
    raise RuntimeError(f"Storage adapter {_module_to_load} is not ready")

adapter = _adapter

View File

@@ -0,0 +1,37 @@
from typing import Any, Dict, Union
from typing_extensions import Literal
class StorageAdapterType:
    """Static type stub describing the interface every storage adapter exposes.

    This is needed because importlib (used in the storage client) and mypy aren't exactly
    friends, so dynamically loaded adapter modules are cast to this class for type hinting.
    N.B: https://stackoverflow.com/questions/48976499/mypy-importlib-module-functions
    """

    @staticmethod
    def ready_check() -> bool:
        ...

    @staticmethod
    def read(path: str, strip: bool = ...) -> str:
        ...

    @staticmethod
    def write(
        path: str, content: Union[str, Dict[str, Any], None], overwrite: bool = ...
    ) -> bool:
        ...

    @staticmethod
    def delete(path: str) -> bool:
        ...

    @staticmethod
    def find() -> None:
        ...

    @staticmethod
    def info(path: str) -> Union[dict, Literal[False]]:
        ...

View File

@@ -1,6 +1,5 @@
import errno
import fnmatch
import json
import os
import os.path
import re
@@ -9,16 +8,15 @@ import subprocess
import sys
import tarfile
import requests
import stat
from pathlib import Path
from typing import (
Type, NoReturn, List, Optional, Dict, Any, Tuple, Callable, Union
Type, NoReturn, List, Optional, Dict, Any, Tuple, Union
)
import dbt.exceptions
import dbt.utils
from dbt.logger import GLOBAL_LOGGER as logger
if sys.platform == 'win32':
from ctypes import WinDLL, c_bool
else:
@@ -71,116 +69,6 @@ def find_matching(
return matching
def load_file_contents(path: str, strip: bool = True) -> str:
path = convert_path(path)
with open(path, 'rb') as handle:
to_return = handle.read().decode('utf-8')
if strip:
to_return = to_return.strip()
return to_return
def make_directory(path: str) -> None:
"""
Make a directory and any intermediate directories that don't already
exist. This function handles the case where two threads try to create
a directory at once.
"""
path = convert_path(path)
if not os.path.exists(path):
# concurrent writes that try to create the same dir can fail
try:
os.makedirs(path)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
raise e
def make_file(path: str, contents: str = '', overwrite: bool = False) -> bool:
"""
Make a file at `path` assuming that the directory it resides in already
exists. The file is saved with contents `contents`
"""
if overwrite or not os.path.exists(path):
path = convert_path(path)
with open(path, 'w') as fh:
fh.write(contents)
return True
return False
def make_symlink(source: str, link_path: str) -> None:
"""
Create a symlink at `link_path` referring to `source`.
"""
if not supports_symlinks():
dbt.exceptions.system_error('create a symbolic link')
os.symlink(source, link_path)
def supports_symlinks() -> bool:
return getattr(os, "symlink", None) is not None
def write_file(path: str, contents: str = '') -> bool:
path = convert_path(path)
try:
make_directory(os.path.dirname(path))
with open(path, 'w', encoding='utf-8') as f:
f.write(str(contents))
except Exception as exc:
# note that you can't just catch FileNotFound, because sometimes
# windows apparently raises something else.
# It's also not sufficient to look at the path length, because
# sometimes windows fails to write paths that are less than the length
# limit. So on windows, suppress all errors that happen from writing
# to disk.
if os.name == 'nt':
# sometimes we get a winerror of 3 which means the path was
# definitely too long, but other times we don't and it means the
# path was just probably too long. This is probably based on the
# windows/python version.
if getattr(exc, 'winerror', 0) == 3:
reason = 'Path was too long'
else:
reason = 'Path was possibly too long'
# all our hard work and the path was still too long. Log and
# continue.
logger.debug(
f'Could not write to path {path}({len(path)} characters): '
f'{reason}\nexception: {exc}'
)
else:
raise
return True
def read_json(path: str) -> Dict[str, Any]:
return json.loads(load_file_contents(path))
def write_json(path: str, data: Dict[str, Any]) -> bool:
return write_file(path, json.dumps(data, cls=dbt.utils.JSONEncoder))
def _windows_rmdir_readonly(
func: Callable[[str], Any], path: str, exc: Tuple[Any, OSError, Any]
):
exception_val = exc[1]
if exception_val.errno == errno.EACCES:
os.chmod(path, stat.S_IWUSR)
func(path)
else:
raise
def resolve_path_from_base(path_to_resolve: str, base_path: str) -> str:
"""
If path-to_resolve is a relative path, create an absolute path
@@ -195,108 +83,6 @@ def resolve_path_from_base(path_to_resolve: str, base_path: str) -> str:
os.path.expanduser(path_to_resolve)))
def rmdir(path: str) -> None:
"""
Recursively deletes a directory. Includes an error handler to retry with
different permissions on Windows. Otherwise, removing directories (eg.
cloned via git) can cause rmtree to throw a PermissionError exception
"""
path = convert_path(path)
if sys.platform == 'win32':
onerror = _windows_rmdir_readonly
else:
onerror = None
shutil.rmtree(path, onerror=onerror)
def _win_prepare_path(path: str) -> str:
"""Given a windows path, prepare it for use by making sure it is absolute
and normalized.
"""
path = os.path.normpath(path)
# if a path starts with '\', splitdrive() on it will return '' for the
# drive, but the prefix requires a drive letter. So let's add the drive
# letter back in.
# Unless it starts with '\\'. In that case, the path is a UNC mount point
# and splitdrive will be fine.
if not path.startswith('\\\\') and path.startswith('\\'):
curdrive = os.path.splitdrive(os.getcwd())[0]
path = curdrive + path
# now our path is either an absolute UNC path or relative to the current
# directory. If it's relative, we need to make it absolute or the prefix
# won't work. `ntpath.abspath` allegedly doesn't always play nice with long
# paths, so do this instead.
if not os.path.splitdrive(path)[0]:
path = os.path.join(os.getcwd(), path)
return path
def _supports_long_paths() -> bool:
if sys.platform != 'win32':
return True
# Eryk Sun says to use `WinDLL('ntdll')` instead of `windll.ntdll` because
# of pointer caching in a comment here:
# https://stackoverflow.com/a/35097999/11262881
# I don't know exaclty what he means, but I am inclined to believe him as
# he's pretty active on Python windows bugs!
try:
dll = WinDLL('ntdll')
except OSError: # I don't think this happens? you need ntdll to run python
return False
# not all windows versions have it at all
if not hasattr(dll, 'RtlAreLongPathsEnabled'):
return False
# tell windows we want to get back a single unsigned byte (a bool).
dll.RtlAreLongPathsEnabled.restype = c_bool
return dll.RtlAreLongPathsEnabled()
def convert_path(path: str) -> str:
"""Convert a path that dbt has, which might be >260 characters long, to one
that will be writable/readable on Windows.
On other platforms, this is a no-op.
"""
# some parts of python seem to append '\*.*' to strings, better safe than
# sorry.
if len(path) < 250:
return path
if _supports_long_paths():
return path
prefix = '\\\\?\\'
# Nothing to do
if path.startswith(prefix):
return path
path = _win_prepare_path(path)
# add the prefix. The check is just in case os.getcwd() does something
# unexpected - I believe this if-state should always be True though!
if not path.startswith(prefix):
path = prefix + path
return path
def remove_file(path: str) -> None:
path = convert_path(path)
os.remove(path)
def path_exists(path: str) -> bool:
path = convert_path(path)
return os.path.lexists(path)
def path_is_symlink(path: str) -> bool:
path = convert_path(path)
return os.path.islink(path)
def open_dir_cmd() -> str:
# https://docs.python.org/2/library/sys.html#sys.platform
if sys.platform == 'win32':
@@ -444,7 +230,6 @@ def run_cmd(
def download(
url: str, path: str, timeout: Optional[Union[float, tuple]] = None
) -> None:
path = convert_path(path)
connection_timeout = timeout or float(os.getenv('DBT_HTTP_TIMEOUT', 10))
response = requests.get(url, timeout=connection_timeout)
with open(path, 'wb') as handle:
@@ -452,95 +237,14 @@ def download(
handle.write(block)
def rename(from_path: str, to_path: str, force: bool = False) -> None:
from_path = convert_path(from_path)
to_path = convert_path(to_path)
is_symlink = path_is_symlink(to_path)
if os.path.exists(to_path) and force:
if is_symlink:
remove_file(to_path)
else:
rmdir(to_path)
shutil.move(from_path, to_path)
def untar_package(
tar_path: str, dest_dir: str, rename_to: Optional[str] = None
) -> None:
tar_path = convert_path(tar_path)
tar_dir_name = None
with tarfile.open(tar_path, 'r') as tarball:
tarball.extractall(dest_dir)
tar_dir_name = os.path.commonprefix(tarball.getnames())
if rename_to:
downloaded_path = os.path.join(dest_dir, tar_dir_name)
desired_path = os.path.join(dest_dir, rename_to)
dbt.clients.system.rename(downloaded_path, desired_path, force=True)
def chmod_and_retry(func, path, exc_info):
"""Define an error handler to pass to shutil.rmtree.
On Windows, when a file is marked read-only as git likes to do, rmtree will
fail. To handle that, on errors try to make the file writable.
We want to retry most operations here, but listdir is one that we know will
be useless.
"""
if func is os.listdir or os.name != 'nt':
raise
os.chmod(path, stat.S_IREAD | stat.S_IWRITE)
# on error,this will raise.
func(path)
def _absnorm(path):
return os.path.normcase(os.path.abspath(path))
def move(src, dst):
"""A re-implementation of shutil.move that properly removes the source
directory on windows when it has read-only files in it and the move is
between two drives.
This is almost identical to the real shutil.move, except it uses our rmtree
and skips handling non-windows OSes since the existing one works ok there.
"""
src = convert_path(src)
dst = convert_path(dst)
if os.name != 'nt':
return shutil.move(src, dst)
if os.path.isdir(dst):
if _absnorm(src) == _absnorm(dst):
os.rename(src, dst)
return
dst = os.path.join(dst, os.path.basename(src.rstrip('/\\')))
if os.path.exists(dst):
raise EnvironmentError("Path '{}' already exists".format(dst))
try:
os.rename(src, dst)
except OSError:
# probably different drives
if os.path.isdir(src):
if _absnorm(dst + '\\').startswith(_absnorm(src + '\\')):
# dst is inside src
raise EnvironmentError(
"Cannot move a directory '{}' into itself '{}'"
.format(src, dst)
)
shutil.copytree(src, dst, symlinks=True)
rmtree(src)
else:
shutil.copy2(src, dst)
os.unlink(src)
def rmtree(path):
"""Recursively remove path. On permissions errors on windows, try to remove
the read-only flag and try again.
"""
path = convert_path(path)
return shutil.rmtree(path, onerror=chmod_and_retry)
downloaded_path = Path(os.path.join(dest_dir, tar_dir_name))
desired_path = Path(os.path.join(dest_dir, rename_to))
downloaded_path.rename(desired_path)

View File

@@ -8,7 +8,7 @@ import sqlparse
from dbt import flags
from dbt.adapters.factory import get_adapter
from dbt.clients import jinja
from dbt.clients.system import make_directory
from dbt.clients.storage import adapter as SA
from dbt.context.providers import generate_runtime_model
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.compiled import (
@@ -153,8 +153,8 @@ class Compiler:
self.config = config
def initialize(self):
make_directory(self.config.target_path)
make_directory(self.config.modules_path)
SA.write(self.config.target_path, None)
SA.write(self.config.modules_path, None)
# creates a ModelContext which is converted to
# a dict for jinja rendering of SQL

View File

@@ -4,7 +4,7 @@ import os
from dbt.dataclass_schema import ValidationError
from dbt.clients.system import load_file_contents
from dbt.clients.storage import adapter as SA
from dbt.clients.yaml_helper import load_yaml_text
from dbt.contracts.connection import Credentials, HasCredentials
from dbt.contracts.project import ProfileConfig, UserConfig
@@ -52,7 +52,7 @@ def read_profile(profiles_dir: str) -> Dict[str, Any]:
contents = None
if os.path.isfile(path):
try:
contents = load_file_contents(path, strip=False)
contents = SA.read(path, strip=False)
yaml_content = load_yaml_text(contents)
if not yaml_content:
msg = f'The profiles.yml file at {path} is empty'

View File

@@ -9,9 +9,8 @@ from typing_extensions import Protocol, runtime_checkable
import hashlib
import os
from dbt.clients.system import resolve_path_from_base
from dbt.clients.system import path_exists
from dbt.clients.system import load_file_contents
from dbt.clients.storage import adapter as SA
from dbt.clients import system
from dbt.clients.yaml_helper import load_yaml_text
from dbt.contracts.connection import QueryComment
from dbt.exceptions import DbtProjectError
@@ -77,16 +76,16 @@ class IsFQNResource(Protocol):
def _load_yaml(path):
contents = load_file_contents(path)
contents = SA.read(path)
return load_yaml_text(contents)
def package_data_from_root(project_root):
package_filepath = resolve_path_from_base(
package_filepath = system.resolve_path_from_base(
'packages.yml', project_root
)
if path_exists(package_filepath):
if SA.info(package_filepath):
packages_dict = _load_yaml(package_filepath)
else:
packages_dict = None
@@ -149,7 +148,7 @@ def _raw_project_from(project_root: str) -> Dict[str, Any]:
project_yaml_filepath = os.path.join(project_root, 'dbt_project.yml')
# get the project.yml contents
if not path_exists(project_yaml_filepath):
if not SA.info(project_yaml_filepath):
raise DbtProjectError(
'no dbt_project.yml found at expected path {}'
.format(project_yaml_filepath)

View File

@@ -7,11 +7,8 @@ from dbt.dataclass_schema import ValidationError
from .renderer import SelectorRenderer
from dbt.clients.system import (
load_file_contents,
path_exists,
resolve_path_from_base,
)
from dbt.clients.storage import adapter as SA
from dbt.clients import system
from dbt.contracts.selection import SelectorFile
from dbt.exceptions import DbtSelectorsError, RuntimeException
from dbt.graph import parse_from_selectors_definition, SelectionSpec
@@ -75,7 +72,7 @@ class SelectorConfig(Dict[str, SelectionSpec]):
cls, path: Path, renderer: SelectorRenderer,
) -> 'SelectorConfig':
try:
data = load_yaml_text(load_file_contents(str(path)))
data = load_yaml_text(SA.read(str(path))) # type: ignore
except (ValidationError, RuntimeException) as exc:
raise DbtSelectorsError(
f'Could not read selector file: {exc}',
@@ -91,12 +88,12 @@ class SelectorConfig(Dict[str, SelectionSpec]):
def selector_data_from_root(project_root: str) -> Dict[str, Any]:
selector_filepath = resolve_path_from_base(
selector_filepath = system.resolve_path_from_base( # type: ignore
'selectors.yml', project_root
)
if path_exists(selector_filepath):
selectors_dict = load_yaml_text(load_file_contents(selector_filepath))
if SA.info(selector_filepath):
selectors_dict = load_yaml_text(SA.read(selector_filepath)) # type: ignore # noqa
else:
selectors_dict = None
return selectors_dict

View File

@@ -19,7 +19,7 @@ from dbt.dataclass_schema import (
dbtClassMixin, ExtensibleDbtClassMixin
)
from dbt.clients.system import write_file
from dbt.clients.storage import adapter as SA
from dbt.contracts.files import FileHash, MAXIMUM_SEED_SIZE_NAME
from dbt.contracts.graph.unparsed import (
UnparsedNode, UnparsedDocumentation, Quoting, Docs,
@@ -216,7 +216,7 @@ class ParsedNodeDefaults(ParsedNodeMandatory):
target_path, subdirectory, self.package_name, path
)
write_file(full_path, payload)
SA.write(full_path, payload)
return full_path

View File

@@ -20,6 +20,9 @@ from dbt.utils import lowercase
from dbt.dataclass_schema import dbtClassMixin, StrEnum
import agate
import json
import dbt.utils
from dbt.clients.storage import adapter as SA
from dataclasses import dataclass, field
from datetime import datetime
@@ -27,8 +30,6 @@ from typing import (
Union, Dict, List, Optional, Any, NamedTuple, Sequence,
)
from dbt.clients.system import write_json
@dataclass
class TimingInfo(dbtClassMixin):
@@ -211,7 +212,8 @@ class RunResultsArtifact(ExecutionResult, ArtifactMixin):
)
def write(self, path: str):
write_json(path, self.to_dict(omit_none=False))
content = json.dumps(self.to_dict(omit_none=False), cls=dbt.utils.JSONEncoder)
SA.write(path, content)
@dataclass

View File

@@ -1,11 +1,12 @@
import dataclasses
import os
import json
from datetime import datetime
from typing import (
List, Tuple, ClassVar, Type, TypeVar, Dict, Any, Optional
)
from dbt.clients.system import write_json, read_json
from dbt.clients.storage import adapter as SA
from dbt.exceptions import (
InternalException,
RuntimeException,
@@ -13,6 +14,7 @@ from dbt.exceptions import (
from dbt.version import __version__
from dbt.tracking import get_invocation_id
from dbt.dataclass_schema import dbtClassMixin
import dbt.utils
SourceKey = Tuple[str, str]
@@ -56,10 +58,13 @@ class Mergeable(Replaceable):
class Writable:
def write(self, path: str):
write_json(
path, self.to_dict(omit_none=False) # type: ignore
content = json.dumps(
self.to_dict(omit_none=False), # type: ignore
cls=dbt.utils.JSONEncoder
)
SA.write(path, content)
class AdditionalPropertiesMixin:
"""Make this class an extensible property.
@@ -116,7 +121,7 @@ class Readable:
@classmethod
def read(cls, path: str):
try:
data = read_json(path)
data = json.loads(SA.read(path))
except (EnvironmentError, ValueError) as exc:
raise RuntimeException(
f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'

View File

@@ -4,7 +4,7 @@ import tempfile
from contextlib import contextmanager
from typing import List, Optional, Generic, TypeVar
from dbt.clients import system
from dbt.clients.storage import adapter as SA
from dbt.contracts.project import ProjectPackageMetadata
from dbt.logger import GLOBAL_LOGGER as logger
@@ -30,13 +30,13 @@ def downloads_directory():
DOWNLOADS_PATH = tempfile.mkdtemp(prefix='dbt-downloads-')
remove_downloads = True
system.make_directory(DOWNLOADS_PATH)
SA.write(DOWNLOADS_PATH, None)
logger.debug("Set downloads directory='{}'".format(DOWNLOADS_PATH))
yield DOWNLOADS_PATH
if remove_downloads:
system.rmtree(DOWNLOADS_PATH)
SA.delete(DOWNLOADS_PATH)
DOWNLOADS_PATH = None

View File

@@ -1,8 +1,10 @@
import os
import hashlib
from pathlib import Path
from typing import List, Optional
from dbt.clients import git, system
from dbt.clients import git
import dbt.adapters.internal_storage.local_filesystem as local_SA
from dbt.config import Project
from dbt.contracts.project import (
ProjectPackageMetadata,
@@ -104,13 +106,9 @@ class GitPinnedPackage(GitPackageMixin, PinnedPackage):
def install(self, project, renderer):
dest_path = self.get_installation_path(project, renderer)
if os.path.exists(dest_path):
if system.path_is_symlink(dest_path):
system.remove_file(dest_path)
else:
system.rmdir(dest_path)
system.move(self._checkout(), dest_path)
local_SA.delete(dest_path)
checkout_path = Path(self._checkout())
checkout_path.rename(dest_path)
class GitUnpinnedPackage(GitPackageMixin, UnpinnedPackage[GitPinnedPackage]):

View File

@@ -1,12 +1,12 @@
import shutil
from pathlib import Path
from dbt.clients import system
import dbt.adapters.internal_storage.local_filesystem as local_SA
from dbt.deps.base import PinnedPackage, UnpinnedPackage
from dbt.contracts.project import (
ProjectPackageMetadata,
LocalPackage,
)
from dbt.logger import GLOBAL_LOGGER as logger
class LocalPackageMixin:
@@ -45,25 +45,12 @@ class LocalPinnedPackage(LocalPackageMixin, PinnedPackage):
return ProjectPackageMetadata.from_project(loaded)
def install(self, project, renderer):
src_path = self.resolve_path(project)
src_path = Path(self.resolve_path(project))
dest_path = self.get_installation_path(project, renderer)
can_create_symlink = system.supports_symlinks()
if system.path_exists(dest_path):
if not system.path_is_symlink(dest_path):
system.rmdir(dest_path)
else:
system.remove_file(dest_path)
if can_create_symlink:
logger.debug(' Creating symlink to local dependency.')
system.make_symlink(src_path, dest_path)
else:
logger.debug(' Symlinks are not available on this '
'OS, copying dependency.')
shutil.copytree(src_path, dest_path)
local_SA.delete(dest_path)
src_path.rename(dest_path)
# TODO: is it ok to remove symlinking?
# Symlinks aren't really a thing outside of filesystems and will be hard to model in SAs
class LocalUnpinnedPackage(

View File

@@ -3,6 +3,7 @@ from typing import List
from dbt import semver
from dbt.clients import registry, system
from dbt.clients.storage import adapter as SA
from dbt.contracts.project import (
RegistryPackageMetadata,
RegistryPackage,
@@ -58,7 +59,7 @@ class RegistryPinnedPackage(RegistryPackageMixin, PinnedPackage):
tar_path = os.path.realpath(
os.path.join(get_downloads_path(), tar_name)
)
system.make_directory(os.path.dirname(tar_path))
SA.write(os.path.dirname(tar_path), None)
download_url = metadata.downloads.tarball
system.download(download_url, tar_path)

View File

@@ -9,6 +9,7 @@ import time
import warnings
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional, List, ContextManager, Callable, Dict, Any, Set
import colorama
@@ -360,8 +361,8 @@ initialized = False
def make_log_dir_if_missing(log_dir):
import dbt.clients.system
dbt.clients.system.make_directory(log_dir)
# N.B: Storage adapters can't be used in the logger (circular imports)
Path(log_dir).mkdir(parents=True, exist_ok=True)
class DebugWarnings(logbook.compat.redirected_warnings):

View File

@@ -20,7 +20,7 @@ from dbt.logger import GLOBAL_LOGGER as logger, DbtProcessState
from dbt.node_types import NodeType
from dbt.clients.jinja import get_rendered, MacroStack
from dbt.clients.jinja_static import statically_extract_macro_calls
from dbt.clients.system import make_directory
from dbt.clients.storage import adapter as SA
from dbt.config import Project, RuntimeConfig
from dbt.context.docs import generate_runtime_docs
from dbt.context.macro_resolver import MacroResolver, TestMacroNamespace
@@ -445,9 +445,7 @@ class ManifestLoader:
PARTIAL_PARSE_FILE_NAME)
try:
manifest_msgpack = self.manifest.to_msgpack()
make_directory(os.path.dirname(path))
with open(path, 'wb') as fp:
fp.write(manifest_msgpack)
SA.write(path, manifest_msgpack)
except Exception:
raise

View File

@@ -1,4 +1,4 @@
from dbt.clients.system import load_file_contents
from dbt.clients.storage import adapter as SA
from dbt.contracts.files import (
FilePath, ParseFileType, SourceFile, FileHash, AnySourceFile, SchemaSourceFile
)
@@ -12,8 +12,8 @@ from typing import Optional
# This loads the files contents and creates the SourceFile object
def load_source_file(
path: FilePath, parse_file_type: ParseFileType,
project_name: str) -> Optional[AnySourceFile]:
file_contents = load_file_contents(path.absolute_path, strip=False)
project_name: str) -> AnySourceFile:
file_contents = SA.read(path.absolute_path, strip=False)
checksum = FileHash.from_contents(file_contents)
sf_cls = SchemaSourceFile if parse_file_type == ParseFileType.Schema else SourceFile
source_file = sf_cls(path=path, checksum=checksum,
@@ -58,7 +58,7 @@ def load_seed_source_file(match: FilePath, project_name) -> SourceFile:
# We don't want to calculate a hash of this file. Use the path.
source_file = SourceFile.big_seed(match)
else:
file_contents = load_file_contents(match.absolute_path, strip=False)
file_contents = SA.read(match.absolute_path, strip=False)
checksum = FileHash.from_contents(file_contents)
source_file = SourceFile(path=match, checksum=checksum)
source_file.contents = ''

View File

@@ -7,6 +7,7 @@ from typing import Optional, Dict, Any, List
from dbt.logger import GLOBAL_LOGGER as logger
import dbt.clients.system
import dbt.exceptions
from dbt.clients.storage import adapter as SA
from dbt.adapters.factory import get_adapter, register_adapter
from dbt.config import Project, Profile, PROFILES_DIR
from dbt.config.renderer import DbtProjectYamlRenderer, ProfileRenderer
@@ -255,7 +256,7 @@ class DebugTask(BaseTask):
try:
raw_profile_data = load_yaml_text(
dbt.clients.system.load_file_contents(self.profile_path)
SA.read(self.profile_path)
)
except Exception:
pass # we'll report this when we try to load the profile for real

View File

@@ -9,7 +9,7 @@ from dbt.deps.base import downloads_directory
from dbt.deps.resolver import resolve_packages
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.clients import system
from dbt.clients.storage import adapter as SA
from dbt.task.base import BaseTask, move_to_nearest_project_dir
@@ -43,7 +43,7 @@ class DepsTask(BaseTask):
)
def run(self):
system.make_directory(self.config.modules_path)
SA.write(self.config.modules_path, None)
packages = self.config.packages.packages
if not packages:
logger.info('Warning: No packages were found in packages.yml')

View File

@@ -3,6 +3,7 @@ import shutil
import dbt.config
import dbt.clients.system
from dbt.clients.storage import adapter as SA
from dbt.version import _get_adapter_plugin_names
from dbt.adapters.factory import load_plugin, get_include_paths
@@ -50,7 +51,7 @@ class InitTask(BaseTask):
if not os.path.exists(profiles_dir):
msg = "Creating dbt configuration folder at {}"
logger.info(msg.format(profiles_dir))
dbt.clients.system.make_directory(profiles_dir)
SA.write(profiles_dir, None)
return True
return False

View File

@@ -12,7 +12,7 @@ from dbt.parser.manifest import (
Manifest, ManifestLoader, _check_manifest
)
from dbt.logger import DbtProcessState, print_timestamped_line
from dbt.clients.system import write_file
from dbt.clients.storage import adapter as SA
from dbt.graph import Graph
import time
from typing import Optional
@@ -38,8 +38,14 @@ class ParseTask(ConfiguredTask):
def write_perf_info(self):
path = os.path.join(self.config.target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self.loader._perf_info,
cls=dbt.utils.JSONEncoder, indent=4))
SA.write(
path,
json.dumps(
self.loader._perf_info,
cls=dbt.utils.JSONEncoder,
indent=4
)
)
print_timestamped_line(f"Performance info: {path}")
# This method takes code that normally exists in other files

View File

@@ -10,7 +10,7 @@ from dbt.contracts.results import (
CatalogArtifact, RunResultsArtifact, FreshnessExecutionResultArtifact
)
from dbt.contracts.util import VersionedSchema
from dbt.clients.system import write_file
from dbt.adapters.internal_storage import local_filesystem as local_SA
@dataclass
@@ -31,7 +31,7 @@ class ArtifactInfo:
)
def write_schema(self, dest_dir: Path):
write_file(
local_SA.write(
str(dest_dir / self.path),
json.dumps(self.json_schema, indent=2)
)

View File

@@ -8,47 +8,6 @@ from dbt.exceptions import ExecutableError, WorkingDirectoryError
import dbt.clients.system
class SystemClient(unittest.TestCase):
def setUp(self):
super().setUp()
self.tmp_dir = mkdtemp()
self.profiles_path = '{}/profiles.yml'.format(self.tmp_dir)
def set_up_profile(self):
with open(self.profiles_path, 'w') as f:
f.write('ORIGINAL_TEXT')
def get_profile_text(self):
with open(self.profiles_path, 'r') as f:
return f.read()
def tearDown(self):
try:
shutil.rmtree(self.tmp_dir)
except:
pass
def test__make_file_when_exists(self):
self.set_up_profile()
written = dbt.clients.system.make_file(self.profiles_path, contents='NEW_TEXT')
self.assertFalse(written)
self.assertEqual(self.get_profile_text(), 'ORIGINAL_TEXT')
def test__make_file_when_not_exists(self):
written = dbt.clients.system.make_file(self.profiles_path, contents='NEW_TEXT')
self.assertTrue(written)
self.assertEqual(self.get_profile_text(), 'NEW_TEXT')
def test__make_file_with_overwrite(self):
self.set_up_profile()
written = dbt.clients.system.make_file(self.profiles_path, contents='NEW_TEXT', overwrite=True)
self.assertTrue(written)
self.assertEqual(self.get_profile_text(), 'NEW_TEXT')
class TestRunCmd(unittest.TestCase):
"""Test `run_cmd`.