feat: add sdc metadata #9

Merged 2 commits on Jan 5, 2023
meltano.yml (4 additions, 0 deletions)
@@ -46,6 +46,8 @@ plugins:
       kind: string
     - name: s3.paths
       kind: array
+    - name: add_record_metadata
+      kind: boolean
   - name: tap-carbon-intensity
     variant: meltano
     pip_url: git+https://gitlab.com/meltano/tap-carbon-intensity.git
@@ -66,6 +68,8 @@
       kind: string
     - name: s3.prefix
       kind: string
+    - name: add_record_metadata
+      kind: boolean
 jobs:
 - name: job-simple-test
   tasks:
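Note: both plugin definitions gain the same add_record_metadata boolean, so the flag can be set per loader in meltano.yml and passed through in the target's JSON config. A minimal sketch of how the flag might resolve, assuming (as the diff below suggests) the target merges user config over DEFAULT_CONFIG — whose sample default is False — while persist_lines falls back to True only when the key is missing entirely. The resolve_flag helper is illustrative, not part of the PR:

DEFAULT_CONFIG = {"add_record_metadata": False}

def resolve_flag(user_config: dict) -> bool:
    # user-supplied config wins over the target's sample defaults
    merged = {**DEFAULT_CONFIG, **user_config}
    return merged.get("add_record_metadata", True)

assert resolve_flag({}) is False
assert resolve_flag({"add_record_metadata": True}) is True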
target_singer_jsonl/__init__.py (63 additions, 27 deletions)
@@ -5,6 +5,7 @@
 import json
 import logging
 import sys
+import time
 from datetime import datetime
 from functools import reduce
 from pathlib import Path
@@ -23,11 +24,13 @@
     "destination": "local",
     "s3": {"bucket": "my-s3-bucket", "prefix": "put/files/in/here/"},
     "local": {"folder": ".secrets/output/"},
+    "add_record_metadata": False,
 }
 
 stream_files = {}
 stream_lines = {}
-now = datetime.now().strftime("%Y%m%dT%H%M%S%z")
+file_timestamp = datetime.now().strftime("%Y%m%dT%H%M%S%z")
+target_start_timestamp = datetime.now().isoformat()
 
 
 def join_slash(a, b):
@@ -47,7 +50,7 @@ def emit_state(state):
 
 
 def get_file_path(stream, destination, config):
-    filename = f"{stream}/{stream}-{now}.singer.gz"
+    filename = f"{stream}/{stream}-{file_timestamp}.singer.gz"
     if destination == "local":
         return Path(config["folder"]).joinpath(filename)
     elif destination == "s3":
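Renaming now to file_timestamp keeps the filename timestamp distinct from the new target_start_timestamp used later as an _sdc_extracted_at fallback. For a stream named users, get_file_path builds a path like the following (stream name and timestamp values are illustrative):

stream = "users"
file_timestamp = "20230105T120000"  # illustrative strftime("%Y%m%dT%H%M%S%z") value

filename = f"{stream}/{stream}-{file_timestamp}.singer.gz"
print(filename)  # users/users-20230105T120000.singer.gz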
@@ -68,7 +71,7 @@ def write_lines_local(destination, config, stream, lines):
     with stream_files[stream].open("w", encoding="utf-8") as outfile:
         logging.info(f"Writing to file: {stream_files[stream]}")
         for line in lines:
-            outfile.write(line)
+            outfile.write(line + "\n")
 
 
 def write_lines_s3(destination, config, stream, lines):
@@ -79,7 +82,7 @@ def write_lines_s3(destination, config, stream, lines):
     with open(stream_files[stream], "w", encoding="utf-8") as outfile:
         logging.info(f"Writing to file: {stream_files[stream]}")
         for line in lines:
-            outfile.write(line)
+            outfile.write(line + "\n")
 
 
 def write_lines(config, stream, lines):
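The explicit line + "\n" in both writers matters because persist_lines now queues json.dumps(message) strings, which carry no trailing newline, instead of the raw stdin lines. A small illustration with made-up messages:

import json

messages = [
    json.dumps({"type": "RECORD", "stream": "users", "record": {"id": 1}}),
    json.dumps({"type": "RECORD", "stream": "users", "record": {"id": 2}}),
]

# Without the added newline the messages would run together on one line;
# with it, the output is valid JSONL: one Singer message per line.
jsonl = "".join(m + "\n" for m in messages)
print(jsonl)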
@@ -104,60 +107,93 @@ def persist_lines(config, lines):
     state = None
     schemas = {}
     key_properties = {}
     headers = {}
     validators = {}
+    add_record_metadata = config.get("add_record_metadata", True)
 
     # Loop over lines from stdin
     for line in lines:
         try:
-            o = json.loads(line)
+            message = json.loads(line)
         except json.decoder.JSONDecodeError:
             logger.error(f"Unable to parse:\n{line}")
             raise
 
-        if "type" not in o:
+        if "type" not in message:
             raise Exception(f"Line is missing required key 'type': {line}")
-        t = o["type"]
+        t = message["type"]
 
         if t != "STATE":
-            if "stream" not in o:
+            if "stream" not in message:
                 raise Exception(f"Line is missing required key 'stream': {line}")
 
-            stream = o["stream"]
+            stream = message["stream"]
 
             if stream not in stream_lines:
                 stream_lines[stream] = []
 
-            # persisting STATE messages is problematic when splitting records into separate
-            # files, therefore we omit them and allow tap-singer-jsonl to create new
-            # state messages from observed records
-            stream_lines[stream].append(line)
 
         if t == "RECORD":
 
             if stream not in schemas:
                 raise Exception(
                     f"A record for stream {stream} was encountered before a corresponding schema"
                 )
 
+            record = message["record"]
             # Get schema for this record's stream
             schema = schemas[stream]
 
             # Validate record
-            validators[stream].validate(o["record"])
-
-            state = None
-        elif t == "STATE":
-            logger.debug(f'Setting state to {o["value"]}')
-            state = o["value"]
+            validators[stream].validate(record)
+            # Process record
+            if add_record_metadata:
+                now = datetime.now().isoformat()
+                record.update(
+                    {
+                        "_sdc_extracted_at": message.get(
+                            "time_extracted", target_start_timestamp
+                        ),
+                        "_sdc_received_at": now,
+                        "_sdc_batched_at": now,
+                        "_sdc_deleted_at": record.get("_sdc_deleted_at"),
+                        "_sdc_sequence": int(round(time.time() * 1000)),
+                        "_sdc_table_version": message.get("version"),
+                    }
+                )
+            # Queue message for write
+            stream_lines[stream].append(json.dumps(message))
 
         elif t == "SCHEMA":
-            schemas[stream] = o["schema"]
-            validators[stream] = Draft4Validator(o["schema"])
-            if "key_properties" not in o:
+            schemas[stream] = message["schema"]
+            validators[stream] = Draft4Validator(message["schema"])
+            if "key_properties" not in message:
                 raise Exception("key_properties field is required")
-            key_properties[stream] = o["key_properties"]
+            key_properties[stream] = message["key_properties"]
+            # Add metadata properties
+            if add_record_metadata:
+                properties_dict = schemas[stream]["properties"]
+                for col in {
+                    "_sdc_extracted_at",
+                    "_sdc_received_at",
+                    "_sdc_batched_at",
+                    "_sdc_deleted_at",
+                }:
+                    properties_dict[col] = {
+                        "type": ["null", "string"],
+                        "format": "date-time",
+                    }
+                for col in {"_sdc_sequence", "_sdc_table_version"}:
+                    properties_dict[col] = {"type": ["null", "integer"]}
+            # Queue message for write
+            stream_lines[stream].append(json.dumps(message))
+
+        elif t == "STATE":
+            # persisting STATE messages is problematic when splitting records into separate
+            # files, therefore we omit them and allow tap-singer-jsonl to create new
+            # state messages from observed records on read
+            logger.debug(f'Setting state to {message["value"]}')
+            state = message["value"]
 
         else:
-            raise Exception(f"Unknown message type {t} in message {o}")
+            raise Exception(f"Unknown message type {t} in message {message}")
 
     for stream, messages in stream_lines.items():
         write_lines(config, stream, messages)
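To make the RECORD branch concrete, here is a sketch of what the metadata injection does to a single message. The record mutation mirrors the diff above; the input message and field values are illustrative, and the fallback uses a local now in place of the module-level target_start_timestamp so the snippet stays self-contained:

import json
import time
from datetime import datetime

message = json.loads(
    '{"type": "RECORD", "stream": "users",'
    ' "time_extracted": "2023-01-05T12:00:00+00:00",'
    ' "record": {"id": 1, "name": "amy"}}'
)
record = message["record"]
now = datetime.now().isoformat()
record.update(
    {
        # falls back to the target's start timestamp when the tap omits time_extracted
        "_sdc_extracted_at": message.get("time_extracted", now),
        "_sdc_received_at": now,
        "_sdc_batched_at": now,
        "_sdc_deleted_at": record.get("_sdc_deleted_at"),  # stays None unless the tap set it
        "_sdc_sequence": int(round(time.time() * 1000)),  # epoch milliseconds
        "_sdc_table_version": message.get("version"),  # None unless the tap sent a version
    }
)
print(json.dumps(message))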
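Correspondingly, the SCHEMA branch widens each stream's schema so downstream consumers see the six _sdc_* columns as nullable properties. A sketch against a minimal illustrative schema:

schema = {"properties": {"id": {"type": "integer"}}}

properties = schema["properties"]
for col in {"_sdc_extracted_at", "_sdc_received_at", "_sdc_batched_at", "_sdc_deleted_at"}:
    properties[col] = {"type": ["null", "string"], "format": "date-time"}
for col in {"_sdc_sequence", "_sdc_table_version"}:
    properties[col] = {"type": ["null", "integer"]}

print(sorted(properties))  # id plus the six _sdc_* columns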