Skip to content

Commit

Permalink
Implement schema discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 26, 2024
1 parent e5a4a4f commit 95d8b82
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 47 deletions.
15 changes: 15 additions & 0 deletions samples/sample_tap_csv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@
class CSVStream(FileStream):
"""CSV stream class."""

def get_schema(self, path: str) -> dict[str, t.Any]:
with self.filesystem.open(path, mode="r") as file:
reader = csv.DictReader(
file,
delimiter=self.config["delimiter"],
quotechar=self.config["quotechar"],
escapechar=self.config.get("escapechar"),
doublequote=self.config["doublequote"],
lineterminator=self.config["lineterminator"],
)
return {
"type": "object",
"properties": {key: {"type": "string"} for key in reader.fieldnames},
}

def read_file(self, path: str) -> t.Iterable[Record]:
with self.filesystem.open(path, mode="r") as file:
reader = csv.DictReader(
Expand Down
73 changes: 55 additions & 18 deletions singer_sdk/contrib/filesystem/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from __future__ import annotations

import abc
import enum
import functools
import typing as t

from singer_sdk import Stream
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers._util import utc_now
from singer_sdk.streams.core import REPLICATION_INCREMENTAL

Expand All @@ -17,57 +20,82 @@
from singer_sdk.helpers.types import Context, Record
from singer_sdk.tap_base import Tap


SDC_META_FILEPATH = "_sdc_path"
SDC_META_MODIFIED_AT = "_sdc_modified_at"


class ReadMode(str, enum.Enum):
"""Sync mode for the tap."""

one_stream_per_file = "one_stream_per_file"
merge = "merge"


class FileStream(Stream, metaclass=abc.ABCMeta):
"""Abstract base class for file streams."""

BASE_SCHEMA: t.ClassVar[dict[str, t.Any]] = {
"type": ["object"],
"properties": {
SDC_META_FILEPATH: {"type": "string"},
SDC_META_MODIFIED_AT: {"type": ["string", "null"], "format": "date-time"},
},
"required": [],
"additionalProperties": {"type": "string"},
SDC_PROPERTIES: t.ClassVar[dict[str, dict]] = {
SDC_META_FILEPATH: {"type": "string"},
SDC_META_MODIFIED_AT: {"type": ["string", "null"], "format": "date-time"},
}

def __init__(
self,
tap: Tap,
name: str,
*,
filepaths: t.Sequence[str],
filesystem: fsspec.AbstractFileSystem,
partitions: list[dict[str, t.Any]] | None = None,
) -> None:
"""Create a new FileStream instance.
Args:
tap: The tap for this stream.
name: The name of the stream.
filepaths: List of file paths to read.
filesystem: The filesystem implementation object to use.
partitions: List of partitions for this stream.
mode: The read mode for the stream.
Raises:
ConfigValidationError: If no file paths are provided.
"""
# TODO(edgarmondragon): Build schema from file.
super().__init__(tap, self.BASE_SCHEMA, name)
if not filepaths:
msg = "Configuration error"
raise ConfigValidationError(msg, errors=["No file paths provided"])

Check warning on line 64 in singer_sdk/contrib/filesystem/stream.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/contrib/filesystem/stream.py#L63-L64

Added lines #L63 - L64 were not covered by tests

self._filepaths = filepaths
self.filesystem = filesystem

super().__init__(tap, schema=None, name=name)

# TODO(edgarrmondragon): Make this None if the filesytem does not support it.
self.replication_key = SDC_META_MODIFIED_AT
self.filesystem = filesystem
self._sync_start_time = utc_now()
self._partitions = partitions or []
self._partitions = [{SDC_META_FILEPATH: path} for path in self._filepaths]

@property
def partitions(self) -> list[dict[str, t.Any]]:
"""Return the list of partitions for this stream."""
return self._partitions

@abc.abstractmethod
def read_file(self, path: str) -> t.Iterable[Record]:
"""Return a generator of records from the file."""
def _get_full_schema(self) -> dict[str, t.Any]:
"""Return the full schema for the stream.
Args:
context: Stream partition or context dictionary.
Returns:
The full schema for the stream.
"""
path: str = self._filepaths[0]
schema = self.get_schema(path)
schema["properties"].update(self.SDC_PROPERTIES)
return schema

@functools.cached_property
def schema(self) -> dict[str, t.Any]:
"""Return the schema for the stream."""
return self._get_full_schema()

def get_records(
self,
Expand Down Expand Up @@ -109,4 +137,13 @@ def get_records(

for record in self.read_file(path):
record[SDC_META_MODIFIED_AT] = mtime or self._sync_start_time
record[SDC_META_FILEPATH] = path
yield record

@abc.abstractmethod
def read_file(self, path: str) -> t.Iterable[Record]:
"""Return a generator of records from the file."""

@abc.abstractmethod
def get_schema(self, path: str) -> dict[str, t.Any]:
"""Return the schema for the file."""
26 changes: 7 additions & 19 deletions singer_sdk/contrib/filesystem/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import enum
import functools
import os
import typing as t
Expand All @@ -12,18 +11,10 @@

import singer_sdk.typing as th
from singer_sdk import Tap
from singer_sdk.contrib.filesystem.stream import SDC_META_FILEPATH, FileStream
from singer_sdk.contrib.filesystem.stream import FileStream, ReadMode

DEFAULT_MERGE_STREAM_NAME = "files"


class ReadMode(str, enum.Enum):
"""Sync mode for the tap."""

one_stream_per_file = "one_stream_per_file"
merge = "merge"


BASE_CONFIG_SCHEMA = th.PropertiesList(
th.Property(
"filesystem",
Expand Down Expand Up @@ -144,26 +135,23 @@ def discover_streams(self) -> list:
self.default_stream_class(
tap=self,
name=file_path_to_stream_name(member),
filepaths=[os.path.join(path, member)], # noqa: PTH118
filesystem=self.fs,
partitions=[{SDC_META_FILEPATH: os.path.join(path, member)}], # noqa: PTH118
)
for member in os.listdir(path)
if member.endswith(self.valid_extensions)
]

# Merge
contexts = [
{
SDC_META_FILEPATH: os.path.join(path, member), # noqa: PTH118
}
for member in os.listdir(path)
if member.endswith(self.valid_extensions)
]
return [
self.default_stream_class(
tap=self,
name=self.config["stream_name"],
filepaths=[
os.path.join(path, member) # noqa: PTH118
for member in os.listdir(path)
if member.endswith(self.valid_extensions)
],
filesystem=self.fs,
partitions=contexts,
)
]
12 changes: 2 additions & 10 deletions tests/samples/test_tap_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@


class TestCSVMerge(_TestCSVMerge):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)
pass


_TestCSVOneStreamPerFile = get_tap_test_class(
Expand All @@ -35,9 +33,7 @@ def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str)


class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)
pass


# Three days into the future.
Expand Down Expand Up @@ -80,10 +76,6 @@ def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str)


class TestCSVOneStreamPerFileIncremental(_TestCSVOneStreamPerFileIncremental):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)

@pytest.mark.xfail(reason="No records are extracted", strict=True)
def test_tap_stream_transformed_catalog_schema_matches_record(self, stream: str):
super().test_tap_stream_transformed_catalog_schema_matches_record(stream)
Expand Down

0 comments on commit 95d8b82

Please sign in to comment.