Skip to content

Commit

Permalink
Add hubspot properties (#206)
Browse files Browse the repository at this point in the history
* add properties

* Refactor and add properties param to all entities

* Extract properties_with_history

* Unit test contacts+properties resource

* Cleanup

* Load property history separately due to URL length + tests and demos

* Test property history with real data

* Check api calls in mock test

* Test ts format

---------

Co-authored-by: Adrian <Adrian>
  • Loading branch information
steinitzu authored Jul 17, 2023
1 parent 68c0c25 commit a3baf32
Show file tree
Hide file tree
Showing 6 changed files with 1,078 additions and 98 deletions.
95 changes: 67 additions & 28 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,33 @@

from typing import Any, Dict, List, Literal, Sequence, Iterator
from urllib.parse import quote
from itertools import chain

import dlt
from dlt.common import pendulum
from dlt.common.typing import TDataItems
from dlt.common.typing import TDataItems, TDataItem
from dlt.extract.source import DltResource

from .helpers import fetch_data

from .helpers import (
fetch_data,
_get_property_names,
fetch_property_history,
)
from .settings import (
STARTDATE,
CRM_CONTACTS_ENDPOINT,
CRM_COMPANIES_ENDPOINT,
CRM_DEALS_ENDPOINT,
CRM_TICKETS_ENDPOINT,
CRM_PRODUCTS_ENDPOINT,
WEB_ANALYTICS_EVENTS_ENDPOINT,
CRM_QUOTES_ENDPOINT,
OBJECT_TYPE_SINGULAR,
CRM_OBJECT_ENDPOINTS,
OBJECT_TYPE_PLURAL,
)

# Literal type listing the singular CRM object type names; these are the same
# strings the resource functions in this module pass to `crm_objects`.
THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"]


@dlt.source(name="hubspot")
def hubspot(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Sequence[DltResource]:
    """
    A DLT source that retrieves data from the HubSpot API using the specified API key.

    Args:
        api_key (str, optional): The API key used to authenticate with the HubSpot API.
            Defaults to dlt.secrets.value.
        include_history: Whether to load history of property changes along with entities.
            The history entries are loaded to separate tables.

    Returns:
        list: A list of Dlt resources, one for each CRM object type
            (companies, contacts, deals, tickets, products, quotes).
    """
    # Forward `api_key` explicitly: previously the parameter was accepted but
    # never used, so a key passed directly to `hubspot(...)` was ignored by
    # the resources.
    return [
        companies(api_key=api_key, include_history=include_history),
        contacts(api_key=api_key, include_history=include_history),
        deals(api_key=api_key, include_history=include_history),
        tickets(api_key=api_key, include_history=include_history),
        products(api_key=api_key, include_history=include_history),
        quotes(api_key=api_key, include_history=include_history),
    ]


def crm_objects(
    object_type: str,
    api_key: str = dlt.secrets.value,
    include_history: bool = False,
) -> Iterator[TDataItems]:
    """Shared generator behind the CRM resources.

    Looks up every property name defined for `object_type`, requests the
    objects with all of those properties, and yields them page by page.
    When `include_history` is true, the property history is fetched in a
    second pass and each page of history entries is marked for the
    `<plural object type>_property_history` table.

    Args:
        object_type: Key into `CRM_OBJECT_ENDPOINTS` / `OBJECT_TYPE_PLURAL`.
        api_key: The API key used to authenticate with the HubSpot API.
        include_history: Whether to also yield property history entries.

    Yields:
        Pages of CRM objects, then (optionally) pages of history entries.
    """
    property_list = ",".join(_get_property_names(api_key, object_type))
    endpoint = CRM_OBJECT_ENDPOINTS[object_type]

    yield from fetch_data(
        endpoint, api_key, params={"properties": property_list, "limit": 100}
    )

    if not include_history:
        return

    # Get history separately, as requesting both all properties and history
    # together is likely to hit hubspot's URL length limit
    history_table = OBJECT_TYPE_PLURAL[object_type] + "_property_history"
    for history_page in fetch_property_history(endpoint, api_key, property_list):
        yield dlt.mark.with_table_name(history_page, history_table)


@dlt.resource(name="companies", write_disposition="replace")
def companies(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot companies resource

    Args:
        api_key: The API key used to authenticate with the HubSpot API.
        include_history: Whether to also yield company property history.

    Yields:
        Pages of company objects produced by `crm_objects`.
    """
    # Pass the caller's flag through; it was previously hard-coded to False,
    # which silently disabled history loading for companies.
    yield from crm_objects("company", api_key, include_history=include_history)


@dlt.resource(name="contacts", write_disposition="replace")
def contacts(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot contacts resource

    Delegates to `crm_objects` for the "contact" object type, forwarding
    the API key and the `include_history` flag.
    """
    yield from crm_objects("contact", api_key, include_history=include_history)


@dlt.resource(name="deals", write_disposition="replace")
def deals(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot deals resource

    Delegates to `crm_objects` for the "deal" object type, forwarding
    the API key and the `include_history` flag.
    """
    yield from crm_objects("deal", api_key, include_history=include_history)


@dlt.resource(name="tickets", write_disposition="replace")
def tickets(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot tickets resource

    Delegates to `crm_objects` for the "ticket" object type, forwarding
    the API key and the `include_history` flag.
    """
    yield from crm_objects("ticket", api_key, include_history=include_history)


@dlt.resource(name="products", write_disposition="replace")
def products(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot products resource

    Delegates to `crm_objects` for the "product" object type, forwarding
    the API key and the `include_history` flag.
    """
    yield from crm_objects("product", api_key, include_history=include_history)


@dlt.resource(name="quotes", write_disposition="replace")
def quotes(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot quotes resource

    Delegates to `crm_objects` for the "quote" object type, forwarding
    the API key and the `include_history` flag.
    """
    yield from crm_objects("quote", api_key, include_history=include_history)


@dlt.resource
Expand Down
176 changes: 117 additions & 59 deletions sources/hubspot/helpers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
"""Hubspot source helpers"""

import urllib.parse
from typing import Generator, Dict, Any, List
from typing import Iterator, Dict, Any, List, Optional, Iterable, Tuple

from dlt.sources.helpers import requests
from .settings import OBJECT_TYPE_PLURAL

BASE_URL = "https://api.hubapi.com/"


def get_url(endpoint: str) -> str:
    """Resolve a HubSpot endpoint path against `BASE_URL` and return the absolute URL."""
    absolute_url = urllib.parse.urljoin(BASE_URL, endpoint)
    return absolute_url


def _get_headers(api_key: str) -> Dict[str, str]:
Expand All @@ -28,76 +30,75 @@ def _get_headers(api_key: str) -> Dict[str, str]:
return dict(authorization=f"Bearer {api_key}")


def _parse_response(
r: requests.Response, **kwargs: str
) -> Generator[List[Dict[str, Any]], None, None]:
"""
Parse a JSON response from HUBSPOT and yield the properties of each result.
def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Flatten the `propertiesWithHistory` of CRM objects into single entries.

    Args:
        objects: CRM object dicts. Each may carry a `propertiesWithHistory`
            mapping of property name to a list of change entries (dicts).

    Yields:
        One dict per change entry: the entry's own keys plus `object_id`
        (the object's `id`) and `property_name`.
    """
    for item in objects:
        history = item.get("propertiesWithHistory")
        if not history:
            # Skip only this object. Returning here would drop the history
            # of every remaining object in the list.
            continue
        # Yield a flat list of property history entries
        for key, changes in history.items():
            if not changes:
                continue
            for entry in changes:
                yield {"object_id": item["id"], "property_name": key, **entry}


def fetch_property_history(
    endpoint: str, api_key: str, props: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[List[Dict[str, Any]]]:
    """Fetch property history from the given CRM endpoint.

    The first request is sent with `propertiesWithHistory` set to `props`
    and `limit` set to 50 (overriding any value in `params`). Further pages
    are requested from the `paging.next.link` URL found in each response.

    Args:
        endpoint: The endpoint to fetch data from, as a string.
        api_key: The API key to use for authentication, as a string.
        props: A comma separated list of properties to retrieve the history for
        params: Optional dict of query params to include in the request

    Yields:
        List of property history entries (dicts), one list per response
        page that contains a "results" key.
    """
    url = get_url(endpoint)
    headers = _get_headers(api_key)

    # Copy the caller's params so the caller's dict is never mutated.
    query = {**(params or {}), "propertiesWithHistory": props, "limit": 50}

    page = requests.get(url, headers=headers, params=query).json()
    while page is not None:
        if "results" in page:
            yield list(extract_property_history(page["results"]))

        # Follow pagination links if they exist
        next_page = page.get("paging", {}).get("next", None)
        if not next_page:
            break
        page = requests.get(next_page["link"], headers=headers).json()


def fetch_data(
    endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[List[Dict[str, Any]]]:
    """
    Fetch data from a HubSpot endpoint and yield one list of objects per page.

    For paginated endpoints this function yields items from all pages:
    follow-up pages are requested from the `paging.next.link` URL found in
    each response, without re-sending `params`.

    Args:
        endpoint (str): The endpoint to fetch data from, as a string. It is
            resolved to an absolute URL with `get_url`.
        api_key (str): The API key to use for authentication, as a string.
        params: Optional dict of query params to include in the first request.

    Yields:
        A list of object dicts for every response page that contains a
        "results" key. Each dict is the result's "properties" mapping (or
        the result itself when it has none), with the result's "id" copied
        in when missing, and one extra key per association type holding a
        de-duplicated list of `{"value": ..., "<association>_id": ...}` dicts.

    Notes:
        No explicit status check is made here. HTTP error statuses are
        expected to surface as `requests.exceptions.HTTPError` from the
        `requests` helper in use -- NOTE(review): confirm against the helper's
        configuration.
    """
    # Construct the URL and headers for the API request
    url = get_url(endpoint)
    headers = _get_headers(api_key)

    # Make the API request
    r = requests.get(url, headers=headers, params=params)
    # Parse the response JSON data
    _data = r.json()
    while _data is not None:
        if "results" in _data:
            _objects: List[Dict[str, Any]] = []
            for _result in _data["results"]:
                _obj = _result.get("properties", _result)
                if "id" not in _obj and "id" in _result:
                    # Copy the top-level id into the properties dict
                    _obj["id"] = _result["id"]
                if "associations" in _result:
                    for association in _result["associations"]:
                        links = [
                            {
                                "value": _obj["hs_object_id"],
                                f"{association}_id": related["id"],
                            }
                            for related in _result["associations"][association][
                                "results"
                            ]
                        ]
                        # Remove duplicates while keeping first-seen order.
                        # A set would make the output order vary between runs.
                        unique_links = dict.fromkeys(
                            tuple(link.items()) for link in links
                        )
                        _obj[association] = [dict(pairs) for pairs in unique_links]
                _objects.append(_obj)
            yield _objects

        # Follow pagination links if they exist
        _next = _data.get("paging", {}).get("next", None)
        if _next:
            next_url = _next["link"]
            # Get the next page response
            r = requests.get(next_url, headers=headers)
            _data = r.json()
        else:
            _data = None


def _get_property_names(api_key: str, object_type: str) -> List[str]:
    """
    Retrieve property names for a given object type from the HubSpot API.

    Args:
        api_key: The API key to use for authentication.
        object_type: Key into `OBJECT_TYPE_PLURAL`; the plural form is used
            to build the `/crm/v3/properties/...` endpoint.

    Returns:
        A list with the "name" of every property, across all pages
        returned by `fetch_data`.
    """
    endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}"
    return [prop["name"] for page in fetch_data(endpoint, api_key) for prop in page]
Loading

0 comments on commit a3baf32

Please sign in to comment.