Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hubspot properties #206

Merged
merged 9 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 61 additions & 28 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,29 @@

from typing import Any, Dict, List, Literal, Sequence, Iterator
from urllib.parse import quote
from itertools import chain

import dlt
from dlt.common import pendulum
from dlt.common.typing import TDataItems
from dlt.common.typing import TDataItems, TDataItem
from dlt.extract.source import DltResource

from .helpers import fetch_data

from .helpers import fetch_data, _get_property_names, fetch_data_with_history
from .settings import (
STARTDATE,
CRM_CONTACTS_ENDPOINT,
CRM_COMPANIES_ENDPOINT,
CRM_DEALS_ENDPOINT,
CRM_TICKETS_ENDPOINT,
CRM_PRODUCTS_ENDPOINT,
WEB_ANALYTICS_EVENTS_ENDPOINT,
CRM_QUOTES_ENDPOINT,
OBJECT_TYPE_SINGULAR,
CRM_OBJECT_ENDPOINTS,
OBJECT_TYPE_PLURAL,
)

THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"]


@dlt.source(name="hubspot")
def hubspot() -> Sequence[DltResource]:
def hubspot(
api_key: str = dlt.secrets.value, include_history: bool = False
) -> Sequence[DltResource]:
"""
A DLT source that retrieves data from the HubSpot API using the specified API key.

Expand All @@ -58,6 +57,7 @@ def hubspot() -> Sequence[DltResource]:

Args:
api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value.
include_history: Whether to load history of property changes along with entities. The history entries are loaded to separate tables.

Returns:
tuple: A tuple of Dlt resources, one for each HubSpot API endpoint.
Expand All @@ -67,49 +67,82 @@ def hubspot() -> Sequence[DltResource]:
is passed to `fetch_data` as the `api_key` argument.
"""
return [
companies(),
contacts(),
deals(),
tickets(),
products(),
quotes(),
companies(include_history=include_history),
contacts(include_history=include_history),
deals(include_history=include_history),
tickets(include_history=include_history),
products(include_history=include_history),
quotes(include_history=include_history),
]


def crm_objects(
    object_type: str,
    api_key: str = dlt.secrets.value,
    include_history: bool = False,
) -> Iterator[TDataItems]:
    """Shared generator behind the individual CRM resources.

    Looks up every property name of `object_type`, requests all of them from
    the matching CRM endpoint and, for each fetched page, yields the page of
    objects followed by that page's property history entries. The history
    entries are marked with the table name
    ``OBJECT_TYPE_PLURAL[object_type] + "_property_history"``.

    Args:
        object_type: Key into `CRM_OBJECT_ENDPOINTS` and `OBJECT_TYPE_PLURAL`.
        api_key: HubSpot API key passed on to the fetch helpers.
        include_history: When true, also request `propertiesWithHistory` for
            all properties and lower the page size to 50.
    """
    property_list = ",".join(_get_property_names(api_key, object_type))

    if include_history:
        # API allows max 50 items per call with property history
        query = {
            "properties": property_list,
            "limit": 50,
            "propertiesWithHistory": property_list,
        }
    else:
        query = {"properties": property_list, "limit": 100}

    pages = fetch_data_with_history(
        CRM_OBJECT_ENDPOINTS[object_type], api_key, params=query
    )
    for page_objects, page_history in pages:
        yield page_objects
        history_table = OBJECT_TYPE_PLURAL[object_type] + "_property_history"
        yield dlt.mark.with_table_name(page_history, history_table)


@dlt.resource(name="companies", write_disposition="replace")
def companies(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot companies resource.

    Args:
        api_key: HubSpot API key; forwarded to `crm_objects`.
        include_history: Whether to also load the property change history of
            companies. Forwarded to `crm_objects`, which yields the history
            entries under a separate table name.

    Yields:
        Whatever `crm_objects` yields for the "company" object type.
    """
    # Forward the caller's flag. It was previously hard-coded to False here,
    # so company history was never loaded even when it was requested, unlike
    # the other CRM resources.
    yield from crm_objects("company", api_key, include_history=include_history)


@dlt.resource(name="contacts", write_disposition="replace")
def contacts(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot contacts resource.

    Delegates to `crm_objects` for the "contact" object type, forwarding
    `api_key` and `include_history` unchanged.
    """
    yield from crm_objects(
        object_type="contact",
        api_key=api_key,
        include_history=include_history,
    )


@dlt.resource(name="deals", write_disposition="replace")
def deals(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot deals resource.

    Delegates to `crm_objects` for the "deal" object type, forwarding
    `api_key` and `include_history` unchanged.
    """
    yield from crm_objects(
        object_type="deal",
        api_key=api_key,
        include_history=include_history,
    )


@dlt.resource(name="tickets", write_disposition="replace")
def tickets(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot tickets resource.

    Delegates to `crm_objects` for the "ticket" object type, forwarding
    `api_key` and `include_history` unchanged.
    """
    yield from crm_objects(
        object_type="ticket",
        api_key=api_key,
        include_history=include_history,
    )


@dlt.resource(name="products", write_disposition="replace")
def products(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot products resource.

    Delegates to `crm_objects` for the "product" object type, forwarding
    `api_key` and `include_history` unchanged.
    """
    yield from crm_objects(
        object_type="product",
        api_key=api_key,
        include_history=include_history,
    )


@dlt.resource(name="quotes", write_disposition="replace")
def quotes(
    api_key: str = dlt.secrets.value, include_history: bool = False
) -> Iterator[TDataItems]:
    """Hubspot quotes resource.

    Delegates to `crm_objects` for the "quote" object type, forwarding
    `api_key` and `include_history` unchanged.
    """
    yield from crm_objects(
        object_type="quote",
        api_key=api_key,
        include_history=include_history,
    )


@dlt.resource
Expand Down
169 changes: 99 additions & 70 deletions sources/hubspot/helpers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
"""Hubspot source helpers"""

import urllib.parse
from typing import Generator, Dict, Any, List
from typing import Iterator, Dict, Any, List, Optional, Iterable, Tuple

from dlt.sources.helpers import requests
from .settings import OBJECT_TYPE_PLURAL

BASE_URL = "https://api.hubapi.com/"


def get_url(endpoint: str, **kwargs: Any) -> str:
return urllib.parse.urljoin(BASE_URL, endpoint.format(**kwargs))
def get_url(endpoint: str) -> str:
"""Get absolute hubspot endpoint URL"""
return urllib.parse.urljoin(BASE_URL, endpoint)


def _get_headers(api_key: str) -> Dict[str, str]:
Expand All @@ -28,76 +30,35 @@ def _get_headers(api_key: str) -> Dict[str, str]:
return dict(authorization=f"Bearer {api_key}")


def _parse_response(
r: requests.Response, **kwargs: str
) -> Generator[List[Dict[str, Any]], None, None]:
"""
Parse a JSON response from HUBSPOT and yield the properties of each result.

Args:
r (requests.Response): The response object from the API call.
**kwargs: Additional keyword arguments to pass to the `fetch_data` function.

Yields:
dict: The properties of each result in the API response.

Notes:
This method assumes that the API response is in JSON format, and that the results are contained
within the "results" key of the JSON object. If the response does not contain any results, a
`ValueError` will be raised.

If the response contains pagination information in the "paging" key of the JSON object, this method
will follow the "next" link in the pagination information and yield the properties of each result in
the subsequent pages. The `fetch_data` function is used to retrieve the subsequent pages, and any
additional keyword arguments passed to this method will be passed on to the `fetch_data` function.

"""
# Parse the response JSON data
_data = r.json()

# Yield the properties of each result in the API response
if "results" in _data:
_objects: List[Dict[str, Any]] = []
for _result in _data["results"]:
_obj = _result["properties"]
if "associations" in _result:
for association in _result["associations"]:
__values = [
{"value": _obj["hs_object_id"], f"{association}_id": __r["id"]}
for __r in _result["associations"][association]["results"]
]

# remove duplicates from list of dicts
__values = [dict(t) for t in {tuple(d.items()) for d in __values}]

_obj[association] = __values
_objects.append(_obj)
if _objects:
yield _objects

# Follow pagination links if they exist
if "paging" in _data:
_next = _data["paging"].get("next", None)
if _next:
# Replace the base URL with an empty string to get the relative URL for the next page
next_url = _next["link"].replace(BASE_URL, "")
# Recursively call the `fetch_data` function to get the next page of results
yield from fetch_data(next_url, **kwargs)
def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Flatten the `propertiesWithHistory` of CRM objects into one entry per change.

    Args:
        objects: Result dicts as returned by the API. Each may carry a
            `propertiesWithHistory` mapping of property name to a list of
            change-entry dicts; objects that do must also have an `id` key.

    Yields:
        One dict per change entry, made of `object_id` (the object's `id`),
        `property_name` and all keys of the change entry itself.
    """
    for item in objects:
        history = item.get("propertiesWithHistory")
        if not history:
            # Skip only this object. A `return` here would end the generator
            # and silently drop the history of every object that follows.
            continue
        # Yield a flat list of property history entries
        for key, changes in history.items():
            if not changes:
                continue
            for entry in changes:
                yield {"object_id": item["id"], "property_name": key, **entry}


def fetch_data(
endpoint: str, api_key: str, **kwargs: str
) -> Generator[List[Dict[str, Any]], None, None]:
def fetch_data_with_history(
endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]]:
"""
Fetch data from HUBSPOT endpoint using a specified API key and yield the properties of each result.
For paginated endpoints this function yields items from all pages.
For objects that support it, and when `params` includes `propertiesWithHistory`,
a flattened list of history entries is included in the yielded tuple.

Args:
endpoint (str): The endpoint to fetch data from, as a string.
api_key (str): The API key to use for authentication, as a string.
**kwargs: Additional keyword arguments to pass to the `_parse_response` function.
params: Optional dict of query params to include in the request

Yields:
List[dict]: The properties of each result in the API response.
A tuple consisting of 1. List of CRM object dicts and 2. List of property history entries

Raises:
requests.exceptions.HTTPError: If the API returns an HTTP error status code.
Expand All @@ -108,19 +69,87 @@ def fetch_data(
404 Not Found), a `requests.exceptions.HTTPError` exception will be raised.

The `endpoint` argument should be a relative URL, which will be appended to the base URL for the
API. The `**kwargs` argument is used to pass additional keyword arguments to the `_parse_response`
function, such as any parameters that need to be included in the API request.
API. The `params` argument is used to pass additional query parameters to the request

This function also includes a retry decorator that will automatically retry the API call up to
3 times with a 5-second delay between retries, using an exponential backoff strategy.

"""
# Construct the URL and headers for the API request
url = get_url(endpoint, **kwargs)
url = get_url(endpoint)
headers = _get_headers(api_key)

# Make the API request
r = requests.get(url, headers=headers)

r = requests.get(url, headers=headers, params=params)
# Parse the API response and yield the properties of each result
return _parse_response(r, api_key=api_key, **kwargs)

# Parse the response JSON data
_data = r.json()
# Yield the properties of each result in the API response
while _data is not None:
if "results" in _data:
_objects: List[Dict[str, Any]] = []
for _result in _data["results"]:
_obj = _result.get("properties", _result)
if "id" not in _obj and "id" in _result:
# Move id from properties to top level
_obj["id"] = _result["id"]
if "associations" in _result:
for association in _result["associations"]:
__values = [
{
"value": _obj["hs_object_id"],
f"{association}_id": __r["id"],
}
for __r in _result["associations"][association]["results"]
]

# remove duplicates from list of dicts
__values = [
dict(t) for t in {tuple(d.items()) for d in __values}
]

_obj[association] = __values
_objects.append(_obj)
yield _objects, list(extract_property_history(_data["results"]))

# Follow pagination links if they exist
_next = _data.get("paging", {}).get("next", None)
if _next:
next_url = _next["link"]
# Get the next page response
r = requests.get(next_url, headers=headers)
_data = r.json()
else:
_data = None


def fetch_data(
    endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[List[Dict[str, Any]]]:
    """Fetch pages of data objects from the hubspot API.

    Thin wrapper around `fetch_data_with_history` that keeps only the first
    element (the list of objects) of every yielded tuple and discards the
    history entries.

    Args:
        endpoint: Endpoint passed through to `fetch_data_with_history`.
        api_key: API key passed through to `fetch_data_with_history`.
        params: Optional query parameters passed through unchanged.

    Yields:
        One list of object dicts per page.
    """
    pages_with_history = fetch_data_with_history(endpoint, api_key, params=params)
    yield from (objects for objects, _history in pages_with_history)


def _get_property_names(api_key: str, object_type: str) -> List[str]:
    """
    Retrieve the names of all properties of a CRM object type from the HubSpot API.

    Args:
        api_key: The API key passed on to `fetch_data`.
        object_type: Key into `OBJECT_TYPE_PLURAL`; the mapped value is used
            as the last segment of the `/crm/v3/properties/` endpoint.

    Returns:
        A list with the `name` of every property on every fetched page.

    Raises:
        Any error raised by `fetch_data` for the properties request propagates
        unchanged (presumably HTTP errors from the request; confirm against
        `fetch_data_with_history`).
    """
    endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}"
    return [
        prop["name"]
        for page in fetch_data(endpoint, api_key)
        for prop in page
    ]
Loading