Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hubspot): provide default lists of properties #311

Merged
merged 7 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 150 additions & 70 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,54 +38,181 @@
fetch_property_history,
)
from .settings import (
STARTDATE,
WEB_ANALYTICS_EVENTS_ENDPOINT,
OBJECT_TYPE_SINGULAR,
ALL,
CRM_OBJECT_ENDPOINTS,
DEFAULT_COMPANY_PROPS,
DEFAULT_CONTACT_PROPS,
DEFAULT_DEAL_PROPS,
DEFAULT_PRODUCT_PROPS,
DEFAULT_TICKET_PROPS,
DEFAULT_QUOTE_PROPS,
OBJECT_TYPE_PLURAL,
STARTDATE,
WEB_ANALYTICS_EVENTS_ENDPOINT,
)

THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"]


@dlt.source(name="hubspot")
def hubspot(
api_key: str = dlt.secrets.value, include_history: bool = False
api_key: str = dlt.secrets.value,
include_history: bool = False,
) -> Sequence[DltResource]:
"""
A DLT source that retrieves data from the HubSpot API using the specified API key.
A DLT source that retrieves data from the HubSpot API using the
specified API key.

This function retrieves data for several HubSpot API endpoints, including companies, contacts, deals,
tickets, products and web analytics events. It returns a tuple of Dlt resources, one for each endpoint.
This function retrieves data for several HubSpot API endpoints,
including companies, contacts, deals, tickets, products and web
analytics events. It returns a tuple of Dlt resources, one for
each endpoint.

Args:
api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value.
include_history: Whether to load history of property changes along with entities. The history entries are loaded to separate tables.
api_key (Optional[str]):
The API key used to authenticate with the HubSpot API. Defaults
to dlt.secrets.value.
include_history (Optional[bool]):
Whether to load history of property changes along with entities.
The history entries are loaded to separate tables.

Returns:
tuple: A tuple of Dlt resources, one for each HubSpot API endpoint.
Sequence[DltResource]: Dlt resources, one for each HubSpot API endpoint.

Notes:
This function uses the `fetch_data` function to retrieve data from the HubSpot CRM API. The API key
is passed to `fetch_data` as the `api_key` argument.
This function uses the `fetch_data` function to retrieve data from the
HubSpot CRM API. The API key is passed to `fetch_data` as the
`api_key` argument.
"""
return [
companies(include_history=include_history),
contacts(include_history=include_history),
deals(include_history=include_history),
tickets(include_history=include_history),
products(include_history=include_history),
quotes(include_history=include_history),
]

@dlt.resource(name="companies", write_disposition="replace")
def companies(
api_key: str = api_key,
include_history: bool = include_history,
props: Sequence[str] = DEFAULT_COMPANY_PROPS,
include_custom_props: bool = True,
) -> Iterator[TDataItems]:
"""Hubspot companies resource"""
yield from crm_objects(
"company",
api_key,
include_history=include_history,
props=props,
include_custom_props=include_custom_props,
)

@dlt.resource(name="contacts", write_disposition="replace")
def contacts(
api_key: str = api_key,
include_history: bool = include_history,
props: Sequence[str] = DEFAULT_CONTACT_PROPS,
include_custom_props: bool = True,
) -> Iterator[TDataItems]:
"""Hubspot contacts resource"""
yield from crm_objects(
"contact",
api_key,
include_history,
props,
include_custom_props,
)

@dlt.resource(name="deals", write_disposition="replace")
def deals(
api_key: str = api_key,
include_history: bool = include_history,
props: Sequence[str] = DEFAULT_DEAL_PROPS,
include_custom_props: bool = True,
) -> Iterator[TDataItems]:
"""Hubspot deals resource"""
yield from crm_objects(
"deal",
api_key,
include_history,
props,
include_custom_props,
)

@dlt.resource(name="tickets", write_disposition="replace")
def tickets(
api_key: str = api_key,
include_history: bool = include_history,
props: Sequence[str] = DEFAULT_TICKET_PROPS,
include_custom_props: bool = True,
) -> Iterator[TDataItems]:
"""Hubspot tickets resource"""
yield from crm_objects(
"ticket",
api_key,
include_history,
props,
include_custom_props,
)

@dlt.resource(name="products", write_disposition="replace")
def products(
api_key: str = api_key,
include_history: bool = include_history,
props: Sequence[str] = DEFAULT_PRODUCT_PROPS,
include_custom_props: bool = True,
) -> Iterator[TDataItems]:
"""Hubspot products resource"""
yield from crm_objects(
"product",
api_key,
include_history,
props,
include_custom_props,
)

@dlt.resource(name="quotes", write_disposition="replace")
def quotes(
api_key: str = api_key,
include_history: bool = include_history,
props: Sequence[str] = DEFAULT_QUOTE_PROPS,
include_custom_props: bool = True,
) -> Iterator[TDataItems]:
"""Hubspot quotes resource"""
yield from crm_objects(
"quote",
api_key,
include_history,
props,
include_custom_props,
)

return companies, contacts, deals, tickets, products, quotes


def crm_objects(
object_type: str,
api_key: str = dlt.secrets.value,
include_history: bool = False,
props: Sequence[str] = None,
include_custom_props: bool = True,
) -> Iterator[TDataItems]:
"""Building blocks for CRM resources."""
props = ",".join(_get_property_names(api_key, object_type))
if props == ALL:
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
props = list(_get_property_names(api_key, object_type))

if include_custom_props:
all_props = _get_property_names(api_key, object_type)
custom_props = [prop for prop in all_props if not prop.startswith("hs_")]
props = props + custom_props # type: ignore

props = ",".join(sorted(list(set(props))))

if len(props) > 2000:
raise ValueError(
(
"Your request to Hubspot is too long to process. "
"Maximum allowed query length is 2000 symbols, while "
f"your list of properties `{props[:200]}`... is {len(props)} "
"symbols long. Use the `props` argument of the resource to "
"set the list of properties to extract from the endpoint."
)
)

params = {"properties": props, "limit": 100}

yield from fetch_data(CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params)
Expand All @@ -98,58 +225,11 @@ def crm_objects(
props,
):
yield dlt.mark.with_table_name(
history_entries, OBJECT_TYPE_PLURAL[object_type] + "_property_history"
history_entries,
OBJECT_TYPE_PLURAL[object_type] + "_property_history",
)


@dlt.resource(name="companies", write_disposition="replace")
def companies(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot companies resource"""
yield from crm_objects("company", api_key, include_history=False)


@dlt.resource(name="contacts", write_disposition="replace")
def contacts(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot contacts resource"""
yield from crm_objects("contact", api_key, include_history)


@dlt.resource(name="deals", write_disposition="replace")
def deals(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot deals resource"""
yield from crm_objects("deal", api_key, include_history)


@dlt.resource(name="tickets", write_disposition="replace")
def tickets(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot tickets resource"""
yield from crm_objects("ticket", api_key, include_history)


@dlt.resource(name="products", write_disposition="replace")
def products(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot products resource"""
yield from crm_objects("product", api_key, include_history)


@dlt.resource(name="quotes", write_disposition="replace")
def quotes(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot quotes resource"""
yield from crm_objects("quote", api_key, include_history)


@dlt.resource
def hubspot_events_for_objects(
object_type: THubspotObjectType,
Expand Down
5 changes: 4 additions & 1 deletion sources/hubspot/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str


def fetch_property_history(
endpoint: str, api_key: str, props: str, params: Optional[Dict[str, Any]] = None
endpoint: str,
api_key: str,
props: str,
params: Optional[Dict[str, Any]] = None,
) -> Iterator[List[Dict[str, Any]]]:
"""Fetch property history from the given CRM endpoint.

Expand Down
61 changes: 61 additions & 0 deletions sources/hubspot/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,64 @@
}

OBJECT_TYPE_PLURAL = {v: k for k, v in OBJECT_TYPE_SINGULAR.items()}

DEFAULT_DEAL_PROPS = [
"amount",
"closedate",
"createdate",
"dealname",
"dealstage",
"hs_lastmodifieddate",
"hs_object_id",
"pipeline",
]

DEFAULT_COMPANY_PROPS = [
"createdate",
"domain",
"hs_lastmodifieddate",
"hs_object_id",
"name",
]

DEFAULT_CONTACT_PROPS = [
"createdate",
"email",
"firstname",
"hs_object_id",
"lastmodifieddate",
"lastname",
]

DEFAULT_TICKET_PROPS = [
"createdate",
"content",
"hs_lastmodifieddate",
"hs_object_id",
"hs_pipeline",
"hs_pipeline_stage",
"hs_ticket_category",
"hs_ticket_priority",
"subject",
]

DEFAULT_PRODUCT_PROPS = [
"createdate",
"description",
"hs_lastmodifieddate",
"hs_object_id",
"name",
"price",
]

DEFAULT_QUOTE_PROPS = [
"hs_createdate",
"hs_expiration_date",
"hs_lastmodifieddate",
"hs_object_id",
"hs_public_url_key",
"hs_status",
"hs_title",
]

ALL = ("ALL",)
34 changes: 34 additions & 0 deletions sources/hubspot_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,39 @@ def load_crm_data_with_history() -> None:
print(info)


def load_crm_objects_with_custom_properties() -> None:
"""
Loads CRM objects, reading only properties defined by the user.
"""

# Create a DLT pipeline object with the pipeline name,
# dataset name, properties to read and destination database
# type Add full_refresh=(True or False) if you need your
# pipeline to create the dataset in your destination
p = dlt.pipeline(
pipeline_name="hubspot_pipeline",
dataset_name="hubspot",
destination="postgres",
)

source = hubspot()

# By default, all the custom properties of a CRM object are extracted,
# ignoring those driven by Hubspot (prefixed with `hs_`).

# To read fields in addition to the custom ones:
# source.contacts.bind(props=["date_of_birth", "degree"])

# To read only two particular fields:
source.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=False)

# Run the pipeline with the HubSpot source connector
info = p.run(source)

# Print information about the pipeline run
print(info)


def load_web_analytics_events(
object_type: THubspotObjectType, object_ids: List[str]
) -> None:
Expand Down Expand Up @@ -86,3 +119,4 @@ def load_web_analytics_events(
load_crm_data()
# load_crm_data_with_history()
# load_web_analytics_events("company", ["7086461639", "7086464459"])
# load_crm_objects_with_custom_properties()
Loading
Loading