Compare commits

...

1 Commits

Author SHA1 Message Date
Jeremy Cohen
a066c1e8bb Include adapter_response for freshness queries 2022-11-15 14:53:55 +01:00
3 changed files with 8 additions and 6 deletions

View File

@@ -1096,7 +1096,8 @@ class BaseAdapter(metaclass=AdapterMeta):
}
# run the macro
table = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
adapter_response, table = result.response, result.table
# now we have a 1-row table of the maximum `loaded_at_field` value and
# the current time according to the db.
if len(table) != 1 or len(table[0]) != 2:
@@ -1114,11 +1115,12 @@ class BaseAdapter(metaclass=AdapterMeta):
snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
return {
freshness = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}
return adapter_response, freshness
def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization

View File

@@ -12,5 +12,5 @@
where {{ filter }}
{% endif %}
{% endcall %}
{{ return(load_result('collect_freshness').table) }}
{{ return(load_result('collect_freshness')) }}
{% endmacro %}

View File

@@ -105,10 +105,10 @@ class FreshnessRunner(BaseRunner):
)
relation = self.adapter.Relation.create_from_source(compiled_node)
# given a Source, calculate its fresnhess.
# given a Source, calculate its freshness.
with self.adapter.connection_for(compiled_node):
self.adapter.clear_transaction()
freshness = self.adapter.calculate_freshness(
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
compiled_node.freshness.filter,
@@ -124,7 +124,7 @@ class FreshnessRunner(BaseRunner):
timing=[],
execution_time=0,
message=None,
adapter_response={},
adapter_response=adapter_response.to_dict(omit_none=True),
failures=None,
**freshness,
)