Using Google Cloud Functions to Create a Simple POST Endpoint
Python, Google Cloud Platform | 2019-04-24
I was tempted to title this "how to use GCF to create a simple ETL process", but that's not quite what I'm demonstrating here.
It does loosely fit the description of an ETL process - the script extracts some values from a POSTed payload, rearranges some of the values to fit a specific schema, then loads the transformed payload into a data store.
But what you're going to see here is not the heavy lifting we normally think of when we see the acronym "ETL".
And maybe that's a good thing, as it illustrates the beautiful simplicity of Google's new Cloud Functions service.
Some background:
I work on a data infrastructure team that already has an account and a project set up on Google Cloud Platform. That project is already associated with a data store - a BigQuery project/dataset. I'm not going to cover how to set all that up since it's out of scope here, but you can start with these docs: https://cloud.google.com/docs/
I'm currently working on a project to accept realtime event data from a media platform we work with. We expect the data to come in at a medium-to-high volume, but we're still in testing so I don't have details on how well this job will handle the volume or how well it will scale - that will come later.
The project:
What I am going to talk about is this flow, with some general info on how to build the tools I needed to handle each step:
- the vendor POSTs the event data payload to my HTTP endpoint
- we receive, validate, and transform the payload data
- we write that data to a BigQuery table
The pieces I had to build to do this:
- a Google Cloud Function
- a BigQuery table
gcloud:
Before we go much further - assuming that you already have a Google Cloud project with a BigQuery dataset, and all the permissions set up to link the two - you will also need the gcloud command line tool. Go here and follow the steps to install it:
https://cloud.google.com/sdk/docs/quickstart-macos
gcloud is what you'll use to deploy your function to Google Cloud. The installer will update your PATH to include the Google Cloud SDK in ~/.bash_profile. You may need to go through some authorization steps using the email address associated with your project. You may not need to add any gcloud components, although if you do, instructions are included in the installation output.
For the example here, you should probably have these components:
- BigQuery Command Line Tool
- Cloud SDK Core Libraries
- Cloud Storage Command Line Tool
Setting up the script:
In a local folder, do some of the basic setup you normally would to start a Python project:
- create a main.py - this will be your script
- add a requirements.txt for any libraries you might need to install
- use virtualenv to keep everything contained, particularly if you're going to test locally
In your main.py, you're free to build your Python script in whatever way works for you. You can import any libraries you need, and your script structure can be as simple or as complex as you need it to be.
The only key requirement is that you name a function to serve as the entry point for your script - that name is how the function is referenced in the GCP dashboard, and it's what you'll use to deploy the code to GCP.
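To make that concrete, here's a minimal sketch of what GCP expects from an HTTP-triggered function (the name events is the entry point used throughout this post; GCP passes in a Flask Request object, and you can return a body/status tuple):

def events(request):
    # Minimal HTTP entry point sketch: GCF hands you a Flask Request object.
    # The real version below validates the payload and writes to BigQuery.
    return 'OK', 200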
The code:
Now (finally!) let's look at some sample code:
In main.py, I've built a simple Flask app (I love how easily Flask handles POSTs).
import json
import sys

from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)
I'll use this schema both to create my BigQuery table and to insert rows. This schema example includes several common column types used in BigQuery.
schema = [
    bigquery.SchemaField('timestamp', 'TIMESTAMP', 'NULLABLE'),
    bigquery.SchemaField('event_type', 'STRING', 'NULLABLE'),
    bigquery.SchemaField('event_id', 'STRING', 'NULLABLE'),
    bigquery.SchemaField('has_insights', 'BOOLEAN', 'NULLABLE'),
    bigquery.SchemaField('video_insights', 'RECORD', 'REPEATED', fields=[
        bigquery.SchemaField('video_id', 'STRING', 'NULLABLE'),
        bigquery.SchemaField('video_duration', 'INTEGER', 'NULLABLE'),
    ]),
    bigquery.SchemaField('categories', 'STRING', 'REPEATED'),
]

dataset = 'my_dataset'
table_name = 'my_events_table'
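For reference, a single row that matches this schema is just a dict whose keys line up with the column names - something like this sketch (values borrowed from the sample payload further down):

# Illustrative example only: a row dict whose keys match the SchemaField names,
# which is what insert_rows() expects when you pass it dicts.
sample_row = {
    'timestamp': '2017-07-25T10:03:12Z',
    'event_type': 'ARTICLE_VIEW_EVENT',
    'event_id': 'A7ED75A5-475E-44EE-BAD9-3A57D8F547B2',
    'has_insights': True,
    'video_insights': [{'video_id': 'video1', 'video_duration': 120}],
    'categories': ['Entertainment', 'Games'],
}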
Here's the events() function that does a few things:
- simple validation of the POST contents, the 'payload' (make sure it's JSON and that 'events' are included)
def events(request):
    payload = {}
    try:
        payload = request.get_json()
        events = payload['events']
    except Exception as e:
        response = app.response_class(
            response=json.dumps({'error': str(e)}),
            status=400,
            mimetype='application/json'
        )
        return response
- create the table in BigQuery (or not - this can be done in a few different ways, I've just included the example here as a convenience)
    try:
        create_table()
    except Exception as e:
        print("ERROR", e)
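If you'd rather not attempt to create the table on every request, one alternative (just a sketch, assuming the google.cloud.exceptions.NotFound import) is to check whether the table exists first:

from google.cloud.exceptions import NotFound

def create_table_if_missing():
    # Sketch: only create the table when it doesn't already exist.
    client = bigquery.Client()
    table_ref = client.dataset(dataset).table(table_name)
    try:
        client.get_table(table_ref)   # raises NotFound if the table is absent
    except NotFound:
        client.create_table(bigquery.Table(table_ref, schema=schema))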
- for each event, extract a few values, add them to a dict, add that dict to a new list (this is the 'transform' part of our mini-ETL)
    event_rows = []
    for p in payload['events']:
        entry = construct_entry(payload, p)
        event_rows.append(entry)
- take that list of transformed events and insert them into the BigQuery table all at once
    try:
        insert_entries(event_rows)
    except Exception as e:
        print("Error on inserting entries: %s" % e)
        sys.exit()
- and if it's all successful, return a 200
    response = app.response_class(response='', status=200)
    return response
Here are the utility functions that use the google.cloud.bigquery library to do all that stuff:
- create a BigQuery table
- transform each event into an entry
- load the list of entries into the BigQuery table
def create_table():
    # Create the BigQuery table using the schema defined above.
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset)
    table_ref = dataset_ref.table(table_name)
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)
    print("Created table {}".format(table.full_table_id))
    return table.full_table_id


def construct_entry(payload, event):
    # Transform one event from the POSTed payload into a row dict whose keys
    # match the schema's column names.
    insights_list = []
    if event.get('video_insights', None):
        for insight in event['video_insights']:
            v = {
                'video_id': insight.get('video_id', ''),
                'video_duration': insight.get('video_duration', 0),
            }
            insights_list.append(v)
    entry = {
        'timestamp': event.get('timestamp', None),
        'event_type': event.get('event_type', ''),
        'event_id': event.get('event_id', ''),
        'categories': event.get('categories', []),
        'has_insights': event.get('has_insights', False),
        'video_insights': insights_list,
    }
    return entry


def insert_entries(event_rows):
    # Stream the list of row dicts into the BigQuery table.
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset)
    table_ref = dataset_ref.table(table_name)
    table = bigquery.Table(table_ref, schema=schema)
    try:
        response = client.insert_rows(table, event_rows)
    except Exception as e:
        print("Error: %s" % str(e))
        return False
    return True
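One thing worth knowing about insert_rows(): per-row streaming problems don't raise an exception - the call returns a list of error mappings, which is empty when every row was accepted. A small sketch of checking that list inside insert_entries():

    # Sketch: insert_rows() returns a list of per-row errors; an empty list
    # means all rows were accepted by the streaming insert API.
    errors = client.insert_rows(table, event_rows)
    if errors:
        print("Row-level insert errors: %s" % errors)
        return False
    return True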
The google.cloud.bigquery docs are here: https://googleapis.github.io/google-cloud-python/latest/bigquery/reference.html
Finally, here's the main method that uses Flask to run the app locally and route requests to the events() function (when the code is deployed, GCP calls events() directly and ignores this block):
if __name__ == '__main__':
    app = Flask(__name__)
    app.route('/events', methods=['POST'])(lambda: events(request))
    app.run(debug=True)
Altogether, that's only about 100 lines of code! My example works with a simplified payload, so as always, your mileage may vary.
I only have three libraries in my requirements.txt:

Flask==1.0.2
google-cloud-bigquery==1.3.0
google-cloud-storage==1.6.0
The sample payload:
And here's what you would expect a valid payload to look like for this example:
{ "site_id": "example.com", "another_id": "FE7169C2", "events": [ { "timestamp": "2017-07-25T09:15:36Z", "event_type": "ARTICLE_VIEW_EVENT", "event_id": "28C86A6C-B93F-4445-94D0-5926F6C0F723", "categories": ['Technology', 'Computers', 'News'], "has_insights": false }, { "timestamp": "2017-07-25T10:03:12Z", "event_type": "ARTICLE_VIEW_EVENT", "event_id": "A7ED75A5-475E-44EE-BAD9-3A57D8F547B2", "categories": ['Entertainment', 'Games'], "has_insights": true "video_insights": [ { "video_id": "video1", "video_duration": 120 } ] } ] }
Deploying:
To deploy from the command line, make sure you're in the folder with the function code. Then run:
gcloud beta functions deploy events --trigger-http --runtime python37 --project my-project-name
Breaking it down:
- gcloud beta functions deploy is the command that lets you create or update a Google Cloud Function
- events is the name of the Google Cloud Function (as defined in the example source code) that will be executed - you'll also see this name in the dashboard
- we specify --trigger-http to generate an HTTP endpoint where our function can receive requests
- --runtime python37 sets the execution environment (there are also Node.js and Go options)
- finally, you need to include your project name
For more info about constructing a deployment, run:
gcloud beta functions deploy --help
When your deploy is successful, you'll see the entry point/http trigger values included in the return message, looking something like this:
entryPoint: events
httpsTrigger:
  url: https://region-my-project-name.cloudfunctions.net/events
And that's it! If you use curl to POST a valid payload to your new endpoint, you should shortly thereafter see a few records in your BigQuery table.
Etcetera:
I would guess that HTTP endpoints are going to be the most common use for Google Cloud Functions, but there are several other trigger types available. For more, take a look at:
https://cloud.google.com/functions/docs/calling/
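For example, a background function triggered by a Pub/Sub topic uses a different signature than the HTTP function above - it receives an event dict and a context object instead of a request. A minimal sketch (the function and topic names here are made up):

import base64

def handle_event(event, context):
    # Background-trigger sketch: Pub/Sub message data arrives base64-encoded
    # in event['data']; there's no HTTP request or response involved.
    message = ''
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    print("Received message: %s" % message)

You'd deploy something like this with --trigger-topic my-topic instead of --trigger-http.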
You should put your code in GitHub or whatever repository you prefer, but be aware that GCP also stores the most recent version of the source code. In your project, navigate to the functions dashboard, e.g.:
https://console.cloud.google.com/functions/list?project=my-project-name
And click through to your-function-name >> Source.
If you poke around your function dashboard on the GCP console, you'll also find some fun stuff like usage and activity charts, logging, and a little inline testing module.
Finally, here's a really good primer on GCF. This post is what got me started:
Serverless Python Quickstart with Google Cloud Functions (Dustin Ingram)