Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GA4 compatibility to Google Analytics connector #45

Merged
merged 24 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
88ef47e
ADD: added if __name__ block for testing
chris-greening Feb 3, 2023
ceb33ca
ADD: added environment setup to if __name__
chris-greening Feb 3, 2023
6c3cce4
FIX: fixed docs and var name to reflect Google Analytics connection
chris-greening Feb 3, 2023
66308d5
ADD: added ga4 flag to connect_to_google_analytics
chris-greening Feb 3, 2023
50f5f26
ADD: added call to BetaAnalyticsDataClient for Data API
chris-greening Feb 3, 2023
1f461b0
ADD: added _process_universal_analytics_data function
chris-greening Feb 3, 2023
20cb5b1
ADD: added reference to _process_universal_analytics_data
chris-greening Feb 3, 2023
1c12bb9
ADD: added _process_ga4_data connector
chris-greening Feb 3, 2023
81f3086
ADD: added processing_func ternary op for setting which processor to use
chris-greening Feb 6, 2023
f66d7d7
MODIFY: renamed _request_google_analytics_data -> _request_universal_…
chris-greening Feb 6, 2023
e69852e
MODIFY: renamed _process_raw_google_analytics_data -> _process_raw_un…
chris-greening Feb 6, 2023
d160edb
MODIFY: renamed function def to universal analytics
chris-greening Feb 6, 2023
0db8252
ADD: created utils directory for processing functions and module for …
chris-greening Feb 6, 2023
ca698af
ADD: moved all UA code over into separate module
chris-greening Feb 6, 2023
a3ff59d
MODIFY: moved appropriate imports over to refactored _universal_analy…
chris-greening Feb 6, 2023
42621ff
FIX: added correct import for process_universal_analytics_data
chris-greening Feb 6, 2023
0ea1b12
MODIFY: moved GA4 processing code over into its own module
chris-greening Feb 7, 2023
6f46a4c
REFACTOR: rewrote ternary op as if-else block for readability
chris-greening Feb 7, 2023
52f7d5f
ADD: added import of RunReportRequest and various other necessary obj…
chris-greening Feb 7, 2023
4602c82
ADD: added request and response call
chris-greening Feb 7, 2023
9a107d1
ADD: added DataFrame creation and processing and returned back throug…
chris-greening Feb 7, 2023
54c582a
ADD: added warning about Universal Analytics sunset
chris-greening Feb 7, 2023
17e8074
ADD: added install requirement of google-analytics-data
chris-greening Feb 7, 2023
915040f
FIX: fixed next_page_token default arg as None
chris-greening Feb 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 30 additions & 174 deletions route1io_connectors/google/google_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

This module contains functions for interacting with Google Analytics reporting
"""
from typing import List, Union, Dict, Tuple
import itertools
from typing import List
import warnings

import numpy as np
import pandas as pd
from googleapiclient.discovery import build
from google.analytics.data_v1beta import BetaAnalyticsDataClient

def connect_to_google_analytics(credentials: "google.oauth2.credentials.Credentials"
) -> "googleapiclient.discovery.Resource":
from .utils import _universal_analytics, _ga4

def connect_to_google_analytics(
credentials: "google.oauth2.credentials.Credentials",
ga4: bool = False
) -> "googleapiclient.discovery.Resource":
"""Return a connection to Google Drive

Parameters
Expand All @@ -20,11 +23,15 @@ def connect_to_google_analytics(credentials: "google.oauth2.credentials.Credenti

Returns
-------
google_drive_conn : googleapiclient.discovery.Resource
Connection to Google Drive API
google_conn : googleapiclient.discovery.Resource
Connection to Google Analytics API
"""
google_drive_conn = build('analyticsreporting', 'v4', credentials=credentials)
return google_drive_conn
if ga4:
google_conn = BetaAnalyticsDataClient(credentials=credentials)
else:
google_conn = build('analyticsreporting', 'v4', credentials=credentials)
warnings.warn("Google is sunsetting Universal Analytics on July 1st, 2023 and is recommending you migrate to Google Analytics 4. More information can be found here: https://support.google.com/analytics/answer/11583528?hl=en", DeprecationWarning)
return google_conn

def get_google_analytics_data(
analytics,
Expand Down Expand Up @@ -56,168 +63,17 @@ def get_google_analytics_data(
-------
df : pd.DataFrame
"""
resp_df_arr = []
next_page_token = None
while True:
resp = _request_google_analytics_data(
analytics=analytics,
view_id=view_id,
dimensions=dimensions,
metrics=metrics,
start_date=start_date,
end_date=end_date,
next_page_token=next_page_token
)
resp_df = _process_raw_google_analytics_data(resp=resp)
resp_df_arr.append(resp_df)

next_page_token = _get_next_page_token(resp=resp)
if next_page_token is None:
break

df = pd.concat(resp_df_arr)
return df

def _get_next_page_token(resp: Dict[str, str]) -> Union[str, None]:
"""Return Boolean indicating if paginated data exists"""
return resp["reports"][0].get("nextPageToken")

def _request_google_analytics_data(
        analytics,
        view_id: str,
        dimensions: List[str] = None,
        metrics: List[str] = None,
        start_date: str = "7daysAgo",
        end_date: str = "today",
        next_page_token: Union[str, None] = None
    ) -> Dict[str, Union[str, List, Dict, bool]]:
    """Return the response of a reporting request to the Google Analytics
    Reporting API built from the arguments

    Parameters
    ----------
    analytics
        Client object; the request is sent through its
        ``reports().batchGet(body=...).execute()`` call chain
    view_id : str
        View ID that we want to view
    dimensions : List[str]
        List of dimensions
        https://ga-dev-tools.web.app/dimensions-metrics-explorer/
    metrics : List[str]
        List of metrics
        https://ga-dev-tools.web.app/dimensions-metrics-explorer/
    start_date : str
        Dynamic preset such as 7daysago or YYYY-MM-DD
    end_date : str
        Dynamic preset such as today or YYYY-MM-DD
    next_page_token : Union[str, None]
        Token of the page to request; ``None`` (the default) requests the
        first page

    Returns
    -------
    resp : Dict[str, Union[str, List, Dict, bool]]
    """
    # The default must be None (not the typing object itself), otherwise a
    # caller omitting the argument would send a bogus pageToken.
    return analytics.reports().batchGet(
        body={'reportRequests': _process_report_requests(
            view_id=view_id,
            dimensions=dimensions,
            metrics=metrics,
            start_date=start_date,
            end_date=end_date,
            next_page_token=next_page_token
        )}
    ).execute()

def _process_raw_google_analytics_data(resp: Dict[str, Union[str, List, Dict, bool]]) -> "pd.DataFrame":
    """Build a typed DataFrame from the first report of a raw Google
    Analytics response

    Column names and dtypes come from the report's ``columnHeader``; the
    cell values come from the report's ``data`` section.
    """
    report = resp['reports'][0]
    dtype_by_column = _process_columns(report['columnHeader'])
    parsed_rows = _process_rows(report['data'])
    frame = pd.DataFrame(parsed_rows, columns=list(dtype_by_column))
    # Cast every column to the dtype derived from the column header
    return frame.astype(dtype_by_column)

def _process_rows(values_resp) -> List[List[str]]:
"""Return list of lists containing values parsed from API response"""
rows = values_resp['rows']
processed_rows = []
for row in rows:
try:
dimensions = row['dimensions']
except KeyError:
dimensions = []

metrics = [metric['values'] for metric in row['metrics']]
metrics = list(itertools.chain.from_iterable(metrics))

processed_rows.append([*dimensions, *metrics])
return processed_rows

def _process_columns(column_header_resp: Dict[str, str]) -> Dict[str, type]:
    """Return a dictionary containing column name and associated dtype as parsed
    from the Google Analytics API

    Dimension columns come first, followed by metric columns. Every
    occurrence of ``"ga:"`` is removed from the column names.

    Parameters
    ----------
    column_header_resp : Dict[str, str]
        The ``columnHeader`` section of a report

    Returns
    -------
    Dict[str, type]
        Mapping of cleaned column name to the dtype used for that column
    """
    columns_metadata = [
        *_process_dimensions_columns(column_header_resp=column_header_resp),
        *_process_metrics_columns(column_header_resp=column_header_resp),
    ]
    return {name.replace("ga:", ""): dtype for name, dtype in columns_metadata}

def _process_metrics_columns(column_header_resp) -> List[Tuple]:
    """Pair every metric named in the column header with its DataFrame dtype

    Reads ``metricHeader.metricHeaderEntries`` and returns one
    ``(name, dtype)`` tuple per entry, in header order.
    """
    header_entries = column_header_resp['metricHeader']['metricHeaderEntries']
    metrics_cols = []
    for entry in header_entries:
        metrics_cols.append((entry['name'], _lookup_dtype(entry['type'])))
    return metrics_cols

def _process_dimensions_columns(column_header_resp) -> List[Tuple[str, str]]:
"""Return list of tuple's containing dimensions and their associated dtype"""
try:
dimensions_col_data = column_header_resp['dimensions']
except KeyError:
dimensions_cols = []
is_ga4_data = isinstance(analytics, BetaAnalyticsDataClient)
if is_ga4_data:
processing_func = _ga4.process_ga4_data
else:
dimensions_cols = [(dimension, str) for dimension in dimensions_col_data]
return dimensions_cols

def _lookup_dtype(resp_type: str) -> Dict[str, str]:
"""Return dtype for pd.DataFrame associated with column as determined
from the API response
"""
dtypes = {
"INTEGER": np.int32,
"FLOAT": np.float32,
"TIME": str,
"CURRENCY": np.float32
}
return dtypes[resp_type]

def _process_report_requests(
view_id: str,
dimensions: Union[List[str], None],
metrics: Union[List[str], None],
start_date: str,
end_date: str,
next_page_token: Union[str, None]
) -> Dict[str, str]:
"""Return a dictionary containing formatted data request to Google Analytics
API"""
report_requests = {
"viewId": f"ga:{view_id}",
"dateRanges": [{"startDate": start_date, "endDate": end_date}],
"pageSize": 100_000
}
if next_page_token is not None:
report_requests["pageToken"] = next_page_token
if dimensions is not None:
report_requests['dimensions'] = _process_dimensions(dimensions)
if metrics is not None:
report_requests['metrics'] = _process_metrics(metrics)
return [report_requests]

def _process_dimensions(dimensions: List[str]) -> List[Dict[str, str]]:
"""Return list of dictionary's containing the dimensions formatted for Google
Analytics Reporting API to accept the request"""
return [{"name": f"ga:{dimension}"} for dimension in dimensions]

def _process_metrics(metrics: List[str]) -> List[Dict[str, str]]:
"""Return list of dictionary's containing the metrics formatted for Google
Analytics Reporting API to accept the request"""
return [{"expression": f"ga:{metric}"} for metric in metrics]
processing_func = _universal_analytics.process_universal_analytics_data
df = processing_func(
analytics=analytics,
view_id=view_id,
dimensions=dimensions,
metrics=metrics,
start_date=start_date,
end_date=end_date
)
return df
79 changes: 79 additions & 0 deletions route1io_connectors/google/utils/_ga4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Code for requesting and processing data from GA4"""

from typing import List, Dict
import itertools

import pandas as pd
import numpy as np
from google.analytics.data_v1beta.types import (
DateRange,
Dimension,
Metric,
RunReportRequest,
)

def process_ga4_data(
        analytics,
        view_id: str,
        dimensions: List[str] = None,
        metrics: List[str] = None,
        start_date: str = "7daysAgo",
        end_date: str = "today"
    ) -> "pd.DataFrame":
    """Request a GA4 report through the Google Analytics Data API and return
    it as a pd.DataFrame

    All arguments are forwarded unchanged to the request helper, and its
    response is handed to the response parser. A single request is issued.
    """
    request_kwargs = {
        "analytics": analytics,
        "view_id": view_id,
        "dimensions": dimensions,
        "metrics": metrics,
        "start_date": start_date,
        "end_date": end_date,
    }
    raw_response = _request_ga4_data(**request_kwargs)
    return _process_raw_ga4_data(resp=raw_response)

def _process_raw_ga4_data(resp) -> "pd.DataFrame":
    """Return a DataFrame containing the processed data extracted from GA4

    Parameters
    ----------
    resp
        Report response exposing ``dimension_headers``, ``metric_headers``
        and ``rows`` (each row with ``dimension_values`` and
        ``metric_values``)

    Returns
    -------
    df : pd.DataFrame
        One column per dimension/metric header, in header order, with metric
        columns cast to the dtype looked up from their header type. A
        response without rows yields an empty frame that still carries all
        header columns.
    """
    keys = _build_list_from_resp(resp.dimension_headers, resp.metric_headers, attr_name="name")
    metric_dtypes = _build_metric_type_list_from_resp(resp)
    rows = [
        dict(zip(keys, _build_list_from_resp(row.dimension_values, row.metric_values, attr_name="value")))
        for row in resp.rows
    ]
    # Name the columns explicitly: with zero rows the frame would otherwise
    # have no columns and the astype call below would raise KeyError.
    df = pd.DataFrame(rows, columns=keys)
    df = df.astype(metric_dtypes)
    return df

def _build_list_from_resp(*args, attr_name: str) -> List[str]:
"""Return list of strings of values parsed from header information in response"""
return [getattr(val, attr_name) for val in list(itertools.chain.from_iterable(args))]

def _build_metric_type_list_from_resp(resp) -> Dict[str, str]:
    """Map each metric header name in ``resp`` to the dtype looked up from
    the header's ``type_.name``"""
    dtype_by_metric = {}
    for header in resp.metric_headers:
        dtype_by_metric[header.name] = _lookup_dtype(header.type_.name)
    return dtype_by_metric

def _lookup_dtype(resp_type: str) -> str:
"""Return dtype for pd.DataFrmae column associated with Google's provided dtype"""
dtype_lookup_table = {
"TYPE_INTEGER": np.int32
}
return dtype_lookup_table.get(resp_type, str)

def _request_ga4_data(
        analytics,
        view_id: str,
        dimensions: List[str] = None,
        metrics: List[str] = None,
        start_date: str = "7daysAgo",
        end_date: str = "today"
    ):
    """Return response from reporting request to Google Analytics Data API

    Parameters
    ----------
    analytics
        Client object whose ``run_report`` method sends the request
    view_id : str
        Identifier interpolated into ``properties/<view_id>``
    dimensions : List[str]
        Dimension names; None is treated as no dimensions
    metrics : List[str]
        Metric names; None is treated as no metrics
    start_date : str
        Start of the requested date range
    end_date : str
        End of the requested date range

    Returns
    -------
    Whatever ``analytics.run_report`` returns for the built request
    """
    # Both lists default to None in the signature, so guard before iterating
    # instead of raising TypeError on the default call.
    request = RunReportRequest(
        property=f"properties/{view_id}",
        dimensions=[Dimension(name=dim) for dim in dimensions or []],
        metrics=[Metric(name=metric) for metric in metrics or []],
        date_ranges=[DateRange(start_date=start_date, end_date=end_date)],
    )
    return analytics.run_report(request)
Loading