Skip to content

Commit

Permalink
feat: implement schema registry client typings (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
benbenbang authored Aug 26, 2023
1 parent 08a45cb commit 63fc8a2
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 6 deletions.
10 changes: 5 additions & 5 deletions confluent_kafka-stubs/schema_registry/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ types-confluent-kafka: A package providing type hints for the confluent-kafka Py
This package is licensed under the Apache 2.0 License.
"""

# from .schema_registry_client import RegisteredSchema as RegisteredSchema
# from .schema_registry_client import Schema as Schema
# from .schema_registry_client import SchemaReference as SchemaReference
# from .schema_registry_client import SchemaRegistryClient as SchemaRegistryClient
# from .schema_registry_client import SchemaRegistryError as SchemaRegistryError
from .schema_registry_client import RegisteredSchema as RegisteredSchema
from .schema_registry_client import Schema as Schema
from .schema_registry_client import SchemaReference as SchemaReference
from .schema_registry_client import SchemaRegistryClient as SchemaRegistryClient
from .schema_registry_client import SchemaRegistryError as SchemaRegistryError

def topic_subject_name_strategy(ctx, record_name): ...
def topic_record_subject_name_strategy(ctx, record_name): ...
Expand Down
88 changes: 87 additions & 1 deletion confluent_kafka-stubs/schema_registry/schema_registry_client.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,91 @@
types-confluent-kafka: A package providing type hints for the confluent-kafka Python package.
This package is licensed under the Apache 2.0 License.
"""
from __future__ import annotations

class SchemaRegistryClient: ...
# standard library
from logging import Logger
from threading import Lock
from typing import ClassVar, DefaultDict, Literal

# pypi/conda library
from requests import Session

from .error import SchemaRegistryError as SchemaRegistryError

log: Logger
VALID_AUTH_PROVIDERS: Literal["URL", "USER_INFO"]

class _RestClient:
session: Session
base_url: ClassVar[str]

def __init__(self, conf: dict) -> None: ...
def get(self, url: str, query: dict | None = ...): ...
def post(self, url: str, body, **kwargs): ...
def delete(self, url: str): ...
def put(self, url: str, body: str | None = None): ...
def send_request(
self,
url: str,
method: Literal["GET", "POST", "DELETE", "PUT"],
body: str | None = None,
query: dict | None = None,
): ...

class _SchemaCache:
lock: ClassVar[Lock]
schema_id_index: ClassVar[dict]
schema_index: ClassVar[dict]
subject_schemas: DefaultDict[str, set]

def __init__(self) -> None: ...
def set(self, schema_id: int, schema: Schema, subject_name: str | None = None) -> None: ...
def get_schema(self, schema_id: int) -> Schema | None: ...
def get_schema_id_by_subject(self, subject, schema: Schema) -> Schema | None: ...

class SchemaRegistryClient:
def __init__(self, conf: dict) -> None: ...
def __enter__(self) -> SchemaRegistryClient: ...
def __exit__(self, *args) -> None: ...
def register_schema(self, subject_name: str, schema: Schema, normalize_schemas: bool = False) -> int: ...
def get_schema(self, schema_id) -> Schema: ...
def lookup_schema(self, subject_name: str, schema: Schema, normalize_schemas: bool = False) -> RegisteredSchema: ...
def get_subjects(self) -> list[str]: ...
def delete_subject(self, subject_name: str, permanent: bool = ...) -> list[int]: ...
def get_latest_version(self, subject_name: str) -> RegisteredSchema: ...
def get_version(self, subject_name: str, version: int) -> RegisteredSchema: ...
def get_versions(self, subject_name: str) -> list[int]: ...
def delete_version(self, subject_name: str, version: int) -> int: ...
def set_compatibility(self, subject_name: str | None = None, level: str | None = None) -> str: ...
def get_compatibility(self, subject_name: str | None = None) -> str: ...
def test_compatibility(
self,
subject_name: str,
schema: Schema,
version: int | Literal["latest"] = "latest",
) -> bool: ...

class Schema:
schema_str: ClassVar[str]
schema_type: ClassVar[str]
references: ClassVar[list[SchemaReference]]

def __init__(self, schema_str: str, schema_type: str, references: list[SchemaReference] | None = None) -> None: ...
def __eq__(self, other) -> bool: ...
def __hash__(self) -> int: ...

class RegisteredSchema:
schema_id: ClassVar[int]
schema: ClassVar[Schema]
subject: ClassVar[str]
version: ClassVar[int]

def __init__(self, schema_id: int, schema: Schema, subject: str, version: int) -> None: ...

class SchemaReference:
name: ClassVar[str]
subject: ClassVar[str]
version: ClassVar[str]

def __init__(self, name: str, subject: str, version: str) -> None: ...

0 comments on commit 63fc8a2

Please sign in to comment.