Skip to content

Commit

Permalink
Load property history separately due to URL length + tests and demos
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Jul 15, 2023
1 parent dab40ce commit 125546b
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 30 deletions.
28 changes: 17 additions & 11 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
from dlt.common.typing import TDataItems, TDataItem
from dlt.extract.source import DltResource

from .helpers import fetch_data, _get_property_names, fetch_data_with_history
from .helpers import (
fetch_data,
_get_property_names,
fetch_property_history,
)
from .settings import (
STARTDATE,
WEB_ANALYTICS_EVENTS_ENDPOINT,
Expand Down Expand Up @@ -84,17 +88,19 @@ def crm_objects(
"""Building blocks for CRM resources."""
props = ",".join(_get_property_names(api_key, object_type))
params = {"properties": props, "limit": 100}

yield from fetch_data(CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params)
if include_history:
params["propertiesWithHistory"] = props
# API allows max 50 items per call with property history
params["limit"] = 50
for objects, history_entries in fetch_data_with_history(
CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params
):
yield objects
yield dlt.mark.with_table_name(
history_entries, OBJECT_TYPE_PLURAL[object_type] + "_property_history"
)
# Get history separately, as requesting both all properties and history together
# is likely to hit hubspot's URL length limit
for history_entries in fetch_property_history(
CRM_OBJECT_ENDPOINTS[object_type],
api_key,
props,
):
yield dlt.mark.with_table_name(
history_entries, OBJECT_TYPE_PLURAL[object_type] + "_property_history"
)


@dlt.resource(name="companies", write_disposition="replace")
Expand Down
63 changes: 46 additions & 17 deletions sources/hubspot/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,62 @@ def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str
yield {"object_id": item["id"], "property_name": key, **entry}


def fetch_data_with_history(
def fetch_property_history(
    endpoint: str, api_key: str, props: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[List[Dict[str, Any]]]:
    """Fetch property history entries from the given CRM endpoint.

    Args:
        endpoint: The endpoint to fetch data from, as a string.
        api_key: The API key to use for authentication, as a string.
        props: A comma separated list of properties to retrieve the history for
        params: Optional dict of query params to include in the request

    Yields:
        List of property history entries (dicts), one list per result page
    """
    request_url = get_url(endpoint)
    headers = _get_headers(api_key)

    # Copy caller params so we never mutate the argument, then force the
    # history-specific query options onto the request.
    query = dict(params or {})
    query["propertiesWithHistory"] = props
    # NOTE(review): limit is pinned to 50 — presumably the API maximum when
    # requesting property history; confirm against HubSpot docs.
    query["limit"] = 50

    # First page carries the full query; subsequent pages come from the
    # pagination link, which already encodes the query string.
    page = requests.get(request_url, headers=headers, params=query).json()
    while True:
        if "results" in page:
            # Flatten per-object history into individual entry dicts
            yield list(extract_property_history(page["results"]))

        next_link = page.get("paging", {}).get("next", None)
        if not next_link:
            break
        page = requests.get(next_link["link"], headers=headers).json()


def fetch_data(
endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]]:
) -> Iterator[List[Dict[str, Any]]]:
"""
Fetch data from HUBSPOT endpoint using a specified API key and yield the properties of each result.
For paginated endpoint this function yields item from all pages.
Property history is no longer returned by this function; use
`fetch_property_history` to retrieve history entries separately.
Args:
endpoint (str): The endpoint to fetch data from, as a string.
api_key (str): The API key to use for authentication, as a string.
params: Optional dict of query params to include in the request
Yields:
A tuple consisting of 1. List of CRM object dicts and 2. List of property history entries
A List of CRM object dicts
Raises:
requests.exceptions.HTTPError: If the API returns an HTTP error status code.
Expand All @@ -81,7 +121,6 @@ def fetch_data_with_history(
# Make the API request
r = requests.get(url, headers=headers, params=params)
# Parse the API response and yield the properties of each result

# Parse the response JSON data
_data = r.json()
# Yield the properties of each result in the API response
Expand Down Expand Up @@ -110,7 +149,7 @@ def fetch_data_with_history(

_obj[association] = __values
_objects.append(_obj)
yield _objects, list(extract_property_history(_data["results"]))
yield _objects

# Follow pagination links if they exist
_next = _data.get("paging", {}).get("next", None)
Expand All @@ -123,16 +162,6 @@ def fetch_data_with_history(
_data = None


def fetch_data(
endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None
) -> Iterator[List[Dict[str, Any]]]:
"""Fetch data objects from the hubspot API.
Same as `fetch_data_with_history` but does not include history entries.
"""
for page, _ in fetch_data_with_history(endpoint, api_key, params=params):
yield page


def _get_property_names(api_key: str, object_type: str) -> List[str]:
"""
Retrieve property names for a given entity from the HubSpot API.
Expand Down
28 changes: 28 additions & 0 deletions sources/hubspot_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,33 @@ def load_crm_data() -> None:
print(info)


def load_crm_data_with_history() -> None:
    """
    Loads all HubSpot CRM resources and the property change history for each entity.

    History entries are loaded into one table per resource, named
    `{resource_name}_property_history`, e.g. `contacts_property_history`.

    Returns:
        None
    """

    # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type
    # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination
    p = dlt.pipeline(
        pipeline_name="hubspot_pipeline",
        dataset_name="hubspot",
        destination="postgres",
    )

    # Configure the source with `include_history` to enable property history load, history is disabled by default
    data = hubspot(include_history=True)

    # Run the pipeline with the HubSpot source connector
    info = p.run(data)

    # Print information about the pipeline run
    print(info)


def load_web_analytics_events(
object_type: THubspotObjectType, object_ids: List[str]
) -> None:
Expand Down Expand Up @@ -57,4 +84,5 @@ def load_web_analytics_events(
if __name__ == "__main__":
# Call the functions to load HubSpot data into the database with and without company events enabled
load_crm_data()
# load_crm_data_with_history()
# load_web_analytics_events("company", ["7086461639", "7086464459"])
27 changes: 25 additions & 2 deletions tests/hubspot/test_hubspot_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,40 @@ def test_all_resources(destination_name: str) -> None:
dataset_name="hubspot_data",
full_refresh=True,
)
load_info = pipeline.run(hubspot())
load_info = pipeline.run(hubspot(include_history=True))
print(load_info)
assert_load_info(load_info)
table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
table_names = [
t["name"]
for t in pipeline.default_schema.data_tables()
if not t["name"].endswith("_property_history") and not t.get("parent")
]

# make sure no duplicates (ie. pages wrongly overlap)
assert (
load_table_counts(pipeline, *table_names)
== load_table_distinct_counts(pipeline, "hs_object_id", *table_names)
== {"companies": 200, "deals": 500, "contacts": 402}
)

history_table_names = [
t["name"]
for t in pipeline.default_schema.data_tables()
if t["name"].endswith("_property_history")
]
# Check history tables
history_counts = load_table_counts(pipeline, *history_table_names)
# Only check there are some records for now
assert history_counts["contacts_property_history"] >= 1
assert history_counts["deals_property_history"] >= 1

# Check common columns
with pipeline.sql_client() as client:
row = client.execute_sql(
"SELECT object_id, value, property_name FROM deals_property_history LIMIT 1"
)[0]
assert all(bool(val) and isinstance(val, str) for val in row)


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_event_resources(destination_name: str) -> None:
Expand Down

0 comments on commit 125546b

Please sign in to comment.