diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 16a56403a..e0674761b 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -38,11 +38,17 @@ fetch_property_history, ) from .settings import ( - STARTDATE, - WEB_ANALYTICS_EVENTS_ENDPOINT, - OBJECT_TYPE_SINGULAR, + ALL, CRM_OBJECT_ENDPOINTS, + DEFAULT_COMPANY_PROPS, + DEFAULT_CONTACT_PROPS, + DEFAULT_DEAL_PROPS, + DEFAULT_PRODUCT_PROPS, + DEFAULT_TICKET_PROPS, + DEFAULT_QUOTE_PROPS, OBJECT_TYPE_PLURAL, + STARTDATE, + WEB_ANALYTICS_EVENTS_ENDPOINT, ) THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"] @@ -50,42 +56,163 @@ @dlt.source(name="hubspot") def hubspot( - api_key: str = dlt.secrets.value, include_history: bool = False + api_key: str = dlt.secrets.value, + include_history: bool = False, ) -> Sequence[DltResource]: """ - A DLT source that retrieves data from the HubSpot API using the specified API key. + A DLT source that retrieves data from the HubSpot API using the + specified API key. - This function retrieves data for several HubSpot API endpoints, including companies, contacts, deals, - tickets, products and web analytics events. It returns a tuple of Dlt resources, one for each endpoint. + This function retrieves data for several HubSpot API endpoints, + including companies, contacts, deals, tickets, products and web + analytics events. It returns a tuple of Dlt resources, one for + each endpoint. Args: - api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. - include_history: Whether to load history of property changes along with entities. The history entries are loaded to separate tables. + api_key (Optional[str]): + The API key used to authenticate with the HubSpot API. Defaults + to dlt.secrets.value. + include_history (Optional[bool]): + Whether to load history of property changes along with entities. + The history entries are loaded to separate tables. Returns: - tuple: A tuple of Dlt resources, one for each HubSpot API endpoint. + Sequence[DltResource]: Dlt resources, one for each HubSpot API endpoint. Notes: - This function uses the `fetch_data` function to retrieve data from the HubSpot CRM API. The API key - is passed to `fetch_data` as the `api_key` argument. + This function uses the `fetch_data` function to retrieve data from the + HubSpot CRM API. The API key is passed to `fetch_data` as the + `api_key` argument. """ - return [ - companies(include_history=include_history), - contacts(include_history=include_history), - deals(include_history=include_history), - tickets(include_history=include_history), - products(include_history=include_history), - quotes(include_history=include_history), - ] + + @dlt.resource(name="companies", write_disposition="replace") + def companies( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_COMPANY_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot companies resource""" + yield from crm_objects( + "company", + api_key, + include_history=include_history, + props=props, + include_custom_props=include_custom_props, + ) + + @dlt.resource(name="contacts", write_disposition="replace") + def contacts( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_CONTACT_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot contacts resource""" + yield from crm_objects( + "contact", + api_key, + include_history, + props, + include_custom_props, + ) + + @dlt.resource(name="deals", write_disposition="replace") + def deals( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_DEAL_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot deals resource""" + yield from crm_objects( + "deal", + api_key, + include_history, + props, + include_custom_props, + ) + + @dlt.resource(name="tickets", write_disposition="replace") + def tickets( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_TICKET_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot tickets resource""" + yield from crm_objects( + "ticket", + api_key, + include_history, + props, + include_custom_props, + ) + + @dlt.resource(name="products", write_disposition="replace") + def products( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_PRODUCT_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot products resource""" + yield from crm_objects( + "product", + api_key, + include_history, + props, + include_custom_props, + ) + + @dlt.resource(name="quotes", write_disposition="replace") + def quotes( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_QUOTE_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot quotes resource""" + yield from crm_objects( + "quote", + api_key, + include_history, + props, + include_custom_props, + ) + + return companies, contacts, deals, tickets, products, quotes def crm_objects( object_type: str, api_key: str = dlt.secrets.value, include_history: bool = False, + props: Sequence[str] = None, + include_custom_props: bool = True, ) -> Iterator[TDataItems]: """Building blocks for CRM resources.""" - props = ",".join(_get_property_names(api_key, object_type)) + if props == ALL: + props = list(_get_property_names(api_key, object_type)) + + if include_custom_props: + all_props = _get_property_names(api_key, object_type) + custom_props = [prop for prop in all_props if not prop.startswith("hs_")] + props = props + custom_props # type: ignore + + props = ",".join(sorted(list(set(props)))) + + if len(props) > 2000: + raise ValueError( + ( + "Your request to Hubspot is too long to process. " + "Maximum allowed query length is 2000 symbols, while " + f"your list of properties `{props[:200]}`... is {len(props)} " + "symbols long. Use the `props` argument of the resource to " + "set the list of properties to extract from the endpoint." + ) + ) + params = {"properties": props, "limit": 100} yield from fetch_data(CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params) @@ -98,58 +225,11 @@ def crm_objects( props, ): yield dlt.mark.with_table_name( - history_entries, OBJECT_TYPE_PLURAL[object_type] + "_property_history" + history_entries, + OBJECT_TYPE_PLURAL[object_type] + "_property_history", ) -@dlt.resource(name="companies", write_disposition="replace") -def companies( - api_key: str = dlt.secrets.value, include_history: bool = False -) -> Iterator[TDataItems]: - """Hubspot companies resource""" - yield from crm_objects("company", api_key, include_history=False) - - -@dlt.resource(name="contacts", write_disposition="replace") -def contacts( - api_key: str = dlt.secrets.value, include_history: bool = False -) -> Iterator[TDataItems]: - """Hubspot contacts resource""" - yield from crm_objects("contact", api_key, include_history) - - -@dlt.resource(name="deals", write_disposition="replace") -def deals( - api_key: str = dlt.secrets.value, include_history: bool = False -) -> Iterator[TDataItems]: - """Hubspot deals resource""" - yield from crm_objects("deal", api_key, include_history) - - -@dlt.resource(name="tickets", write_disposition="replace") -def tickets( - api_key: str = dlt.secrets.value, include_history: bool = False -) -> Iterator[TDataItems]: - """Hubspot tickets resource""" - yield from crm_objects("ticket", api_key, include_history) - - -@dlt.resource(name="products", write_disposition="replace") -def products( - api_key: str = dlt.secrets.value, include_history: bool = False -) -> Iterator[TDataItems]: - """Hubspot products resource""" - yield from crm_objects("product", api_key, include_history) - - -@dlt.resource(name="quotes", write_disposition="replace") -def quotes( - api_key: str = dlt.secrets.value, include_history: bool = False -) -> Iterator[TDataItems]: - """Hubspot quotes resource""" - yield from crm_objects("quote", api_key, include_history) - - @dlt.resource def hubspot_events_for_objects( object_type: THubspotObjectType, diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 946d6b6ba..4b4cf190b 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -44,7 +44,10 @@ def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str def fetch_property_history( - endpoint: str, api_key: str, props: str, params: Optional[Dict[str, Any]] = None + endpoint: str, + api_key: str, + props: str, + params: Optional[Dict[str, Any]] = None, ) -> Iterator[List[Dict[str, Any]]]: """Fetch property history from the given CRM endpoint. diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index 87669aa81..05fe4d9d0 100644 --- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -36,3 +36,64 @@ } OBJECT_TYPE_PLURAL = {v: k for k, v in OBJECT_TYPE_SINGULAR.items()} + +DEFAULT_DEAL_PROPS = [ + "amount", + "closedate", + "createdate", + "dealname", + "dealstage", + "hs_lastmodifieddate", + "hs_object_id", + "pipeline", +] + +DEFAULT_COMPANY_PROPS = [ + "createdate", + "domain", + "hs_lastmodifieddate", + "hs_object_id", + "name", +] + +DEFAULT_CONTACT_PROPS = [ + "createdate", + "email", + "firstname", + "hs_object_id", + "lastmodifieddate", + "lastname", +] + +DEFAULT_TICKET_PROPS = [ + "createdate", + "content", + "hs_lastmodifieddate", + "hs_object_id", + "hs_pipeline", + "hs_pipeline_stage", + "hs_ticket_category", + "hs_ticket_priority", + "subject", +] + +DEFAULT_PRODUCT_PROPS = [ + "createdate", + "description", + "hs_lastmodifieddate", + "hs_object_id", + "name", + "price", +] + +DEFAULT_QUOTE_PROPS = [ + "hs_createdate", + "hs_expiration_date", + "hs_lastmodifieddate", + "hs_object_id", + "hs_public_url_key", + "hs_status", + "hs_title", +] + +ALL = ("ALL",) diff --git a/sources/hubspot_pipeline.py b/sources/hubspot_pipeline.py index 0c1308f95..29a555436 100644 --- a/sources/hubspot_pipeline.py +++ b/sources/hubspot_pipeline.py @@ -54,6 +54,39 @@ def load_crm_data_with_history() -> None: print(info) +def load_crm_objects_with_custom_properties() -> None: + """ + Loads CRM objects, reading only properties defined by the user. + """ + + # Create a DLT pipeline object with the pipeline name, + # dataset name, properties to read and destination database + # type Add full_refresh=(True or False) if you need your + # pipeline to create the dataset in your destination + p = dlt.pipeline( + pipeline_name="hubspot_pipeline", + dataset_name="hubspot", + destination="postgres", + ) + + source = hubspot() + + # By default, all the custom properties of a CRM object are extracted, + # ignoring those driven by Hubspot (prefixed with `hs_`). + + # To read fields in addition to the custom ones: + # source.contacts.bind(props=["date_of_birth", "degree"]) + + # To read only two particular fields: + source.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=False) + + # Run the pipeline with the HubSpot source connector + info = p.run(source) + + # Print information about the pipeline run + print(info) + + def load_web_analytics_events( object_type: THubspotObjectType, object_ids: List[str] ) -> None: @@ -86,3 +119,4 @@ def load_web_analytics_events( load_crm_data() # load_crm_data_with_history() # load_web_analytics_events("company", ["7086461639", "7086464459"]) + # load_crm_objects_with_custom_properties() diff --git a/tests/hubspot/test_hubspot_source.py b/tests/hubspot/test_hubspot_source.py index af400609c..e7c5efc79 100644 --- a/tests/hubspot/test_hubspot_source.py +++ b/tests/hubspot/test_hubspot_source.py @@ -7,10 +7,12 @@ from urllib.parse import urljoin from dlt.common import pendulum +from dlt.extract.exceptions import ResourceExtractionError from dlt.sources.helpers import requests -from sources.hubspot import hubspot, hubspot_events_for_objects, contacts +from sources.hubspot import hubspot, hubspot_events_for_objects from sources.hubspot.helpers import fetch_data, BASE_URL from sources.hubspot.settings import ( + DEFAULT_CONTACT_PROPS, CRM_CONTACTS_ENDPOINT, CRM_COMPANIES_ENDPOINT, CRM_DEALS_ENDPOINT, @@ -118,15 +120,14 @@ def test_fetch_data_quotes(mock_response): @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) def test_resource_contacts_with_history(destination_name: str, mock_response) -> None: - prop_names = [p["name"] for p in mock_contacts_properties["results"]] - prop_string = ",".join(prop_names) + expected_rows = [] + expected_props = "address,annualrevenue,associatedcompanyid,associatedcompanylastupdated,city,closedate,company,company_size,country,createdate,currentlyinworkflow,date_of_birth,days_to_close,degree,email,engagements_last_meeting_booked,engagements_last_meeting_booked_campaign,engagements_last_meeting_booked_medium,engagements_last_meeting_booked_source,fax,field_of_study,first_conversion_date,first_conversion_event_name,first_deal_created_date,firstname,followercount,gender,graduation_date,hs_object_id,hubspot_owner_assigneddate,hubspot_owner_id,hubspot_team_id,hubspotscore,industry,ip_city,ip_country,ip_country_code,ip_latlon,ip_state,ip_state_code,ip_zipcode,job_function,jobtitle,kloutscoregeneral,lastmodifieddate,lastname,lifecyclestage,linkedinbio,linkedinconnections,marital_status,message,military_status,mobilephone,notes_last_contacted,notes_last_updated,notes_next_activity_date,num_associated_deals,num_contacted_notes,num_conversion_events,num_notes,num_unique_conversion_events,numemployees,owneremail,ownername,phone,photo,recent_conversion_date,recent_conversion_event_name,recent_deal_amount,recent_deal_close_date,relationship_status,salutation,school,seniority,start_date,state,surveymonkeyeventlastupdated,total_revenue,twitterbio,twitterhandle,twitterprofilephoto,webinareventlastupdated,website,work_email,zip" def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] if "/properties" in url: return mock_response(json_data=mock_contacts_properties) return mock_response(json_data=mock_contacts_with_history) - expected_rows = [] for contact in mock_contacts_with_history["results"]: for items in contact["propertiesWithHistory"].values(): # type: ignore[attr-defined] expected_rows.extend(items) @@ -138,7 +139,12 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] dataset_name="hubspot_data", full_refresh=True, ) - load_info = pipeline.run(contacts(api_key="fake_key", include_history=True)) + source = hubspot( + api_key="fake_key", + include_history=True, + ) + load_info = pipeline.run(source.with_resources("contacts")) + assert_load_info(load_info) assert m.call_count == 3 @@ -154,12 +160,18 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] call( urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), headers=ANY, - params={"properties": prop_string, "limit": 100}, + params={ + "properties": expected_props, + "limit": 100, + }, ), call( urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), headers=ANY, - params={"propertiesWithHistory": prop_string, "limit": 50}, + params={ + "propertiesWithHistory": expected_props, + "limit": 50, + }, ), ] ) @@ -169,6 +181,156 @@ def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] } +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_too_many_properties(destination_name: str) -> None: + with pytest.raises(ResourceExtractionError): + source = hubspot(api_key="fake_key", include_history=True) + source.contacts.bind(props=["property"] * 500) + list(source.with_resources("contacts")) + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_only_users_properties(destination_name: str, mock_response) -> None: + def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] + if "/properties" in url: + return mock_response(json_data=mock_contacts_properties) + return mock_response(json_data=mock_contacts_with_history) + + expected_props = "prop1,prop2,prop3" + props = ["prop1", "prop2", "prop3"] + + pipeline = dlt.pipeline( + pipeline_name="hubspot", + destination=destination_name, + dataset_name="hubspot_data", + full_refresh=True, + ) + source = hubspot(api_key="fake_key") + source.contacts.bind(props=props, include_custom_props=False) + + with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: + load_info = pipeline.run(source.with_resources("contacts")) + + assert_load_info(load_info) + + m.assert_has_calls( + [ + call( + urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), + headers=ANY, + params={ + "properties": expected_props, + "limit": 100, + }, + ), + ] + ) + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_only_default_props(destination_name: str, mock_response) -> None: + def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] + if "/properties" in url: + return mock_response(json_data=mock_contacts_properties) + return mock_response(json_data=mock_contacts_with_history) + + expected_props = ",".join(DEFAULT_CONTACT_PROPS) + + pipeline = dlt.pipeline( + pipeline_name="hubspot", + destination=destination_name, + dataset_name="hubspot_data", + full_refresh=True, + ) + source = hubspot(api_key="fake_key") + source.contacts.bind(include_custom_props=False) + + with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: + load_info = pipeline.run(source.with_resources("contacts")) + + assert_load_info(load_info) + + m.assert_has_calls( + [ + call( + urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), + headers=ANY, + params={ + "properties": expected_props, + "limit": 100, + }, + ), + ] + ) + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_users_and_custom_properties(destination_name: str, mock_response) -> None: + def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] + if "/properties" in url: + return mock_response(json_data=mock_contacts_properties) + return mock_response(json_data=mock_contacts_with_history) + + expected_props = "address,annualrevenue,associatedcompanyid,associatedcompanylastupdated,city,closedate,company,company_size,country,createdate,currentlyinworkflow,date_of_birth,days_to_close,degree,email,engagements_last_meeting_booked,engagements_last_meeting_booked_campaign,engagements_last_meeting_booked_medium,engagements_last_meeting_booked_source,fax,field_of_study,first_conversion_date,first_conversion_event_name,first_deal_created_date,firstname,followercount,gender,graduation_date,hubspot_owner_assigneddate,hubspot_owner_id,hubspot_team_id,hubspotscore,industry,ip_city,ip_country,ip_country_code,ip_latlon,ip_state,ip_state_code,ip_zipcode,job_function,jobtitle,kloutscoregeneral,lastmodifieddate,lastname,lifecyclestage,linkedinbio,linkedinconnections,marital_status,message,military_status,mobilephone,notes_last_contacted,notes_last_updated,notes_next_activity_date,num_associated_deals,num_contacted_notes,num_conversion_events,num_notes,num_unique_conversion_events,numemployees,owneremail,ownername,phone,photo,prop1,prop2,prop3,recent_conversion_date,recent_conversion_event_name,recent_deal_amount,recent_deal_close_date,relationship_status,salutation,school,seniority,start_date,state,surveymonkeyeventlastupdated,total_revenue,twitterbio,twitterhandle,twitterprofilephoto,webinareventlastupdated,website,work_email,zip" + props = ["prop1", "prop2", "prop3"] + + pipeline = dlt.pipeline( + pipeline_name="hubspot", + destination=destination_name, + dataset_name="hubspot_data", + full_refresh=True, + ) + source = hubspot(api_key="fake_key") + source.contacts.bind(props=props) + + with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: + load_info = pipeline.run(source.with_resources("contacts")) + + assert_load_info(load_info) + + m.assert_has_calls( + [ + call( + urljoin(BASE_URL, "/crm/v3/properties/contacts"), + headers=ANY, + params=None, + ), + call( + urljoin(BASE_URL, CRM_CONTACTS_ENDPOINT), + headers=ANY, + params={ + "properties": expected_props, + "limit": 100, + }, + ), + ] + ) + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_custom_only_properties(destination_name: str, mock_response) -> None: + def fake_get(url: str, *args, **kwargs) -> Any: # type: ignore[no-untyped-def] + if "/properties" in url: + return mock_response(json_data=mock_contacts_properties) + return mock_response(json_data=mock_contacts_with_history) + + pipeline = dlt.pipeline( + pipeline_name="hubspot", + destination=destination_name, + dataset_name="hubspot_data", + full_refresh=True, + ) + source = hubspot(api_key="fake_key") + + with patch("dlt.sources.helpers.requests.get", side_effect=fake_get) as m: + load_info = pipeline.run(source.with_resources("contacts")) + + assert_load_info(load_info) + + for prop in m.mock_calls[1][2]["params"]["properties"].split(","): + assert not prop.startswith("hs_") or prop == "hs_object_id" + + @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) def test_all_resources(destination_name: str) -> None: pipeline = dlt.pipeline( @@ -178,7 +340,7 @@ def test_all_resources(destination_name: str) -> None: full_refresh=True, ) load_info = pipeline.run(hubspot(include_history=True)) - print(load_info) + assert_load_info(load_info) table_names = [ t["name"] @@ -201,8 +363,9 @@ def test_all_resources(destination_name: str) -> None: # Check history tables # NOTE: this value is increasing... maybe we should start testing ranges assert load_table_counts(pipeline, *history_table_names) == { - "contacts_property_history": 17226, - "deals_property_history": 14349, + "companies_property_history": 4018, + "contacts_property_history": 5935, + "deals_property_history": 5162, } # Check property from couple of contacts against known data @@ -211,7 +374,7 @@ def test_all_resources(destination_name: str) -> None: list(row) for row in client.execute_sql( """ - SELECT ch.property_name, ch.value, ch.source_type, ch.source_type, ch.timestamp + SELECT ch.property_name, ch.value, ch.source_type, ch.timestamp FROM contacts JOIN contacts_property_history AS ch ON contacts.id = ch.object_id @@ -219,6 +382,7 @@ def test_all_resources(destination_name: str) -> None: """ ) ] + for row in rows: row[-1] = pendulum.instance(row[-1]) @@ -228,15 +392,13 @@ def test_all_resources(destination_name: str) -> None: "email", "emailmaria@hubspot.com", "API", - "API", - pendulum.parse("2022-06-15 08:51:51.399+00"), + pendulum.parse("2022-06-15 08:51:51.399"), ), ( "email", "bh@hubspot.com", "API", - "API", - pendulum.parse("2022-06-15 08:51:51.399+00"), + pendulum.parse("2022-06-15 08:51:51.399"), ), ] )