Tutorial: Replicating data to BigQuery

A quick tutorial on automating data replication to BigQuery with Google Cloud Functions, as a foundation for BI/ML tools

Tags: ETL, DataWarehouse
Published: February 10, 2022

BigQuery can serve as a data warehouse that gathers multiple data silos in one place, so you can build BI tools or ML models on top of them. This tutorial is deliberately general; check the Google Cloud documentation for details. I recorded my steps using HubSpot as the source. Overall, the process looks like this:

1- Write a function that extracts data from HubSpot (or any other source that can be queried programmatically on a schedule) and loads it into BigQuery

2- Upload the script to Google Cloud Functions and set a routine with Cloud Scheduler

3- Handle duplicates

Cloud Functions doesn’t offer an R runtime. You can run the same routine in R with {googleCloudRunner}, but I found that harder to implement than Cloud Functions.

Create a Service Account

First, create a project and grant the service account permissions on the resources it needs, at least these three: BigQuery, Cloud Functions, and Cloud Scheduler. For local development, download the key as a JSON file and place it in your project root.

from google.cloud import bigquery
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/.secrets/project_id.json"
bqclient = bigquery.Client()

Plan the schema and relations

Sketch the relations between tables and datasets, and settle the data type of every column. I usually define one dataset per silo (data source).

Set the Pipeline

After replicating the first batch of data, we need to define how the tables will be updated. Basically, every X minutes, hours, or days (depending on how often the source changes), take the datetime of the most recently updated (or created) record saved in BQ and query the source for records created or updated after that timestamp.

import datetime as dt

def get_last_updated_object(bq_client, table_name, field_name):
  
  """
  bq_client: BigQuery Client
  table_name: str table to query
  field_name: str name of the date/datetime field to get max value
  
  return: max field_name in table_name as a Unix timestamp in seconds (int)
  """
  
  # project_name and the hubspot dataset are placeholders; replace them with your own
  checkpoint_query = f"SELECT max({field_name}) as last_updated FROM `project_name.hubspot.{table_name}`"
  checkpoints_df = bq_client.query(checkpoint_query, project='project_name').to_dataframe()
  
  last_updated_datetime = checkpoints_df['last_updated'][0]
  
  # timestamp() returns seconds as a float; round to the nearest whole second
  last_updated_unix = dt.datetime.timestamp(last_updated_datetime)
  
  return round(last_updated_unix)
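
The function above returns seconds, while the `since` parameter used in the extract step is documented in milliseconds. As a minimal sketch of that conversion, assuming the checkpoint is a timezone-aware UTC datetime (the helper name `to_unix_ms` is hypothetical):

```python
import datetime as dt

def to_unix_ms(moment: dt.datetime) -> int:
    """Convert a timezone-aware datetime to Unix time in milliseconds."""
    if moment.tzinfo is None:
        # Naive datetimes would be interpreted in the machine's local timezone
        raise ValueError("expected a timezone-aware datetime")
    return round(moment.timestamp()) * 1000

checkpoint = dt.datetime(2022, 2, 10, 12, 0, 0, tzinfo=dt.timezone.utc)
print(to_unix_ms(checkpoint))
```

Rejecting naive datetimes is a design choice of this sketch: it avoids silently shifting the checkpoint when the code runs on a machine that is not set to UTC.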

Compressing an ETL task into a single function

The idea is to extract the data, transform it into DataFrames, and append them to the corresponding BQ table. We will query HubSpot’s deals as an example.

Extract

The HubSpot API paginates its responses, so this function simply keeps requesting pages of records modified after a given timestamp. The timestamp serves as a checkpoint: we load only the batch of data after it.

import urllib.parse
import requests

def get_recent_modified(url, hapikey, count, since, max_results):
  
  """
  url: str endpoint to retrieve, ending in '?'. One of deals, companies or engagements
  hapikey: str API key
  count: int number of objects to retrieve in a single call
  since: int Unix datetime in milliseconds; only objects modified after it are retrieved
  max_results: int max number of records to retrieve
  
  return: list with response
  """
  
  object_list = []
  parameter_dict = {'hapikey': hapikey, 'count': count, 'since': since}
  
  # Paginate the request using offset
  has_more = True
  while has_more:
    parameters = urllib.parse.urlencode(parameter_dict)
    get_url = url + parameters
    r = requests.get(url=get_url)
    r.raise_for_status()  # fail loudly on HTTP errors instead of parsing an error body
    response_dict = r.json()
    has_more = response_dict['hasMore']
    object_list.extend(response_dict['results'])
    parameter_dict['offset'] = response_dict['offset']
    
    if len(object_list) >= max_results: # Exit pagination once max_results is reached
      print('maximum number of results exceeded')
      break
  
  print(f'Done!! Found {len(object_list)} objects')
  
  return object_list
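
To see the offset-pagination loop in isolation, here is a sketch that swaps the HTTP call for a stand-in function. `fake_fetch_page`, `fetch_all`, and the data are hypothetical; only the `hasMore`, `offset`, and `results` keys mirror the response fields used above.

```python
def fake_fetch_page(offset, count):
    """Hypothetical stand-in for one API call: serves 7 records in pages of `count`."""
    records = [{"dealId": i} for i in range(7)]
    page = records[offset:offset + count]
    next_offset = offset + len(page)
    return {"results": page, "hasMore": next_offset < len(records), "offset": next_offset}

def fetch_all(fetch_page, count, max_results):
    """Request pages until the source reports no more data or max_results is reached."""
    collected = []
    offset = 0
    has_more = True
    while has_more and len(collected) < max_results:
        response = fetch_page(offset, count)
        collected.extend(response["results"])
        has_more = response["hasMore"]
        offset = response["offset"]  # the next request resumes where this page ended
    return collected

all_deals = fetch_all(fake_fetch_page, count=3, max_results=100)
capped_deals = fetch_all(fake_fetch_page, count=3, max_results=4)
print(len(all_deals), len(capped_deals))
```

As in the function above, the cap is checked after each page, so the result can overshoot max_results by up to one page.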

Transform

Do the necessary wrangling to convert the data into a ready-to-load pd.DataFrame and make sure the column types are correct. In this case, I extract the association ids and the properties, which live in different fields of each item in the response, merge them, and convert them to the correct types.

import pandas as pd

def extract_value(new_objects, column_names):
  
  """
  new_objects: list with http response
  column_names: list with column names to keep
  
  return: pd.DataFrame with column_names fields
  """
  
  # No new records: return an empty frame instead of failing on new_objects[0]
  if not new_objects:
    return pd.DataFrame(columns=column_names)
  
  has_associations = 'associations' in new_objects[0]
  
  list_properties = []
  for obj in new_objects:
    
    # Each property arrives as {'value': ...}; keep only the value
    saved_properties = {key: value['value'] for key, value in obj['properties'].items()}
    
    if has_associations:
      saved_properties['associatedCompanyIds'] = obj['associations']['associatedCompanyIds']
      saved_properties['associatedVids'] = obj['associations']['associatedVids']
      
    list_properties.append(saved_properties)
  
  df_properties = pd.DataFrame(list_properties)
  
  # Keep only the requested columns that are present in the response
  subset_columns = df_properties.columns.intersection(column_names)
  
  df_properties = df_properties[subset_columns].copy()
  
  if has_associations:
    # Collapse the contact ids into one string and emit one row per associated company
    df_properties['associatedVids'] = df_properties['associatedVids'].apply(lambda x: '-'.join(map(str, x)))
    df_properties = df_properties.explode('associatedCompanyIds')
    
  return df_properties



def parse_properties(df, columns_to_datetime, columns_to_numeric, columns_to_boolean):
  
  """
  df: pd.DataFrame
  columns_to_datetime, columns_to_numeric, columns_to_boolean: list with names of the columns to parse
  
  return: pd.DataFrame with parsed columns
  """
  
  if columns_to_datetime:
    df[columns_to_datetime] = df[columns_to_datetime].apply(pd.to_datetime, unit =  'ms')
  
  if columns_to_numeric:
    df[columns_to_numeric] = df[columns_to_numeric].apply(pd.to_numeric, errors = 'coerce')
    
  if columns_to_boolean:
    df[columns_to_boolean] = df[columns_to_boolean].replace({'true': True, 'false': False}).astype('boolean')
  
  return df


def clean_deals(deals_list):
  
  """
  deals_list: list with http response
  
  return: pd.DataFrame with necessary columns and correct types
  """
  
  deal_properties = ['amount_in_home_currency', 'closed_lost_reason', 'closedate', 'createdate', 'days_to_close', 'deal_currency_code', 
                     'dealname', 'dealstage', 'dealtype', 'hs_is_closed', 'hs_is_closed_won', 'hs_lastmodifieddate', 'hs_object_id', 
                     'associatedCompanyIds', 'associatedVids', 'pipeline']
  
  deals = extract_value(deals_list, deal_properties)
  deals = parse_properties(
    deals,
    columns_to_datetime = ['createdate', 'closedate', 'hs_lastmodifieddate'],
    columns_to_numeric = ['days_to_close', 'hs_object_id', 'amount_in_home_currency'],
    columns_to_boolean = ['hs_is_closed', 'hs_is_closed_won']
    )
    
  return deals
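
For reference, this is the response shape the helpers above assume, inferred from the keys the code reads (`properties`, `value`, `associations`); the sample values are made up. The sketch flattens one item with plain dictionaries, mirroring the loop inside extract_value (`flatten_deal` is a hypothetical name):

```python
sample_deal = {
    "properties": {
        "dealname": {"value": "Example deal"},
        "amount_in_home_currency": {"value": "1500.50"},
        "hs_is_closed": {"value": "false"},
    },
    "associations": {"associatedCompanyIds": [11, 12], "associatedVids": [101, 102]},
}

def flatten_deal(deal):
    """Unwrap {'value': ...} properties and copy the association ids alongside them."""
    flat = {key: prop["value"] for key, prop in deal["properties"].items()}
    flat["associatedCompanyIds"] = deal["associations"]["associatedCompanyIds"]
    # Contact ids are collapsed into a single dash-separated string
    flat["associatedVids"] = "-".join(map(str, deal["associations"]["associatedVids"]))
    return flat

flat_deal = flatten_deal(sample_deal)
print(flat_deal)
```

In this sample the property values are strings, which is the case parse_properties is written for when it converts columns to numeric, datetime, and boolean types.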

Load

Use the BigQuery client’s load_table_from_dataframe() method. Define the schema beforehand to avoid data type mismatches between the upload batch and the BQ table.

# Example Schema
example_schema = [
  bigquery.SchemaField("amount_in_home_currency", "FLOAT"),
  bigquery.SchemaField("closed_lost_reason", "STRING"),
  bigquery.SchemaField("closedate", "DATE"),
  bigquery.SchemaField("days_to_close", "INTEGER")]
  
  
def load_table_from_dataframe_safely(bq_client, df: pd.DataFrame, table_id: str, table_schema: list):
    
    """
    bq_client: BigQuery Client
    df: dataframe to upload to BigQuery
    table_id: id of table in BigQuery, it should consist of project.dataset.table
    table_schema: list of bigquery.SchemaField elements, one for every column in the table
    
    return: None if df is empty, otherwise the finished load job
    """
    
    if df.empty:
        print(f'Empty table, there are no new records for {table_id}')
        return None
    
    job_config = bigquery.LoadJobConfig(schema=table_schema)
    job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
    return job.result()  # wait for the load job to finish

Include all into a single function

Cloud Functions executes a function, not a script, so every helper has to be called from a single entry-point function. There are three places where the helpers can live:

  • In the same script as the main function (you tell GCP which function to execute, so it doesn’t matter if variables and helpers live in the same script)

  • In a different script inside a folder at the same directory level as main.py

  • In a library that you build and add to the dependencies

I think the third option is the best choice, but it adds complexity to the process, so we will go with the second one, which also lets us use the helpers in local development.
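
A possible layout for the second option, assuming the helpers live in helpers/hs_helpers.py (which matches the `from helpers import hs_helpers` import in main.py). The `__init__.py` file is my addition to make the folder an explicit package:

```text
.
├── main.py
├── requirements.txt
└── helpers
    ├── __init__.py
    └── hs_helpers.py
```

The requirements.txt entries below are my guess at the minimum set for the code in this tutorial, not a list from the original project; pin versions as you see fit. pyarrow and db-dtypes are included because recent versions of the BigQuery client need them for load_table_from_dataframe() and to_dataframe().

```text
google-cloud-bigquery
pandas
pyarrow
db-dtypes
requests
```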

The final main.py will look like this:

########################################## Libraries
import pandas as pd
import datetime as dt
from google.cloud import bigquery
import os
from helpers import hs_helpers as hs_help

########################################## BigQuery Connection
bqclient = bigquery.Client()

def hubspot_replication(request):
  # HTTP-triggered Cloud Functions pass the incoming request object; it is unused here
  
  apikey = os.environ.get('hs_apikey')
  
  last_updated_deal = hs_help.get_last_updated_object(bqclient, 'deals', 'last_updated_column')
  
  recent_deals_list=hs_help.get_recent_modified("https://api.hubapi.com/deals/v1/deal/recent/modified?",
                                        apikey, count = 100, since=last_updated_deal*1000, max_results=100000)
  
  recent_deals = hs_help.clean_deals(recent_deals_list)
  
  example_schema = [
    bigquery.SchemaField("amount_in_home_currency", "FLOAT"),
    bigquery.SchemaField("closed_lost_reason", "STRING"),
    bigquery.SchemaField("closedate", "DATE"),
    bigquery.SchemaField("days_to_close", "INTEGER")]
 
  
  hs_help.load_table_from_dataframe_safely(bqclient, recent_deals, 'project_name.dataset_name.table_name', example_schema)
  
  return 'Data pull complete'

Note that you don’t need to load the JSON file with the service account key here, since you will choose which service account executes the function when you deploy it.

Setting scripts and routine in GCP

There are multiple ways to deploy the function. The most straightforward is to upload a .zip from the console; the Cloud Functions documentation has the details.

You have two options for passing the passwords and API keys that the function needs: using Secrets (recommended) or setting environment variables. Both are easy to implement and can be configured from the console at deployment time.

After testing your function, you can create a Cloud Scheduler job that triggers the HTTP function on a schedule.

Handling duplicates

BigQuery doesn’t enforce unique keys the way an RDBMS does, so appending batches can introduce duplicates into a table. To keep them out of your reports you have two options:

1- Delete from the persistent BQ table the records that also appear in the new batch, before uploading the batch. You can implement something like this and call it before load_table_from_dataframe_safely:

def drop_duplicates(bq_client, table_name, ids):
  
  # Nothing to delete; an empty IN () list would be a SQL syntax error
  if len(ids) == 0:
    return None
  
  collapsed_ids = ', '.join(map(str, ids))
  
  query = f"""DELETE FROM hubspot.{table_name} WHERE id IN ({collapsed_ids})"""

  query_job = bq_client.query(query)
  
  return query_job.result()
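
Because the statement is assembled with an f-string, it is worth validating the inputs first. A minimal sketch, assuming the ids are integers and the table name is a plain identifier; `build_delete_query` is a hypothetical helper, not part of the BigQuery client:

```python
import re

def build_delete_query(table_name, ids):
    """Return a DELETE statement for the given ids, or None when there is nothing to delete."""
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table_name):
        raise ValueError(f"unexpected table name: {table_name!r}")
    # int() raises on non-numeric input such as '1 OR 1=1'; the set removes repeated ids
    clean_ids = sorted({int(i) for i in ids})
    if not clean_ids:
        return None
    collapsed_ids = ", ".join(map(str, clean_ids))
    return f"DELETE FROM hubspot.{table_name} WHERE id IN ({collapsed_ids})"

query = build_delete_query("deals", [42, 7, 42])
print(query)
```

The returned string can then be passed to bq_client.query() exactly as in drop_duplicates.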

2- Create views or rewrite the tables, removing duplicates with a SQL statement:

SELECT
  * EXCEPT(row_number)
FROM (
  SELECT
    *,
    ROW_NUMBER()
          OVER (PARTITION BY ID_COLUMN) row_number
  FROM
    `TABLE_NAME`)
WHERE
  row_number = 1
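
Note that the window above has no ORDER BY, so which of the duplicated rows gets row_number = 1 is not specified. If the most recent version of a record should win, order the window by a modification timestamp in descending order. The same rule sketched in Python on made-up rows, with hs_object_id and hs_lastmodifieddate standing in for ID_COLUMN and the timestamp column (`keep_latest` is a hypothetical helper):

```python
rows = [
    {"hs_object_id": 1, "dealstage": "open", "hs_lastmodifieddate": 100},
    {"hs_object_id": 2, "dealstage": "open", "hs_lastmodifieddate": 110},
    {"hs_object_id": 1, "dealstage": "closedwon", "hs_lastmodifieddate": 250},
]

def keep_latest(records, id_key, modified_key):
    """Keep one record per id: the one with the greatest modification timestamp."""
    latest = {}
    for record in records:
        current = latest.get(record[id_key])
        if current is None or record[modified_key] > current[modified_key]:
            latest[record[id_key]] = record
    return list(latest.values())

deduplicated = keep_latest(rows, "hs_object_id", "hs_lastmodifieddate")
print(deduplicated)
```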