Compare commits

...

6 Commits

Author SHA1 Message Date
Ian Knox
8507a5b7b7 Merge pull request #3693 from dbt-labs/local-fs-sa-skateboard
Basic local FS functionality
2021-08-13 17:06:57 -05:00
Ian Knox
2b961c5543 Merge branch 'feature/local-fs-storage-adapter' into local-fs-sa-skateboard 2021-08-13 17:06:32 -05:00
Ian Knox
db2ef4d084 Update core/dbt/clients/storage.py
Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com>
2021-08-05 09:42:27 -05:00
Ian Knox
ed3dbbdf97 Update core/dbt/adapters/internal_storage/local_filesystem.py
Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com>
2021-08-05 09:42:18 -05:00
Ian Knox
a1af19af68 Local FS storage adapter, pass 2- better logic, lots of edge cases fixed, type hinting 2021-08-04 08:29:12 -05:00
Ian Knox
ad8cbfcd22 * system_client logic -> local_filesystem adapter
* basic storage_adapter logic for dynamic loading
2021-07-30 13:34:57 -05:00
16 changed files with 320 additions and 39 deletions

View File

@@ -0,0 +1 @@
Move the warehouse adapters to a subfolder of adapters (adapters/warehouse).

View File

@@ -0,0 +1,218 @@
from pathlib import Path
from shutil import rmtree
from stat import S_IRUSR, S_IWUSR
from sys import platform
from typing import Any, Union
from typing_extensions import Literal
from dbt.logger import GLOBAL_LOGGER as logger
# windows platforms as returned by sys.platform
# via https://stackoverflow.com/a/13874620
WINDOWS_PLATFORMS = ("win32", "cygwin", "msys")
# load ctypes for windows platforms
if platform in WINDOWS_PLATFORMS:
from ctypes import WinDLL, c_bool
else:
WinDLL = None
c_bool = None
def ready_check() -> bool:
    """Report whether this storage adapter is ready for use.

    For the local filesystem this is effectively always true; database-backed
    adapters are expected to do real connectivity checks here.

    Returns:
        `True` if the resource is ready to be used.
        `False` if the resource is not ready to be used.

    Raises:
        TBD - TODO: How best to report back errors here?
        It should never fail for a filesystem (unless we're using something very exotic),
        but for databases this should be our primary source of troubleshooting information.
    """
    return True
def read(
    path: str,
    strip: bool = True,
) -> str:
    """Reads the content of a file on the filesystem.

    Args:
        path: Full path of file to be read.
        strip: Whether or not to strip whitespace.

    Returns:
        Content of the file.

    Raises:
        FileNotFoundError: If no file exists at `path`.
    """
    # use a separate name instead of rebinding the `path` parameter to a
    # differently-typed local (the original shadowed str with Path)
    target = Path(path)
    # read the file in as a utf-8 string
    file_content = target.read_text(encoding="utf-8")
    # conditionally strip whitespace
    return file_content.strip() if strip else file_content
def write(
    path: str,
    content: Union[str, bytes, None],
    overwrite: bool = False,
) -> bool:
    """Writes the given content out to a resource on the filesystem.

    Since there are many different ways to approach filesystem operations, it's best to enumerate
    the rules(tm):
    If the path to the file/dir doesn't exist, it will be created.
    If content is `None` a directory will be created instead of a file.
    If content is not a `str` or `bytes` the string representation of the object will be written.
    If the content is `str` it will be encoded as utf-8.
    Overwrites are only supported for files and are toggled by the overwrite parameter.
    All logical cases outside those outlined above will result in failure.

    Args:
        path: Full path of resource to be written.
        content: Data to be written.
        overwrite: Whether or not to overwrite if a file already exists at this path.

    Returns:
        `True` for success, `False` otherwise.
    """
    # TODO: double check I hit all possible permutations here! (IK)
    # use a separate name instead of rebinding the `path` parameter
    target = Path(path)
    # handle overwrite errors for files and directories
    if target.exists() and (overwrite is False or target.is_dir() is True):
        logger.debug(f"{target} already exists")
        return False
    # handle trying to write file content to a path that is a directory
    if target.is_dir() and content is not None:
        logger.debug(f"{target} is a directory, but file content was specified")
        return False
    # create a directory if the content is `None`
    if content is None:
        target.mkdir(parents=True, exist_ok=True)
    # create an empty file if the content is an empty string
    elif content == "" and target.exists() is False:
        target.touch()
    # write out to a file
    else:
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            if isinstance(content, bytes):
                target.write_bytes(content)
            else:
                target.write_text(str(content), encoding="utf-8")
        except Exception as e:
            # TODO: The block below was c/p'd directly from the old system client.
            # We should examine if this actually makes sense to log file write failures and
            # try to keep going.
            if platform in WINDOWS_PLATFORMS:
                # sometimes we get a winerror of 3 which means the path was
                # definitely too long, but other times we don't and it means the
                # path was just probably too long. This is probably based on the
                # windows/python version.
                if getattr(e, "winerror", 0) == 3:
                    reason = "Path was too long"
                else:
                    reason = "Path was possibly too long"
                # all our hard work and the path was still too long. Log and
                # continue. NOTE: measure the string form — `Path` objects have
                # no `__len__`, so the original `len(path)` raised TypeError here.
                logger.debug(
                    f"Could not write to path {target}({len(str(target))} characters): "
                    f"{reason}\nexception: {e}"
                )
            else:
                raise
    return True
def delete(path: str) -> bool:
    """Deletes the resource at the given path.

    Args:
        path: Full path of resource to be deleted.

    Returns:
        `True` if a file or directory was deleted, `False` if nothing exists at the path.
    """
    # create concrete path object
    target = Path(path)
    # ensure resource to be deleted exists
    if not target.exists():
        return False
    # remove files
    if target.is_file():
        target.unlink()
    # remove directories recursively, surprisingly obnoxious to do in a cross-platform safe manner
    if target.is_dir():
        if platform in WINDOWS_PLATFORMS:
            # error handling for permissions on windows platforms.
            # rmtree hands the callback the failing path as a string; the
            # original called `path.chmod(path, mode)` which both shadowed the
            # outer Path and passed a path where a mode was expected.
            def on_error(func, err_path, _):
                Path(err_path).chmod(S_IWUSR | S_IRUSR)
                func(err_path)
            rmtree(target, onerror=on_error)
        else:
            rmtree(target)
    return True
def find():
    # TODO: not implemented yet — placeholder so this module matches the
    # interface declared by StorageAdapterType.
    pass
def info(path: str) -> Union[dict, Literal[False]]:
    """Provides information about what is found at the given path.

    If info is called on a directory the size will be `None`.
    If info is called on a resource that does not exist the response will be `False`.

    N.B: despite my best efforts, getting a reliable cross-platform file creation time is
    absurdly difficult.
    See these links for information if we ever decide we have to have this feature:
    * https://bugs.python.org/issue39533
    * https://github.com/ipfs-shipyard/py-datastore/blob/e566d40a8ca81d8628147e255fe7830b5f928a43/datastore/filesystem/util/statx.py # noqa: E501
    * https://github.com/ckarageorgkaneen/pystatx

    Args:
        path: Full path being queried.

    Returns:
        On success: A dict containing information about what is found at the given path.
        On failure: `False`
    """
    # create a concrete path object.
    target = Path(path)
    # return `False` if the resource doesn't exist
    if not target.exists():
        return False
    # stat once instead of twice (the original called stat() per field)
    stats = target.stat()
    # calculate file size (`None` for dirs)
    size = None if target.is_dir() else stats.st_size
    # return info on the resource
    return {"size": size, "modified_at": stats.st_mtime}

View File

@@ -1,7 +1,8 @@
import re
import os.path
from dbt.clients.system import run_cmd, rmdir
from dbt.clients.system import run_cmd
from dbt.clients.storage import adapter as SA
from dbt.logger import GLOBAL_LOGGER as logger
import dbt.exceptions
from packaging import version
@@ -42,7 +43,7 @@ def clone(repo, cwd, dirname=None, remove_git_dir=False, revision=None, subdirec
run_cmd(os.path.join(cwd, dirname or ''), ['git', 'sparse-checkout', 'set', subdirectory])
if remove_git_dir:
rmdir(os.path.join(dirname, '.git'))
SA.rmdir(os.path.join(dirname, '.git'))
return result

View File

@@ -0,0 +1,27 @@
from importlib import import_module
from os import getenv
from dbt.logger import GLOBAL_LOGGER as logger
from .storage_adapter_type import StorageAdapterType
from typing import cast
# TODO:
# ensure configured adapter has the correct module signature
# provide better not-ready error messaging

# get configured storage adapter (defaults to the local filesystem adapter)
_module_to_load = getenv(
    "DBT_STORAGE_ADAPTER", "dbt.adapters.internal_storage.local_filesystem"
)

# load module if it exists
try:
    _adapter = cast(StorageAdapterType, import_module(_module_to_load))
    logger.debug(f"Storage adapter {_module_to_load} loaded")
except ModuleNotFoundError:
    logger.error(f"Storage adapter {_module_to_load} not found")
    # re-raise: the original fell through and dereferenced the never-bound
    # `_adapter` below, which surfaced as a confusing NameError instead of
    # the actual import failure.
    raise

# run ready check; `adapter` is only exported when the check passes, so
# importers fail fast (ImportError) on an unready adapter.
if not _adapter.ready_check():
    logger.error(f"Storage Adapter{_module_to_load} not ready")
else:
    adapter = _adapter

View File

@@ -0,0 +1,37 @@
from typing import Any, Dict, Union
from typing_extensions import Literal
class StorageAdapterType:
    """Class that defines type hinting for storage adapters.

    This is needed because importlib (used in the storage client) and mypy aren't exactly friends.
    N.B: https://stackoverflow.com/questions/48976499/mypy-importlib-module-functions
    """

    @staticmethod
    def ready_check() -> bool:
        # signature stub only — see the concrete adapter module for behavior
        pass

    @staticmethod
    def read(path: str, strip: bool = ...) -> str:
        pass

    @staticmethod
    def write(
        # `bytes` (not Dict) matches the concrete adapter signature and the
        # msgpack bytes callers actually pass
        path: str, content: Union[str, bytes, None], overwrite: bool = ...
    ) -> bool:
        pass

    @staticmethod
    def delete(path: str) -> bool:
        pass

    @staticmethod
    def find() -> None:
        pass

    @staticmethod
    def info(path: str) -> Union[dict, Literal[False]]:
        pass

View File

@@ -8,7 +8,7 @@ import sqlparse
from dbt import flags
from dbt.adapters.factory import get_adapter
from dbt.clients import jinja
from dbt.clients.system import make_directory
from dbt.clients.storage import adapter as SA
from dbt.context.providers import generate_runtime_model
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.compiled import (
@@ -153,8 +153,8 @@ class Compiler:
self.config = config
def initialize(self):
make_directory(self.config.target_path)
make_directory(self.config.modules_path)
SA.write(self.config.target_path, None)
SA.write(self.config.modules_path, None)
# creates a ModelContext which is converted to
# a dict for jinja rendering of SQL

View File

@@ -4,7 +4,7 @@ import os
from dbt.dataclass_schema import ValidationError
from dbt.clients.system import load_file_contents
from dbt.clients.storage import adapter as SA
from dbt.clients.yaml_helper import load_yaml_text
from dbt.contracts.connection import Credentials, HasCredentials
from dbt.contracts.project import ProfileConfig, UserConfig
@@ -52,7 +52,7 @@ def read_profile(profiles_dir: str) -> Dict[str, Any]:
contents = None
if os.path.isfile(path):
try:
contents = load_file_contents(path, strip=False)
contents = SA.read(path, strip=False)
yaml_content = load_yaml_text(contents)
if not yaml_content:
msg = f'The profiles.yml file at {path} is empty'

View File

@@ -9,9 +9,8 @@ from typing_extensions import Protocol, runtime_checkable
import hashlib
import os
from dbt.clients.system import resolve_path_from_base
from dbt.clients.system import path_exists
from dbt.clients.system import load_file_contents
from dbt.clients.storage import adapter as SA
from dbt.clients import system
from dbt.clients.yaml_helper import load_yaml_text
from dbt.contracts.connection import QueryComment
from dbt.exceptions import DbtProjectError
@@ -77,16 +76,16 @@ class IsFQNResource(Protocol):
def _load_yaml(path):
contents = load_file_contents(path)
contents = SA.read(path)
return load_yaml_text(contents)
def package_data_from_root(project_root):
package_filepath = resolve_path_from_base(
package_filepath = system.resolve_path_from_base(
'packages.yml', project_root
)
if path_exists(package_filepath):
if SA.info(package_filepath):
packages_dict = _load_yaml(package_filepath)
else:
packages_dict = None
@@ -149,7 +148,7 @@ def _raw_project_from(project_root: str) -> Dict[str, Any]:
project_yaml_filepath = os.path.join(project_root, 'dbt_project.yml')
# get the project.yml contents
if not path_exists(project_yaml_filepath):
if not SA.info(project_yaml_filepath):
raise DbtProjectError(
'no dbt_project.yml found at expected path {}'
.format(project_yaml_filepath)

View File

@@ -7,11 +7,8 @@ from dbt.dataclass_schema import ValidationError
from .renderer import SelectorRenderer
from dbt.clients.system import (
load_file_contents,
path_exists,
resolve_path_from_base,
)
from dbt.clients.storage import adapter as SA
from dbt.clients import system
from dbt.contracts.selection import SelectorFile
from dbt.exceptions import DbtSelectorsError, RuntimeException
from dbt.graph import parse_from_selectors_definition, SelectionSpec
@@ -75,7 +72,7 @@ class SelectorConfig(Dict[str, SelectionSpec]):
cls, path: Path, renderer: SelectorRenderer,
) -> 'SelectorConfig':
try:
data = load_yaml_text(load_file_contents(str(path)))
data = load_yaml_text(SA.read(str(path))) # type: ignore
except (ValidationError, RuntimeException) as exc:
raise DbtSelectorsError(
f'Could not read selector file: {exc}',
@@ -91,12 +88,12 @@ class SelectorConfig(Dict[str, SelectionSpec]):
def selector_data_from_root(project_root: str) -> Dict[str, Any]:
selector_filepath = resolve_path_from_base(
selector_filepath = system.resolve_path_from_base( # type: ignore
'selectors.yml', project_root
)
if path_exists(selector_filepath):
selectors_dict = load_yaml_text(load_file_contents(selector_filepath))
if SA.info(selector_filepath):
selectors_dict = load_yaml_text(SA.read(selector_filepath)) # type: ignore # noqa
else:
selectors_dict = None
return selectors_dict

View File

@@ -4,7 +4,7 @@ import tempfile
from contextlib import contextmanager
from typing import List, Optional, Generic, TypeVar
from dbt.clients import system
from dbt.clients.storage import adapter as SA
from dbt.contracts.project import ProjectPackageMetadata
from dbt.logger import GLOBAL_LOGGER as logger
@@ -30,13 +30,13 @@ def downloads_directory():
DOWNLOADS_PATH = tempfile.mkdtemp(prefix='dbt-downloads-')
remove_downloads = True
system.make_directory(DOWNLOADS_PATH)
SA.write(DOWNLOADS_PATH, None)
logger.debug("Set downloads directory='{}'".format(DOWNLOADS_PATH))
yield DOWNLOADS_PATH
if remove_downloads:
system.rmtree(DOWNLOADS_PATH)
SA.delete(DOWNLOADS_PATH)
DOWNLOADS_PATH = None

View File

@@ -3,6 +3,7 @@ from typing import List
from dbt import semver
from dbt.clients import registry, system
from dbt.clients.storage import adapter as SA
from dbt.contracts.project import (
RegistryPackageMetadata,
RegistryPackage,
@@ -58,7 +59,7 @@ class RegistryPinnedPackage(RegistryPackageMixin, PinnedPackage):
tar_path = os.path.realpath(
os.path.join(get_downloads_path(), tar_name)
)
system.make_directory(os.path.dirname(tar_path))
SA.write(os.path.dirname(tar_path), None)
download_url = metadata.downloads.tarball
system.download(download_url, tar_path)

View File

@@ -20,7 +20,7 @@ from dbt.logger import GLOBAL_LOGGER as logger, DbtProcessState
from dbt.node_types import NodeType
from dbt.clients.jinja import get_rendered, MacroStack
from dbt.clients.jinja_static import statically_extract_macro_calls
from dbt.clients.system import make_directory
from dbt.clients.storage import adapter as SA
from dbt.config import Project, RuntimeConfig
from dbt.context.docs import generate_runtime_docs
from dbt.context.macro_resolver import MacroResolver, TestMacroNamespace
@@ -445,9 +445,7 @@ class ManifestLoader:
PARTIAL_PARSE_FILE_NAME)
try:
manifest_msgpack = self.manifest.to_msgpack()
make_directory(os.path.dirname(path))
with open(path, 'wb') as fp:
fp.write(manifest_msgpack)
SA.write(path, manifest_msgpack)
except Exception:
raise

View File

@@ -1,4 +1,4 @@
from dbt.clients.system import load_file_contents
from dbt.clients.storage import adapter as SA
from dbt.contracts.files import (
FilePath, ParseFileType, SourceFile, FileHash, AnySourceFile, SchemaSourceFile
)
@@ -12,8 +12,8 @@ from typing import Optional
# This loads the files contents and creates the SourceFile object
def load_source_file(
path: FilePath, parse_file_type: ParseFileType,
project_name: str) -> Optional[AnySourceFile]:
file_contents = load_file_contents(path.absolute_path, strip=False)
project_name: str) -> AnySourceFile:
file_contents = SA.read(path.absolute_path, strip=False)
checksum = FileHash.from_contents(file_contents)
sf_cls = SchemaSourceFile if parse_file_type == ParseFileType.Schema else SourceFile
source_file = sf_cls(path=path, checksum=checksum,
@@ -58,7 +58,7 @@ def load_seed_source_file(match: FilePath, project_name) -> SourceFile:
# We don't want to calculate a hash of this file. Use the path.
source_file = SourceFile.big_seed(match)
else:
file_contents = load_file_contents(match.absolute_path, strip=False)
file_contents = SA.read(match.absolute_path, strip=False)
checksum = FileHash.from_contents(file_contents)
source_file = SourceFile(path=match, checksum=checksum)
source_file.contents = ''

View File

@@ -7,6 +7,7 @@ from typing import Optional, Dict, Any, List
from dbt.logger import GLOBAL_LOGGER as logger
import dbt.clients.system
import dbt.exceptions
from dbt.clients.storage import adapter as SA
from dbt.adapters.factory import get_adapter, register_adapter
from dbt.config import Project, Profile, PROFILES_DIR
from dbt.config.renderer import DbtProjectYamlRenderer, ProfileRenderer
@@ -255,7 +256,7 @@ class DebugTask(BaseTask):
try:
raw_profile_data = load_yaml_text(
dbt.clients.system.load_file_contents(self.profile_path)
SA.read(self.profile_path)
)
except Exception:
pass # we'll report this when we try to load the profile for real

View File

@@ -9,7 +9,7 @@ from dbt.deps.base import downloads_directory
from dbt.deps.resolver import resolve_packages
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.clients import system
from dbt.clients.storage import adapter as SA
from dbt.task.base import BaseTask, move_to_nearest_project_dir
@@ -43,7 +43,7 @@ class DepsTask(BaseTask):
)
def run(self):
system.make_directory(self.config.modules_path)
SA.write(self.config.modules_path, None)
packages = self.config.packages.packages
if not packages:
logger.info('Warning: No packages were found in packages.yml')

View File

@@ -3,6 +3,7 @@ import shutil
import dbt.config
import dbt.clients.system
from dbt.clients.storage import adapter as SA
from dbt.version import _get_adapter_plugin_names
from dbt.adapters.factory import load_plugin, get_include_paths
@@ -50,7 +51,7 @@ class InitTask(BaseTask):
if not os.path.exists(profiles_dir):
msg = "Creating dbt configuration folder at {}"
logger.info(msg.format(profiles_dir))
dbt.clients.system.make_directory(profiles_dir)
SA.write(profiles_dir, None)
return True
return False