From b1b3bd20f8e84a0582c6666de180f9a800086faa Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Thu, 27 Jul 2023 12:57:03 -0600 Subject: [PATCH] feat: Add `_sdc_sync_started_at` metadata column to indicate the start of the target process (#1878) --- singer_sdk/plugin_base.py | 13 +++++++ singer_sdk/sinks/core.py | 12 ++++-- singer_sdk/sinks/sql.py | 4 +- tests/core/sinks/__init__.py | 0 tests/core/sinks/test_sdc_metadata.py | 55 +++++++++++++++++++++++++++ 5 files changed, 78 insertions(+), 6 deletions(-) create mode 100644 tests/core/sinks/__init__.py create mode 100644 tests/core/sinks/test_sdc_metadata.py diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index d81e8f7c3..53e2cd2f2 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -6,6 +6,7 @@ import logging import os import sys +import time import typing as t from pathlib import Path, PurePath from types import MappingProxyType @@ -155,6 +156,9 @@ def __init__( metrics._setup_logging(self.config) self.metrics_logger = metrics.get_metrics_logger() + # Initialization timestamp + self.__initialized_at = int(time.time() * 1000) + def setup_mapper(self) -> None: """Initialize the plugin mapper for this tap.""" self._mapper = PluginMapper( @@ -185,6 +189,15 @@ def mapper(self, mapper: PluginMapper) -> None: """ self._mapper = mapper + @property + def initialized_at(self) -> int: + """Start time of the plugin. + + Returns: + The start time of the plugin. + """ + return self.__initialized_at + @classproperty def capabilities(self) -> list[CapabilitiesEnum]: """Get capabilities. diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 9928aa6f2..19dfbc31a 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -32,7 +32,7 @@ if t.TYPE_CHECKING: from logging import Logger - from singer_sdk.plugin_base import PluginBase + from singer_sdk.target_base import Target JSONSchemaValidator = Draft7Validator @@ -48,7 +48,7 @@ class Sink(metaclass=abc.ABCMeta): def __init__( self, - target: PluginBase, + target: Target, stream_name: str, schema: dict, key_properties: list[str] | None, @@ -62,6 +62,7 @@ def __init__( key_properties: Primary key of the stream to sink. """ self.logger = target.logger + self.sync_started_at = target.initialized_at self._config = dict(target.config) self._pending_batch: dict | None = None self.stream_name = stream_name @@ -238,7 +239,7 @@ def _add_sdc_metadata_to_record( Args: record: Individual record in the stream. - message: TODO + message: The record message. context: Stream partition or context dictionary. """ record["_sdc_extracted_at"] = message.get("time_extracted") @@ -252,6 +253,7 @@ def _add_sdc_metadata_to_record( record["_sdc_deleted_at"] = record.get("_sdc_deleted_at") record["_sdc_sequence"] = int(round(time.time() * 1000)) record["_sdc_table_version"] = message.get("version") + record["_sdc_sync_started_at"] = self.sync_started_at def _add_sdc_metadata_to_schema(self) -> None: """Add _sdc metadata columns. @@ -270,7 +272,7 @@ def _add_sdc_metadata_to_schema(self) -> None: "type": ["null", "string"], "format": "date-time", } - for col in ("_sdc_sequence", "_sdc_table_version"): + for col in ("_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at"): properties_dict[col] = {"type": ["null", "integer"]} def _remove_sdc_metadata_from_schema(self) -> None: @@ -287,6 +289,7 @@ def _remove_sdc_metadata_from_schema(self) -> None: "_sdc_deleted_at", "_sdc_sequence", "_sdc_table_version", + "_sdc_sync_started_at", ): properties_dict.pop(col, None) @@ -305,6 +308,7 @@ def _remove_sdc_metadata_from_record(self, record: dict) -> None: record.pop("_sdc_deleted_at", None) record.pop("_sdc_sequence", None) record.pop("_sdc_table_version", None) + record.pop("_sdc_sync_started_at", None) # Record validation diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 9dac99b1e..238e83dec 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -20,7 +20,7 @@ if t.TYPE_CHECKING: from sqlalchemy.sql import Executable - from singer_sdk.plugin_base import PluginBase + from singer_sdk.target_base import Target class SQLSink(BatchSink): @@ -32,7 +32,7 @@ class SQLSink(BatchSink): def __init__( self, - target: PluginBase, + target: Target, stream_name: str, schema: dict, key_properties: list[str] | None, diff --git a/tests/core/sinks/__init__.py b/tests/core/sinks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/core/sinks/test_sdc_metadata.py b/tests/core/sinks/test_sdc_metadata.py new file mode 100644 index 000000000..c07ac4f6a --- /dev/null +++ b/tests/core/sinks/test_sdc_metadata.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from freezegun import freeze_time + +from tests.conftest import BatchSinkMock, TargetMock + + +def test_sdc_metadata(): + with freeze_time("2023-01-01T00:00:00+00:00"): + target = TargetMock() + + sink = BatchSinkMock( + target, + "users", + {"type": "object", "properties": {"id": {"type": "integer"}}}, + ["id"], + ) + + record_message = { + "type": "RECORD", + "stream": "users", + "record": {"id": 1}, + "time_extracted": "2021-01-01T00:00:00+00:00", + "version": 100, + } + record = record_message["record"] + + with freeze_time("2023-01-01T00:05:00+00:00"): + sink._add_sdc_metadata_to_record(record, record_message, {}) + + assert record == { + "id": 1, + "_sdc_extracted_at": "2021-01-01T00:00:00+00:00", + "_sdc_received_at": "2023-01-01T00:05:00+00:00", + "_sdc_batched_at": "2023-01-01T00:05:00+00:00", + "_sdc_deleted_at": None, + "_sdc_sequence": 1672531500000, + "_sdc_table_version": 100, + "_sdc_sync_started_at": 1672531200000, + } + + sink._add_sdc_metadata_to_schema() + assert sink.schema == { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "_sdc_extracted_at": {"type": ["null", "string"], "format": "date-time"}, + "_sdc_received_at": {"type": ["null", "string"], "format": "date-time"}, + "_sdc_batched_at": {"type": ["null", "string"], "format": "date-time"}, + "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}, + "_sdc_sequence": {"type": ["null", "integer"]}, + "_sdc_table_version": {"type": ["null", "integer"]}, + "_sdc_sync_started_at": {"type": ["null", "integer"]}, + }, + }