Fix state file being ignored when pipeline_name includes FILENAME_SEPARATOR

This commit is contained in:
travior
2025-12-08 16:46:34 +01:00
parent c4515d7112
commit b38e279871
2 changed files with 22 additions and 3 deletions

View File

@@ -734,11 +734,14 @@ class FilesystemClient(
for filepath in all_files:
filename = os.path.splitext(os.path.basename(filepath))[0]
fileparts = filename.split(FILENAME_SEPARATOR)
if len(fileparts) != 3:
if len(fileparts) < 3:
continue
# pipeline name could include `FILENAME_SEPARATOR`, if so we have to put it back together
*pipeline_name_parts, load_id, version_hash = fileparts
pipeline_name_from_file = FILENAME_SEPARATOR.join(pipeline_name_parts)
# Filters only if pipeline_name provided
if pipeline_name is None or fileparts[0] == pipeline_name:
yield filepath, fileparts
if pipeline_name is None or pipeline_name_from_file == pipeline_name:
yield filepath, [pipeline_name_from_file, load_id, version_hash]
def _store_load(self, load_id: str) -> None:
# write entry to load "table"

View File

@@ -421,3 +421,19 @@ def test_get_storage_version_invalid(invalid_version_info: Union[str, Dict[str,
else:
with pytest.raises(UnsupportedStorageVersionException):
client.get_storage_versions()
def test_list_dlt_table_files_with_separator_in_pipeline_name() -> None:
    """Check that a state file with a multi-part pipeline name is listed.

    A single empty file named ``my__pipeline__load123__hash123.jsonl`` is
    placed in the state table directory. The listing is expected to yield
    exactly one entry: the file path together with the parts
    ``["my__pipeline", "load123", "hash123"]``, i.e. the pipeline name is
    kept in one piece rather than being split further.
    """
    client = _client_factory(filesystem("random_location"))
    client.initialize_storage()

    # Create the state table directory and drop a single empty file into it.
    table_dir = client.get_table_dir(client.schema.state_table_name)
    client.fs_client.mkdirs(table_dir)
    state_file = client.pathlib.join(table_dir, "my__pipeline__load123__hash123.jsonl")
    client.fs_client.touch(state_file)

    # No pipeline name is passed, so the listing is not filtered by name.
    listed = list(client._list_dlt_table_files(client.schema.state_table_name))

    assert len(listed) == 1
    listed_path, listed_parts = listed[0][0], listed[0][1]
    assert listed_path == state_file
    assert listed_parts == ["my__pipeline", "load123", "hash123"]