Skip to content

Commit

Permalink
Extract properties_with_history
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Jul 2, 2023
1 parent 2c5c59b commit 865b6e9
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 55 deletions.
101 changes: 61 additions & 40 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,29 @@

from typing import Any, Dict, List, Literal, Sequence, Iterator
from urllib.parse import quote
from itertools import chain

import dlt
from dlt.common import pendulum
from dlt.common.typing import TDataItems
from dlt.common.typing import TDataItems, TDataItem
from dlt.extract.source import DltResource
from .helpers import fetch_data, _get_property_names


from .helpers import fetch_data, _get_property_names, fetch_data_with_history
from .settings import (
STARTDATE,
CRM_CONTACTS_ENDPOINT,
CRM_COMPANIES_ENDPOINT,
CRM_DEALS_ENDPOINT,
CRM_TICKETS_ENDPOINT,
CRM_PRODUCTS_ENDPOINT,
WEB_ANALYTICS_EVENTS_ENDPOINT,
CRM_QUOTES_ENDPOINT,
OBJECT_TYPE_SINGULAR,
CRM_OBJECT_ENDPOINTS,
OBJECT_TYPE_PLURAL,
)

THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"]


@dlt.source(name="hubspot")
def hubspot() -> Sequence[DltResource]:
def hubspot(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Sequence[DltResource]:
"""
A DLT source that retrieves data from the HubSpot API using the specified API key.
Expand All @@ -58,6 +57,7 @@ def hubspot() -> Sequence[DltResource]:
Args:
api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value.
include_history: Whether to load history of property changes along with entities. The history entries are loaded to separate tables.
Returns:
tuple: A tuple of Dlt resources, one for each HubSpot API endpoint.
Expand All @@ -67,61 +67,82 @@ def hubspot() -> Sequence[DltResource]:
is passed to `fetch_data` as the `api_key` argument.
"""
return [
companies(),
contacts(),
deals(),
tickets(),
products(),
quotes(),
companies(include_history=include_history),
contacts(include_history=include_history),
deals(include_history=include_history),
tickets(include_history=include_history),
products(include_history=include_history),
quotes(include_history=include_history),
]


def crm_objects(
object_type: str,
api_key: str = dlt.secrets.value,
include_history: bool = False,
) -> Iterator[TDataItems]:
"""Building blocks for CRM resources."""
props = ",".join(_get_property_names(api_key, object_type))
params = {"properties": props, "limit": 100}
if include_history:
params["propertiesWithHistory"] = props
# API allows max 50 items per call with property history
params["limit"] = 50
for objects, history_entries in fetch_data_with_history(
CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params
):
yield objects
yield dlt.mark.with_table_name(
history_entries, OBJECT_TYPE_PLURAL[object_type] + "_property_history"
)


@dlt.resource(name="companies", write_disposition="replace")
def companies(api_key: str = dlt.secrets.value) -> Iterator[TDataItems]:
def companies(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot companies resource"""
props = _get_property_names(api_key=api_key, entity="companies")
params = {"properties": ",".join(props)}
yield from fetch_data(CRM_COMPANIES_ENDPOINT, api_key=api_key, params=params)
yield from crm_objects("company", api_key, include_history=False)


@dlt.resource(name="contacts", write_disposition="replace")
def contacts(api_key: str = dlt.secrets.value) -> Iterator[TDataItems]:
def contacts(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot contacts resource"""
props = _get_property_names(api_key=api_key, entity="contacts")
params = {"properties": ",".join(props)}
yield from fetch_data(CRM_CONTACTS_ENDPOINT, api_key=api_key, params=params)
yield from crm_objects("contact", api_key, include_history)


@dlt.resource(name="deals", write_disposition="replace")
def deals(api_key: str = dlt.secrets.value) -> Iterator[TDataItems]:
def deals(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot deals resource"""
props = _get_property_names(api_key=api_key, entity="deals")
params = {"properties": ",".join(props)}
yield from fetch_data(CRM_DEALS_ENDPOINT, api_key=api_key, params=params)
yield from crm_objects("deal", api_key, include_history)


@dlt.resource(name="tickets", write_disposition="replace")
def tickets(api_key: str = dlt.secrets.value) -> Iterator[TDataItems]:
def tickets(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot tickets resource"""
props = _get_property_names(api_key=api_key, entity="tickets")
params = {"properties": ",".join(props)}
yield from fetch_data(CRM_TICKETS_ENDPOINT, api_key=api_key, params=params)
yield from crm_objects("ticket", api_key, include_history)


@dlt.resource(name="products", write_disposition="replace")
def products(api_key: str = dlt.secrets.value) -> Iterator[TDataItems]:
def products(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot products resource"""
props = _get_property_names(api_key=api_key, entity="products")
params = {"properties": ",".join(props)}
yield from fetch_data(CRM_PRODUCTS_ENDPOINT, api_key=api_key, params=params)
yield from crm_objects("product", api_key, include_history)


@dlt.resource(name="quotes", write_disposition="replace")
def quotes(api_key: str = dlt.secrets.value) -> Iterator[TDataItems]:
def quotes(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
"""Hubspot quotes resource"""
props = _get_property_names(api_key=api_key, entity="quotes")
params = {"properties": ",".join(props)}
yield from fetch_data(CRM_QUOTES_ENDPOINT, api_key=api_key, params=params)
yield from crm_objects("quote", api_key, include_history)


@dlt.resource
Expand Down
55 changes: 46 additions & 9 deletions sources/hubspot/helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Hubspot source helpers"""

import urllib.parse
from typing import Iterator, Dict, Any, List, Optional
from typing import Iterator, Dict, Any, List, Optional, Iterable, Tuple

from dlt.sources.helpers import requests
from .settings import OBJECT_TYPE_PLURAL

BASE_URL = "https://api.hubapi.com/"

Expand All @@ -29,20 +30,45 @@ def _get_headers(api_key: str) -> Dict[str, str]:
return dict(authorization=f"Bearer {api_key}")


def fetch_data(
def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
for item in objects:
history = item.get("propertiesWithHistory")
if not history:
return
# Yield a flat list of property history entries
for key, changes in history.items():
if not changes:
continue
for entry in changes:
yield {"object_id": item["id"], "property_name": key, **entry}


def fetch_object(
endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[List[Dict[str, Any]]]:
) -> Dict[str, Any]:
"""Fetch a single data object from the API. From e.g. `.../contacts/{id}` endpoint"""
url = get_url(endpoint)
headers = _get_headers(api_key)
r = requests.get(url, headers=headers, params=params)
return r.json() # type: ignore


def fetch_data_with_history(
endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]]:
"""
Fetch data from HUBSPOT endpoint using a specified API key and yield the properties of each result.
For paginated endpoint this function yields item from all pages.
For objects that support it and When params includes `propertiesWithHistory`
a flattened list of history entries is included in the return tuple.
Args:
endpoint (str): The endpoint to fetch data from, as a string.
api_key (str): The API key to use for authentication, as a string.
params: Optional dict of query params to include in the request
Yields:
List[dict]: The properties of each result in the API response.
A tuple consisting of 1. List of CRM object dicts and 2. List of property history entries
Raises:
requests.exceptions.HTTPError: If the API returns an HTTP error status code.
Expand All @@ -57,7 +83,6 @@ def fetch_data(
This function also includes a retry decorator that will automatically retry the API call up to
3 times with a 5-second delay between retries, using an exponential backoff strategy.
"""
# Construct the URL and headers for the API request
url = get_url(endpoint)
Expand All @@ -75,6 +100,9 @@ def fetch_data(
_objects: List[Dict[str, Any]] = []
for _result in _data["results"]:
_obj = _result.get("properties", _result)
if "id" not in _obj and "id" in _result:
# Move id from properties to top level
_obj["id"] = _result["id"]
if "associations" in _result:
for association in _result["associations"]:
__values = [
Expand All @@ -92,8 +120,7 @@ def fetch_data(

_obj[association] = __values
_objects.append(_obj)
if _objects:
yield _objects
yield _objects, list(extract_property_history(_data["results"]))

# Follow pagination links if they exist
_next = _data.get("paging", {}).get("next", None)
Expand All @@ -106,7 +133,17 @@ def fetch_data(
_data = None


def _get_property_names(api_key: str, entity: str) -> List[str]:
def fetch_data(
endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[List[Dict[str, Any]]]:
"""Fetch data objects from the hubspot API.
Same as `fetch_data_with_history` but does not include history entries.
"""
for page, _ in fetch_data_with_history(endpoint, api_key, params=params):
yield page


def _get_property_names(api_key: str, object_type: str) -> List[str]:
"""
Retrieve property names for a given entity from the HubSpot API.
Expand All @@ -120,7 +157,7 @@ def _get_property_names(api_key: str, entity: str) -> List[str]:
Exception: If an error occurs during the API request.
"""
properties = []
endpoint = f"/crm/v3/properties/{entity}"
endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}"

for page in fetch_data(endpoint, api_key):
properties.extend([prop["name"] for prop in page])
Expand Down
35 changes: 29 additions & 6 deletions sources/hubspot/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,34 @@
STARTDATE = pendulum.datetime(year=2000, month=1, day=1)

CRM_CONTACTS_ENDPOINT = (
"/crm/v3/objects/contacts?associations=deals,products,tickets,quotes&limit=100"
"/crm/v3/objects/contacts?associations=deals,products,tickets,quotes"
)
CRM_COMPANIES_ENDPOINT = "/crm/v3/objects/companies?associations=contacts,deals,products,tickets,quotes&limit=100"
CRM_DEALS_ENDPOINT = "/crm/v3/objects/deals?limit=100"
CRM_PRODUCTS_ENDPOINT = "/crm/v3/objects/products?limit=100"
CRM_TICKETS_ENDPOINT = "/crm/v3/objects/tickets?limit=100"
CRM_QUOTES_ENDPOINT = "/crm/v3/objects/quotes?limit=100"
CRM_COMPANIES_ENDPOINT = (
"/crm/v3/objects/companies?associations=contacts,deals,products,tickets,quotes"
)
CRM_DEALS_ENDPOINT = "/crm/v3/objects/deals"
CRM_PRODUCTS_ENDPOINT = "/crm/v3/objects/products"
CRM_TICKETS_ENDPOINT = "/crm/v3/objects/tickets"
CRM_QUOTES_ENDPOINT = "/crm/v3/objects/quotes"

CRM_OBJECT_ENDPOINTS = {
"contact": CRM_CONTACTS_ENDPOINT,
"company": CRM_COMPANIES_ENDPOINT,
"deal": CRM_DEALS_ENDPOINT,
"product": CRM_PRODUCTS_ENDPOINT,
"ticket": CRM_TICKETS_ENDPOINT,
"quote": CRM_QUOTES_ENDPOINT,
}

WEB_ANALYTICS_EVENTS_ENDPOINT = "/events/v3/events?objectType={objectType}&objectId={objectId}&occurredAfter={occurredAfter}&occurredBefore={occurredBefore}&sort=-occurredAt"

OBJECT_TYPE_SINGULAR = {
"companies": "company",
"contacts": "contact",
"deals": "deal",
"tickets": "ticket",
"products": "product",
"quotes": "quote",
}

OBJECT_TYPE_PLURAL = {v: k for k, v in OBJECT_TYPE_SINGULAR.items()}

0 comments on commit 865b6e9

Please sign in to comment.