from google.cloud import bigquery
import os
"GOOGLE_APPLICATION_CREDENTIALS"] = "/path/.secrets/project_id.json"
os.environ[= bigquery.Client() bqclient
BigQuery can be used as a data warehouse to gather multiple data silos and build BI tools or ML models on top of it. This tutorial is intentionally general; check the Google documentation for details. I’m recording my steps using HubSpot as the source; overall, the process looks like this:
1- Write a function that extracts data from HubSpot (or any other source that can be queried programmatically on a schedule) and loads it into BigQuery
2- Upload the script to Google Cloud Functions and set a routine with Cloud Scheduler
3- Handle duplicates
Cloud Functions doesn’t support an R runtime. There is a way to build the same routine in R using {googleCloudRunner}, but I found it harder to implement than using Cloud Functions.
Create a Service Account
First, you will have to create a project and grant the service account permissions on the resources it needs, at least these three: BigQuery, Cloud Functions, and Cloud Scheduler. For local development, you will need to download the key as a JSON file and place it in your project root.
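If you prefer not to set GOOGLE_APPLICATION_CREDENTIALS, here is a minimal sketch of building the client straight from the key file during local development (the path is a placeholder):

from google.cloud import bigquery
from google.oauth2 import service_account

# Build credentials from the downloaded service account key (placeholder path)
credentials = service_account.Credentials.from_service_account_file("/path/.secrets/project_id.json")

# The key file carries the project id, so reuse it for the client
bqclient = bigquery.Client(credentials=credentials, project=credentials.project_id)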
Plan the schema and relations
Sketch the relations between tables and datasets, and be sure about data types. I usually define a dataset for every silo (data source).
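The pipeline below assumes the dataset and tables already exist. As a hedged sketch (names and types are placeholders), you could create them once with the same client:

from google.cloud import bigquery

bqclient = bigquery.Client()

# One dataset per data source (silo)
bqclient.create_dataset("project_name.hubspot", exists_ok=True)

# An empty table with the agreed schema, so type mismatches surface at load time
deal_schema = [
    bigquery.SchemaField("hs_object_id", "INTEGER"),
    bigquery.SchemaField("dealname", "STRING"),
    bigquery.SchemaField("createdate", "TIMESTAMP"),
]
bqclient.create_table(bigquery.Table("project_name.hubspot.deals", schema=deal_schema), exists_ok=True)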
Set the Pipeline
After replicating the first batch of data, we need to define how we will update the tables. Basically, every X minutes, hours, or days (depending on how often the source updates), take the datetime of the last updated (or created) record saved in BQ, and query the records created or updated in the source after that timestamp.
import datetime as dt

def get_last_updated_object(bq_client, table_name, field_name):
    """
    bq_client: BigQuery Client
    table_name: str table to query
    field_name: str name of the date/datetime field to get the max value of
    return: max field_name in table_name, as a unix timestamp
    """
    checkpoint_query = f"SELECT max({field_name}) as last_updated FROM `project_name.hubspot.{table_name}`"
    checkpoints_df = bq_client.query(checkpoint_query, project='project_name').to_dataframe()

    last_updated_datetime = checkpoints_df['last_updated'][0]
    last_updated_unix = dt.datetime.timestamp(last_updated_datetime)

    return round(last_updated_unix)
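A quick usage sketch, assuming the bqclient from the setup above and hs_lastmodifieddate as the checkpoint column for deals:

# Hypothetical call: latest modification checkpoint for the deals table
last_updated_deal = get_last_updated_object(bqclient, 'deals', 'hs_lastmodifieddate')

# HubSpot's recently-modified endpoint expects the checkpoint in milliseconds
since_ms = last_updated_deal * 1000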
Compressing an ETL task into a single function
The idea is to extract the data, transform it into dataframes, and append it to the corresponding BQ table. We will query HubSpot’s deals as an example.
Extract
The HubSpot API uses a pagination system for querying, so all this function does is retrieve data from a given timestamp onwards. The timestamp serves as a checkpoint; we will load only the batch of data modified after it.
import json
import urllib.parse
import requests

def get_recent_modified(url, hapikey, count, since, max_results):
    """
    url: str endpoint to retrieve. One of deals, companies or engagements
    hapikey: str API key
    count: dbl number of objects to retrieve in a single call
    since: dbl unix datetime in milliseconds to retrieve after
    max_results: dbl max number of records to retrieve
    return: list with response
    """
    object_list = []
    get_recent_url = url
    parameter_dict = {'hapikey': hapikey, 'count': count, 'since': since}
    headers = {}

    # Paginate your request using offset
    has_more = True
    while has_more:
        parameters = urllib.parse.urlencode(parameter_dict)
        get_url = get_recent_url + parameters
        r = requests.get(url=get_url, headers=headers)
        response_dict = json.loads(r.text)
        has_more = response_dict['hasMore']
        object_list.extend(response_dict['results'])
        parameter_dict['offset'] = response_dict['offset']

        if len(object_list) >= max_results:  # Exit pagination, based on whatever value you've set your max_results variable to
            print('maximum number of results exceeded')
            break

    print(f'Done!! Found {len(object_list)} objects')
    return object_list
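A usage sketch against the deals endpoint (the API key comes from an environment variable here; since is the checkpoint in milliseconds, mirroring what main.py does later):

recent_deals_list = get_recent_modified(
    "https://api.hubapi.com/deals/v1/deal/recent/modified?",
    hapikey=os.environ.get('hs_apikey'),
    count=100,                        # objects per page
    since=last_updated_deal * 1000,   # checkpoint from get_last_updated_object, in ms
    max_results=100000)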
Transform
Do the necessary wrangling to convert the data into a ready-to-load pd.DataFrame, and be sure about column types. In this case, I extract some association id attributes from the list response, plus the attributes that live in the properties field of each object, then merge them and convert them to the correct field types.
import pandas as pd

def extract_value(new_objects, column_names):
    """
    new_objects: list with http response
    column_names: list with column names to keep
    return: pd.DataFrame with column_names fields
    """
    has_associations = 'associations' in new_objects[0].keys()

    list_properties = []
    for obj in new_objects:
        if has_associations:
            associatedCompanyIds = obj['associations']['associatedCompanyIds']
            associatedVids = obj['associations']['associatedVids']

        props = obj['properties']

        saved_properties = {}
        for key, value in props.items():
            saved_properties[key] = value['value']

        if has_associations:
            saved_properties['associatedCompanyIds'] = associatedCompanyIds
            saved_properties['associatedVids'] = associatedVids

        list_properties.append(saved_properties)

    df_properties = pd.DataFrame(list_properties)

    subset_columns = df_properties.columns.intersection(set(column_names))

    df_properties = df_properties[subset_columns]

    if has_associations:
        df_properties['associatedVids'] = df_properties['associatedVids'].apply(lambda x: '-'.join(map(str, x)))
        df_properties = df_properties.explode('associatedCompanyIds')

    return df_properties
def parse_properties(df, columns_to_datetime, columns_to_numeric, columns_to_boolean):
    """
    df: pd.DataFrame
    columns_to_datetime, columns_to_numeric, columns_to_boolean: list with names of the columns to parse
    return: pd.DataFrame with parsed columns
    """
    if columns_to_datetime:
        df[columns_to_datetime] = df[columns_to_datetime].apply(pd.to_datetime, unit='ms')

    if columns_to_numeric:
        df[columns_to_numeric] = df[columns_to_numeric].apply(pd.to_numeric, errors='coerce')

    if columns_to_boolean:
        df[columns_to_boolean] = df[columns_to_boolean].replace({'true': True, 'false': False}).astype('boolean')

    return df
def clean_deals(deals_list):
    """
    deals_list: list with http response
    return: pd.DataFrame with necessary columns and correct types
    """
    deal_properties = ['amount_in_home_currency', 'closed_lost_reason', 'closedate', 'createdate', 'days_to_close', 'deal_currency_code',
                       'dealname', 'dealstage', 'dealtype', 'hs_is_closed', 'hs_is_closed_won', 'hs_lastmodifieddate', 'hs_object_id',
                       'associatedCompanyIds', 'associatedVids', 'pipeline']

    deals = extract_value(deals_list, deal_properties)
    deals = parse_properties(
        deals,
        columns_to_datetime=['createdate', 'closedate', 'hs_lastmodifieddate'],
        columns_to_numeric=['days_to_close', 'hs_object_id', 'amount_in_home_currency'],
        columns_to_boolean=['hs_is_closed', 'hs_is_closed_won']
    )

    return deals
Load
Use the BigQuery API's load_table_from_dataframe(). Define the schema beforehand to avoid data type errors between the uploaded batch and the BQ table.
# Example Schema
example_schema = [
    bigquery.SchemaField("amount_in_home_currency", "FLOAT"),
    bigquery.SchemaField("closed_lost_reason", "STRING"),
    bigquery.SchemaField("closedate", "DATE"),
    bigquery.SchemaField("days_to_close", "INTEGER")]
def load_table_from_dataframe_safely(bq_client, df: pd.DataFrame, table_id: str, table_schema: list):
    """
    df: dataframe to upload to BigQuery
    table_id: id of the table in BigQuery, it should consist of project.dataset.table
    table_schema: list with a SchemaField element for every column in the table
    return: nothing, it uploads the df to BQ in the table_id destination
    """
    if df.empty:
        return print(f'Empty table, there are no new records for {table_id}')
    else:
        job_config = bigquery.LoadJobConfig(schema=table_schema)
        job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
        return job.result()
Include everything in a single function
Cloud Functions executes a function, not a script, so all the helper functions need to be called from a single entry-point function. There are three places where the helpers can live:
1- In the same script as the main function (you define which function GCP will execute, so it doesn’t matter if variables and helpers live in the same script)
2- In a separate script that lives in a folder at the same directory level as main.py
3- In a library you build and add to the dependencies
I think the 3rd one is the best choice, but it adds complexity to the process, so we will go with the 2nd one so we can also use the helpers in local development (a sketch of the resulting layout follows this paragraph).
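A minimal sketch of the deployment package layout under that choice (the module name hs_helpers is an assumption taken from the import below; Cloud Functions installs whatever requirements.txt lists at deploy time, and load_table_from_dataframe needs pyarrow):

main.py
requirements.txt        # pandas, requests, pyarrow, google-cloud-bigquery
helpers/
    __init__.py         # can be empty
    hs_helpers.py       # the helper functions defined above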
The final main.py will look like this:
########################################## Libraries
import pandas as pd
import datetime as dt
from google.cloud import bigquery
import os

from helpers import hs_helpers as hs_help

########################################## BigQuery Connection
bqclient = bigquery.Client()

def hubspot_replication(request):  # HTTP-triggered functions receive the request object, even if unused

    apikey = os.environ.get('hs_apikey')

    last_updated_deal = hs_help.get_last_updated_object(bqclient, 'deals', 'last_updated_column')

    recent_deals_list = hs_help.get_recent_modified("https://api.hubapi.com/deals/v1/deal/recent/modified?",
                                                    apikey, count=100, since=last_updated_deal*1000, max_results=100000)

    recent_deals = hs_help.clean_deals(recent_deals_list)

    example_schema = [
        bigquery.SchemaField("amount_in_home_currency", "FLOAT"),
        bigquery.SchemaField("closed_lost_reason", "STRING"),
        bigquery.SchemaField("closedate", "DATE"),
        bigquery.SchemaField("days_to_close", "INTEGER")]

    hs_help.load_table_from_dataframe_safely(bqclient, recent_deals, 'project_name.dataset_name.table_name', example_schema)

    return 'Data pull complete'
Note that you don’t need to load the JSON with the service account key here, since you will define which service account executes this function later.
Setting up the script and routine in GCP
There are multiple options to deploy the function; the most straightforward is to upload a .zip from the console (you can find the details in the Cloud Functions documentation).
You have two options to include the passwords and API keys that the function needs: using Secrets (recommended) or setting environment variables. Both are easy to implement and can be configured from the console at deployment time.
After testing your function, you can create a Cloud Scheduler job that triggers the HTTP function.
Handling duplicates
BigQuery doesn’t enforce unique keys the way an RDBMS does, so we can end up inserting duplicate records into a table. To avoid this in the reports you have two options:
1- Drop records that appear in both the new batch and the persistent BQ table before uploading the batch. You can implement something like this before calling load_table_from_dataframe_safely:
def drop_duplicates(bq_client, table_name, ids):
    collapsed_ids = ', '.join(map(str, ids))

    query = f"""DELETE hubspot.{table_name} WHERE id in ({collapsed_ids})"""

    query_job = bq_client.query(query)

    return query_job.result()
2- Create views, or rewrite the tables, removing duplicates with a SQL statement like the following (a sketch of wrapping it in a view comes after the query):
SELECT
  * EXCEPT(row_number)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY ID_COLUMN) row_number
  FROM
    `TABLE_NAME`)
WHERE
  row_number = 1
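For option 2, a hedged sketch of materializing that query as a view with the same client (table and column names are placeholders, and I add an ORDER BY so the most recently modified version of each record is the one kept):

dedup_view_query = """
CREATE OR REPLACE VIEW `project_name.hubspot.deals_dedup` AS
SELECT * EXCEPT(row_number)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY hs_object_id ORDER BY hs_lastmodifieddate DESC) AS row_number
  FROM `project_name.hubspot.deals`)
WHERE row_number = 1
"""

bqclient.query(dedup_view_query).result()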