Skip to content

Commit

Permalink
Implement primary keys
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Oct 2, 2024
1 parent 6f2e30c commit e527cee
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 6 deletions.
16 changes: 14 additions & 2 deletions samples/sample_tap_csv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@
import typing as t

from singer_sdk.contrib.filesystem import FileStream
from singer_sdk.contrib.filesystem.stream import SDC_META_FILEPATH

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Record


# Record/schema key under which each CSV row's reader line number is stored
# (declared as an integer property in the schema).
SDC_META_LINE_NUMBER = "_sdc_line_number"


class CSVStream(FileStream):
"""CSV stream class."""

# Composite primary key for CSV records: the source-file path column
# (SDC_META_FILEPATH) together with the line-number column
# (SDC_META_LINE_NUMBER).
@property
def primary_keys(self) -> t.Sequence[str]:
return (SDC_META_FILEPATH, SDC_META_LINE_NUMBER)

def get_schema(self, path: str) -> dict[str, t.Any]:
with self.filesystem.open(path, mode="r") as file:
reader = csv.DictReader(
Expand All @@ -22,10 +30,12 @@ def get_schema(self, path: str) -> dict[str, t.Any]:
doublequote=self.config["doublequote"],
lineterminator=self.config["lineterminator"],
)
return {
schema = {
"type": "object",
"properties": {key: {"type": "string"} for key in reader.fieldnames},
}
schema["properties"][SDC_META_LINE_NUMBER] = {"type": "integer"}
return schema

def read_file(self, path: str) -> t.Iterable[Record]:
with self.filesystem.open(path, mode="r") as file:
Expand All @@ -37,4 +47,6 @@ def read_file(self, path: str) -> t.Iterable[Record]:
doublequote=self.config["doublequote"],
lineterminator=self.config["lineterminator"],
)
yield from reader
for record in reader:
record[SDC_META_LINE_NUMBER] = reader.line_num
yield record
8 changes: 4 additions & 4 deletions singer_sdk/contrib/filesystem/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ def get_records(
record[SDC_META_FILEPATH] = path
yield record

@abc.abstractmethod
def read_file(self, path: str) -> t.Iterable[Record]:
"""Return a generator of records from the file.

Declared abstract, so concrete streams are expected to override it.

Args:
path: Path of the file to read.

Returns:
An iterable of the records read from the file.
"""

@abc.abstractmethod
def get_schema(self, path: str) -> dict[str, t.Any]:
"""Return the schema for the file.

Declared abstract, so concrete streams are expected to override it.

Args:
path: Path of the file whose schema is requested.

Returns:
A dictionary describing the schema.
"""

@abc.abstractmethod
def read_file(self, path: str) -> t.Iterable[Record]:
"""Return a generator of records from the file.

Declared abstract, so concrete streams are expected to override it.

Args:
path: Path of the file to read.

Returns:
An iterable of the records read from the file.
"""
8 changes: 8 additions & 0 deletions singer_sdk/contrib/filesystem/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,16 @@ class FolderTap(Tap, t.Generic[_T]):
"""Singer tap for files in a directory."""

valid_extensions: tuple[str, ...]
"""Valid file extensions for this tap.
Files with extensions not in this list will be ignored.
"""

default_stream_class: type[_T]
"""The default stream class to use for this tap.
This should be a subclass of `FileStream`.
"""

config_jsonschema: t.ClassVar[dict] = {"properties": {}}

Expand Down

0 comments on commit e527cee

Please sign in to comment.