Compare commits

...

1 Commits

Author SHA1 Message Date
Jeremy Cohen
159e295a6b Tracking with old + new collector endpoints 2023-04-04 12:24:37 +02:00

View File

@@ -28,7 +28,8 @@ from dbt.exceptions import FailedToConnectError, NotImplementedError
sp_logger.setLevel(100)
COLLECTOR_URL = "fishtownanalytics.sinter-collect.com"
OLD_COLLECTOR_URL = "fishtownanalytics.sinter-collect.com"
NEW_COLLECTOR_URL = "collector.getdbt.com"
COLLECTOR_PROTOCOL = "https"
DBT_INVOCATION_ENV = "DBT_INVOCATION_ENV"
@@ -48,10 +49,10 @@ RUNNABLE_TIMING = "iglu:com.dbt/runnable/jsonschema/1-0-0"
RUN_MODEL_SPEC = "iglu:com.dbt/run_model/jsonschema/1-0-2"
class TimeoutEmitter(Emitter):
class OldTimeoutEmitter(Emitter):
def __init__(self):
super().__init__(
COLLECTOR_URL,
OLD_COLLECTOR_URL,
protocol=COLLECTOR_PROTOCOL,
buffer_size=30,
on_failure=self.handle_failure,
@@ -100,9 +101,68 @@ class TimeoutEmitter(Emitter):
return r
emitter = TimeoutEmitter()
tracker = Tracker(
emitter,
class NewTimeoutEmitter(Emitter):
def __init__(self):
super().__init__(
NEW_COLLECTOR_URL,
protocol=COLLECTOR_PROTOCOL,
buffer_size=30,
on_failure=self.handle_failure,
method="post",
# don't set this.
byte_limit=None,
)
@staticmethod
def handle_failure(num_ok, unsent):
# num_ok will always be 0, unsent will always be 1 entry long, because
# the buffer is length 1, so not much to talk about
fire_event(DisableTracking())
disable_tracking()
def _log_request(self, request, payload):
sp_logger.info(f"Sending {request} request to {self.endpoint}...")
sp_logger.debug(f"Payload: {payload}")
def _log_result(self, request, status_code):
msg = f"{request} request finished with status code: {status_code}"
if self.is_good_status_code(status_code):
sp_logger.info(msg)
else:
sp_logger.warning(msg)
def http_post(self, payload):
self._log_request("POST", payload)
r = requests.post(
self.endpoint,
data=payload,
headers={"content-type": "application/json; charset=utf-8"},
timeout=5.0,
)
self._log_result("GET", r.status_code)
return r
def http_get(self, payload):
self._log_request("GET", payload)
r = requests.get(self.endpoint, params=payload, timeout=5.0)
self._log_result("GET", r.status_code)
return r
old_emitter = OldTimeoutEmitter()
old_tracker = Tracker(
old_emitter,
namespace="cf",
app_id="dbt",
)
new_emitter = NewTimeoutEmitter()
new_tracker = Tracker(
new_emitter,
namespace="cf",
app_id="dbt",
)
@@ -132,13 +192,15 @@ class User:
subject = Subject()
subject.set_user_id(self.id)
tracker.set_subject(subject)
old_tracker.set_subject(subject)
new_tracker.set_subject(subject)
def disable_tracking(self):
self.do_not_track = True
self.id = None
self.cookie_dir = None
tracker.set_subject(None)
old_tracker.set_subject(None)
new_tracker.set_subject(None)
def set_cookie(self):
# If the user points dbt to a profile directory which exists AND
@@ -207,7 +269,8 @@ def track(user, *args, **kwargs):
else:
fire_event(SendingEvent(kwargs=str(kwargs)))
try:
tracker.track_struct_event(*args, **kwargs)
old_tracker.track_struct_event(*args, **kwargs)
new_tracker.track_struct_event(*args, **kwargs)
except Exception:
fire_event(SendEventFailure())
@@ -424,7 +487,8 @@ def track_runnable_timing(options):
def flush():
fire_event(FlushEvents())
try:
tracker.flush()
old_tracker.flush()
new_tracker.flush()
except Exception:
fire_event(FlushEventsFailure())