Python Developer and Educator
2019-04-24
I was tempted to title this "how to use GCF to create a simple ETL process", but that's not quite what I'm demonstrating here.
It does loosely fit the description of an ETL process - the script extracts some values from a POSTed payload, rearranges some of the values to fit a specific schema, then loads the transformed payload into a data store.
But what you're going to see here is not the heavy lifting we normally think of when we see the acronym "ETL".
And maybe that's a good thing, as it illustrates the beautiful simplicity of Google's new Cloud Functions service.
I work on a data infrastructure team that already has an account and a project set up on Google Cloud Platform. That project is already associated with a data store - a BigQuery project/dataset. I'm not going to cover how to set all that up since it's out of scope here, but you can start with these docs: https://cloud.google.com/docs/
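If you want to quickly confirm that your credentials and that project/dataset link are working before writing any function code, a minimal check from Python looks something like this (my-project-name and my_dataset are placeholders for your own names):

from google.cloud import bigquery

# Placeholder project and dataset names - substitute your own
client = bigquery.Client(project='my-project-name')
dataset = client.get_dataset(client.dataset('my_dataset'))
print("Found dataset: {}".format(dataset.dataset_id))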
I'm currently working on a project to accept realtime event data from a media platform we work with. We expect the data to come in at a medium-to-high volume, but we're still in testing so I don't have details on how well this job will handle the volume or how well it will scale - that will come later.
What I am going to talk about is this flow, with some general info on how to build the tools I needed to handle each step: accept a POSTed payload at an HTTP endpoint, reshape the events in that payload to match a BigQuery schema, and load the resulting rows into a BigQuery table.
The pieces I had to build to do this:
- a small Flask app in main.py, with an entry point function to receive the POSTs
- a BigQuery schema (and a function to create the table from it)
- functions to transform each event and insert the rows into BigQuery
- a deployed Google Cloud Function with an HTTP trigger
Before we go much further - assuming that you already have a Google Cloud project, with a BigQuery dataset, and all the permissions set up to link the two - you will also need the gcloud command line tool. Go here and follow the steps to install:
https://cloud.google.com/sdk/docs/quickstart-macos
gcloud is what you'll use to deploy your function to Google Cloud. Installation will update your PATH to include the Google Cloud SDK in ~/.bash_profile. You may need to go through some authorization steps using the email address you have associated with your project. You may not need to add any gcloud components, although if you do, instructions are included in the installation output.
For the example here, you should probably have these components:
In a local folder, do some of the basic setup you normally would to start a Python project:
- main.py - this will be your script
- requirements.txt for any libraries you might need to install
- a virtualenv to keep everything contained, particularly if you're going to test locally

In your main.py, you are free to build your Python script in whatever way works for you. You can import any libraries you might need, and your script structure can be as simple or as complex as you need it to be.
The only key requirement is that you name a function that will be the entry point for your script - that name will be how your function is referenced in the GCP dashboard, and will be used to deploy the code to GCP.
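As a rough sketch (just the shape, not the full example yet), an HTTP-triggered function's entry point takes a Flask request object and returns something Flask can turn into a response - the name events below is the entry point name the deploy command will reference later:

# Minimal shape of an HTTP-triggered Cloud Function in Python
def events(request):
    payload = request.get_json()
    # ... transform and load the payload here ...
    return ('', 200)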
Now (finally!) let's look at some sample code:
In main.py, I've built a simple Flask app (I love how easily Flask handles POSTs).

import json
import sys

from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)
I'll use this schema both to create my BigQuery table and to insert rows. This schema example includes several common column types used in BigQuery.
schema = [
    bigquery.SchemaField('timestamp', 'TIMESTAMP', 'NULLABLE'),
    bigquery.SchemaField('event_type', 'STRING', 'NULLABLE'),
    bigquery.SchemaField('event_id', 'STRING', 'NULLABLE'),
    bigquery.SchemaField('has_insights', 'BOOLEAN', 'NULLABLE'),
    bigquery.SchemaField('video_insights', 'RECORD', 'REPEATED',
        fields=[
            bigquery.SchemaField('video_id', 'STRING', 'NULLABLE'),
            bigquery.SchemaField('video_duration', 'INTEGER', 'NULLABLE'),
        ],
    ),
    bigquery.SchemaField('categories', 'STRING', 'REPEATED'),
]

dataset = 'my_dataset'
table_name = 'my_events_table'
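For reference, a single row matching this schema would look something like the following dict (illustrative values only, based on the sample payload later in this post):

example_row = {
    'timestamp': '2017-07-25T10:03:12Z',
    'event_type': 'ARTICLE_VIEW_EVENT',
    'event_id': 'A7ED75A5-475E-44EE-BAD9-3A57D8F547B2',
    'has_insights': True,
    # REPEATED RECORD column: a list of dicts with the nested field names
    'video_insights': [
        {'video_id': 'video1', 'video_duration': 120},
    ],
    # REPEATED STRING column: a plain list of strings
    'categories': ['Entertainment', 'Games'],
}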
Here's the events() function, which does a few things: it pulls the list of events out of the POSTed payload, creates the BigQuery table if it doesn't already exist, reshapes each event to match the schema, and inserts the resulting rows:
def events(request):
    payload = {}
    try:
        # Extract: pull the list of events out of the POSTed JSON payload
        payload = request.get_json()
        events = payload['events']
    except Exception as e:
        response = app.response_class(
            response=json.dumps({'error': str(e)}),
            status=400,
            mimetype='application/json'
        )
        return response

    # Create the table if it doesn't already exist (the error on an
    # existing table is just logged and ignored)
    try:
        create_table()
    except Exception as e:
        print("ERROR", e)

    # Transform: reshape each event to match the BigQuery schema
    event_rows = []
    for event in events:
        entry = construct_entry(payload, event)
        event_rows.append(entry)

    # Load: insert the transformed rows into BigQuery
    try:
        insert_entries(event_rows)
    except Exception as e:
        print("Error on inserting entries: %s" % e)
        sys.exit()

    response = app.response_class(response='', status=200)
    return response
Here are the utility functions that use the google.cloud.bigquery library to do all that stuff:
def create_table():
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset)
    table_ref = dataset_ref.table(table_name)
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)
    print("Created table {}".format(table.full_table_id))
    return table.full_table_id


def construct_entry(payload, event):
    # Reshape one incoming event into a row dict whose keys match the
    # schema field names, so insert_rows maps values to the right columns
    insights_list = []
    if event.get('video_insights', None):
        for vp in event['video_insights']:
            v = {
                'video_id': vp.get('video_id', ''),
                'video_duration': vp.get('video_duration', 0),
            }
            insights_list.append(v)
    entry = {
        'timestamp': event.get('timestamp', None),
        'event_type': event.get('event_type', ''),
        'event_id': event.get('event_id', ''),
        'categories': event.get('categories', []),
        'has_insights': event.get('has_insights', False),
        'video_insights': insights_list,
    }
    return entry


def insert_entries(event_rows):
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset)
    table_ref = dataset_ref.table(table_name)
    table = bigquery.Table(table_ref, schema=schema)
    try:
        response = client.insert_rows(table, event_rows)
    except Exception as e:
        print("Error: %s" % str(e))
        return False
    return True
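If you want to sanity-check the transform step before deploying anything, you can call construct_entry directly with a hand-built event (a quick local sketch; the sample mirrors the payload example further down):

# Quick local check of the transform step
sample_payload = {'site_id': 'example.com', 'another_id': 'FE7169C2'}
sample_event = {
    'timestamp': '2017-07-25T10:03:12Z',
    'event_type': 'ARTICLE_VIEW_EVENT',
    'event_id': 'A7ED75A5-475E-44EE-BAD9-3A57D8F547B2',
    'categories': ['Entertainment', 'Games'],
    'has_insights': True,
    'video_insights': [{'video_id': 'video1', 'video_duration': 120}],
}

# The keys of the result should line up with the schema field names
print(construct_entry(sample_payload, sample_event))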
The google.cloud.bigquery docs are here: https://googleapis.github.io/google-cloud-python/latest/bigquery/reference.html
Finally here's the main method that uses Flask to run the app and route requests to the events() function:
if __name__ == '__main__':
    # Only used for local testing - in production, Cloud Functions routes
    # requests directly to the events() entry point
    app.route('/events', methods=['POST'])(lambda: events(request))
    app.run(debug=True)
Altogether, that's only 100 lines of code! My example does work with a simplified payload, so as always, your mileage may vary.
I only have three libraries in my requirements.txt:
Flask==1.0.2
google-cloud-bigquery==1.3.0
google-cloud-storage==1.6.0
And here's what you would expect a valid payload to look like for this example:
{ "site_id": "example.com", "another_id": "FE7169C2", "events": [ { "timestamp": "2017-07-25T09:15:36Z", "event_type": "ARTICLE_VIEW_EVENT", "event_id": "28C86A6C-B93F-4445-94D0-5926F6C0F723", "categories": ['Technology', 'Computers', 'News'], "has_insights": false }, { "timestamp": "2017-07-25T10:03:12Z", "event_type": "ARTICLE_VIEW_EVENT", "event_id": "A7ED75A5-475E-44EE-BAD9-3A57D8F547B2", "categories": ['Entertainment', 'Games'], "has_insights": true "video_insights": [ { "video_id": "video1", "video_duration": 120 } ] } ] }
To deploy from the command line, make sure you're in the folder with the function code. Then run:
gcloud beta functions deploy events --trigger-http --runtime python37 --project my-project-name
Breaking it down:
- gcloud beta functions deploy is the command that lets you create or update a Google Cloud Function
- events is the name of the Google Cloud Function (as defined in the example source code) that will be executed - you'll also see this name in the dashboard
- --trigger-http generates an HTTP endpoint where your function can receive requests
- --runtime is the execution environment (there are also Node.js and Go options)

For more info about constructing a deployment, run:
gcloud beta functions deploy --help
When your deploy is successful, you'll see the entry point/http trigger values included in the return message, looking something like this:
entryPoint: events
httpsTrigger:
  url: https://region-my-project-name.cloudfunctions.net/events
And that's it! If you use curl to post a valid payload to your new endpoint, you should shortly thereafter see a few records in your BigQuery table.
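If you'd rather test from Python than curl, something like this works (a sketch - the URL comes from your own deploy output, and requests is an extra dependency that isn't in the requirements.txt above):

import requests

# Hypothetical endpoint - use the httpsTrigger url returned by your deploy
url = 'https://region-my-project-name.cloudfunctions.net/events'

payload = {
    'site_id': 'example.com',
    'another_id': 'FE7169C2',
    'events': [
        {
            'timestamp': '2017-07-25T09:15:36Z',
            'event_type': 'ARTICLE_VIEW_EVENT',
            'event_id': '28C86A6C-B93F-4445-94D0-5926F6C0F723',
            'categories': ['Technology', 'Computers', 'News'],
            'has_insights': False,
        },
    ],
}

resp = requests.post(url, json=payload)
print(resp.status_code)  # expect 200 if the rows were inserted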
I would guess that HTTP endpoints are going to be the most common use for Google Cloud Functions, but there are several other trigger types available. For more, take a look at:
https://cloud.google.com/functions/docs/calling/
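For example, a background function triggered by a Cloud Storage event uses a different signature than the HTTP function above (a sketch - data and context are supplied by the platform rather than pulled from an HTTP request):

def handle_storage_event(data, context):
    # data describes the triggering object (bucket, file name, etc.);
    # context carries event metadata like the event ID and timestamp
    print('Processing file: {}'.format(data.get('name')))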
You should put your code in GitHub or whatever your choice of repository is, but be aware that GCP also stores the most recent version of the source code. In your project, navigate to the functions dashboard, e.g.:
https://console.cloud.google.com/functions/list?project=my-project-name
And click through to your-function-name >> Source.
If you poke around your function dashboard on the GCP console, you'll also find some fun stuff like usage and activity charts, logging, and a little inline testing module.
Finally, here's a really good primer on GCF. This post is what got me started:
Serverless Python Quickstart with Google Cloud Functions (Dustin Ingram)
Contact: barbara@mechanicalgirl.com