
Codelab: Extract and Transform App Engine Datastore Data for Analysis with BigQuery


"Building Data Pipelines at Google Scale," Google I/O 2012 - slides

Author: Michael Manoochehri , Updated: 06/29/2012 , Tags: Python, BigQuery, App Engine

A common design pattern for applications that collect and process a large amount of data is to couple a high performance NoSQL database (such as the App Engine Datastore) with an analytics service (such as Google BigQuery). This codelab demonstrates how to extract and transform data from the Datastore into a format suitable for loading into Google BigQuery, via the App Engine MapReduce library.

We will start by creating a mapper pipeline that iterates over each entity in the App Engine Datastore. This mapper applies a simple transformation to each entity, and writes the result of each transformation to a CSV file in Google Cloud Storage. After this data pipeline is complete, the application starts a BigQuery load job, creating a new BigQuery table from the CSV data file.

Contents

  1. Prerequisites
  2. Project Setup
    1. Create a new App Engine application
    2. Create a new project using the Google APIs Console
    3. Authorize your App Engine app to access BigQuery and Cloud Storage
    4. Create a new BigQuery dataset and a Cloud Storage bucket
    5. Download required libraries
  3. The Mapper pipeline: Iterate over Datastore data
    1. Accessing Datastore entities
    2. Importing required modules for data processing
    3. Defining a Mapper Pipeline for Datastore data
    4. Defining our Map function
    5. Converting App Engine DateTime properties into POSIX time format
  4. Loading CSV data into Google BigQuery
    1. Building a BigQuery load job configuration
  5. Running and Deploying the Application
  6. Complete Source Code and Further Steps

Prerequisites

This codelab assumes some familiarity with Google BigQuery and the Google App Engine Python runtime, as well as basic familiarity with the App Engine Datastore.

If you haven't done so already, please read the BigQuery Getting Started and BigQuery Main Concepts guides, which explain how the Google BigQuery service works. The App Engine Getting Started with Python 2.7 guide provides a good introduction to App Engine development, and includes a section on using the App Engine Datastore.

Project Setup

Create a new App Engine application

This codelab uses App Engine service accounts for API authorization. As of this article's publication date, apps using this authorization method must be deployed to Google App Engine for testing; they cannot be run in the local development environment.

  1. Create a new application using the App Engine administration console . From this point on, replace your_app_id with the application ID you choose.
  2. Visit the Application Settings page in the Administration Console at https://appengine.google.com/settings?app_id=your_app_id and find the service account name. The service account name should be similar to: your_app_id@appspot.gserviceaccount.com . We'll use this account name in the next step.
  3. Create a local directory to store your App Engine application files. In the following steps, we will install the additional libraries this application requires into this directory.

Create a new project using the Google APIs Console

  1. Visit the Google APIs Console , create a new APIs Console project and name it "Datastore BigQuery codelab."
  2. Under the Services tab, turn on the BigQuery API and Google Cloud Storage services.
  3. In order to create tables in Google BigQuery, we need to provide billing information. Visit the Billing tab and activate billing for the project using Google Wallet.
  4. Finally, note the number of the project you just created. The project number is available in the URL of the project in the APIs console ( https://code.google.com/apis/console/#project:12345XXXXXXX ). We will be using this project number to make calls to the BigQuery API.

Authorize your App Engine app to access BigQuery and Cloud Storage

API calls to BigQuery and Cloud Storage require authorization. We will use the App Engine service account authorization method in this codelab. To set up your App Engine app to make calls to these APIs:

  1. Copy the App Engine service account name that we noted above (in the form of your_app_id@appspot.gserviceaccount.com ).
  2. Visit your API console's Team tab, and add the service account name as a project teammate with Can edit permissions.

Create a new BigQuery dataset and a Cloud Storage bucket

This codelab demonstrates how to transform Datastore data into CSV format and then load that data into Google BigQuery. We need to create a Cloud Storage bucket to hold the intermediate CSV output, as well as a BigQuery dataset to hold the tables we generate when the CSV data is loaded into BigQuery.

  1. To create a Cloud Storage bucket, visit the API console for your project and click on the Google Cloud Storage tab. Find the link for Google Cloud Storage Manager .
  2. In the Google Cloud Storage manager, click on New Bucket , and create a bucket called datastore_csvoutput . We will refer to this bucket when storing the data we extract from the App Engine Datastore.
  3. To create a new BigQuery dataset, visit the BigQuery Web UI . Using the drop-down menu in the left sidebar, select the "Datastore BigQuery Codelab" project we created earlier.
  4. In the same menu, select Create new dataset , and name your new dataset datastore_data .

Download required libraries

Apart from the Google App Engine SDK, this codelab requires the use of two additional libraries:

Download the latest version of the Google API Python client . To install this library, we recommend using the enable-app-engine-project script found in the bin directory of the Google API Python client's download bundle. This script copies all of the Python client files needed to access Google APIs from App Engine into your application. Run the script, passing it the name of the local directory you created to store your application files.

The most up-to-date version of the App Engine Python MapReduce bundle can be acquired via svn from the project repository. To download the Python MapReduce library source, simply cd to your App Engine application directory and run:

svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce mapreduce

To provide your application with access to the MapReduce library classes and functions, create an app.yaml file in the top level of your App Engine application directory and add the following configuration parameters and handlers (remember to replace your_app_id with the application id you created above):

application: your_app_id
version: 1
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: /mapreduce(/.*)?
  script: mapreduce/main.py

- url: /.*
  script: main.py

The Mapper pipeline: Iterate over Datastore data

This codelab uses a simple Datastore model to demonstrate how to extract and transform data into BigQuery. We will create a mock data model called ProductSalesData , representing the time when a particular product was purchased at a particular store. This model features three properties: an integer product_id , a datetime date property, and a string property containing a store name.

from google.appengine.ext import db

class ProductSalesData(db.Model):
  product_id = db.IntegerProperty(required=True)
  date = db.DateTimeProperty(verbose_name=None,
                             auto_now=True,
                             auto_now_add=True)
  store = db.StringProperty(required=True)

For convenience, let's create a simple handler to generate a few sample entities based on this Datastore model.

class AddDataHandler(webapp.RequestHandler):
  def get(self):
    for i in range(0,9):
      data = ProductSalesData(product_id=i,
                              store='Store %s' % str(i))
      self.response.out.write('Added sample Datastore entity #%s<br>' % str(i))
      data.put()
    self.response.out.write('<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline.')

Accessing Datastore entities

Create a main.py file in your application directory to contain the application code. Add the following constants to this file. These constants contain information about the API console project, the BigQuery dataset, as well as our project's Cloud Storage bucket. We also include a SCOPE variable to provide authorization to create tables in Google BigQuery.

SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'XXXXXXXXXX' # Your Project ID here
BQ_DATASET_ID = 'datastore_data'
GS_BUCKET = 'datastore_csvoutput'
ENTITY_KIND = 'main.ProductSalesData'

Importing required modules for data processing

Next, we create a class that iterates over each of our Datastore entities, transforms the entity data into a string in CSV format, and then writes the result into a single Cloud Storage CSV file. We first need to import a collection of modules that provide distributed processing features on App Engine.

import time
import calendar

from google.appengine.api import taskqueue
from google.appengine.ext import blobstore
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template

from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline

Defining a Mapper Pipeline for Datastore data

Now, we will create a new class called DatastoreMapperPipeline that yields a mapreduce_pipeline.MapperPipeline . The MapperPipeline constructor takes several parameters:

  • The first parameter is simply a string that contains the name of our pipeline.
  • The second parameter is a mapper function that runs for each Datastore entity that we process. We'll define a function called datastore_map , which yields a string representation of the data in each entity in CSV format.
  • The next two parameters specify the input reader and output writer for this pipeline. In this case, we use the DatastoreInputReader and FileOutputWriter , both of which are available in the App Engine MapReduce library.
  • The params parameter specifies parameters for the input reader and output writer. We pass the name of our Datastore entity kind to the input reader, and the name of our Cloud Storage bucket to the output writer.
  • The shards parameter determines how many parallel workers our application uses to process data. Each worker starts a new task in the default App Engine task queue. By default, each DatastoreInputReader worker processes 50 Datastore entities at a time. In this example, our sample dataset is small, so a single worker can process the entire data set in seconds. However, for much larger data sizes, an application may use dozens or even hundreds of workers as appropriate.

Finally, the last line yields the output of our MapperPipeline to a BigQuery load pipeline called CloudStorageToBigQuery , which we will define below.

class DatastoreMapperPipeline(base_handler.PipelineBase):
  def run(self, entity_type):
    output = yield mapreduce_pipeline.MapperPipeline(
      "Datastore Mapper %s" % entity_type,
      "main.datastore_map",
      "mapreduce.input_readers.DatastoreInputReader",
      output_writer_spec="mapreduce.output_writers.FileOutputWriter",
      params={
          "input_reader":{
              "entity_kind": entity_type,
              },
          "output_writer":{
              "filesystem": "gs",
              "gs_bucket_name": GS_BUCKET,
              "output_sharding":"none",
              }
          },
          shards=10)
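    # Pass the list of Cloud Storage files written by the mapper on to the
    # BigQuery load pipeline defined below.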
    yield CloudStorageToBigQuery(output)

Defining our Map function

The datastore_map function runs for each Datastore entity that our pipeline processes. It reads each property from the entity and joins the values into a single line in CSV format. During this process, it converts the datetime property into a format that BigQuery can load, using a helper function called timestamp_to_posix , which we will define below.

def datastore_map(entity_type):
  data = db.to_dict(entity_type)
  resultlist = [data.get('product_id'),
                timestamp_to_posix(data.get('date')),
                data.get('store')]
  result = ','.join(['"%s"' % field for field in resultlist])
  yield("%s\n" % result)

Converting App Engine DateTime properties into POSIX time format

BigQuery currently supports four native data types: integer, float, string, and boolean values. Timestamps should be converted into an integer format before loading into BigQuery. A very common integer representation of timestamps is the POSIX time format.

Let's create a function called timestamp_to_posix to convert our Datastore timestamp fields into POSIX time format.

def timestamp_to_posix(timestamp):
  return int(time.mktime(timestamp.timetuple()))
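
As a quick sanity check, the snippet below converts a fixed timestamp (a hypothetical value; note that time.mktime() interprets the time tuple in the server's local time zone, which is UTC on App Engine):

from datetime import datetime

# 2012-06-29 12:00:00 UTC corresponds to the POSIX timestamp 1340971200.
print timestamp_to_posix(datetime(2012, 6, 29, 12, 0, 0))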

Loading CSV data into Google BigQuery

Once the Datastore to CSV pipeline is complete, the URI of the resulting CSV file is passed to a new pipeline that starts a BigQuery load job. We'll define another pipeline class called CloudStorageToBigQuery . This class instantiates a BigQuery API client, authorized using the App Engine service account we set up at the beginning of this codelab.

First, let's import the modules necessary to allow our app to instantiate an API client, as well as make authorized calls to the BigQuery API.

import httplib2
from datetime import datetime

from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials

Next, let's create our new pipeline. This pipeline uses the csv_output list generated as the output of the DatastoreMapperPipeline above. We will add a line that defines a BigQuery table name based on the current timestamp, so that each time we run the extraction, we will create a new BigQuery table with a unique name.

class CloudStorageToBigQuery(base_handler.PipelineBase):
  def run(self, csv_output):

    credentials = AppAssertionCredentials(scope=SCOPE)
    http = credentials.authorize(httplib2.Http())
    bigquery_service = build("bigquery", "v2", http=http)

    jobs = bigquery_service.jobs()
    table_name = 'datastore_data_%s' % datetime.utcnow().strftime(
        '%m%d%Y_%H%M%S')
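    # FileOutputWriter returns file paths in the form /gs/bucket_name/object_name;
    # convert them to gs:// URIs, the form that BigQuery load jobs expect.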
    files = [str(f.replace('/gs/', 'gs://')) for f in csv_output]
    result = jobs.insert(projectId=PROJECT_ID,
                        body=build_job_data(table_name,files))
    result.execute()

Building a BigQuery load job configuration

We'll also create a helper function, build_job_data , to build the JSON object necessary to pass to the BigQuery API. This function takes the table name that we generated above, as well as a list of files to process (in this case, we only have a single CSV file).

def build_job_data(table_name, files):
  return {"projectId": PROJECT_ID,
          "configuration":{
              "load": {
                  "sourceUris": files,
                  "schema":{
                      "fields":[
                          {
                              "name":"product_id",
                              "type":"INTEGER",
                          },
                          {
                              "name":"date",
                              "type":"INTEGER",
                          },
                          {
                              "name":"store",
                              "type":"STRING",
                          }
                          ]
                      },
                  "destinationTable":{
                      "projectId": PROJECT_ID,
                      "datasetId": DATASET_ID,
                      "tableId": table_name,
                      },
                  "maxBadRecords": 0,
                  }
              }
          }

Running and Deploying the Application

The final step will be to create a simple webapp handler that calls the PipelineBase.start() method to start the DatastoreMapperPipeline. After the pipeline has been started, the handler redirects the browser to the MapReduce status page, provided by the App Engine MapReduce library. We will also register our handlers and map them to application URLs.

class DatastoretoBigQueryStart(webapp.RequestHandler):
  def get(self):
    pipeline = DatastoreMapperPipeline(ENTITY_KIND)
    pipeline.start()
    path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
    self.redirect(path)

application = webapp.WSGIApplication(
                                     [('/start', DatastoretoBigQueryStart),
                                     ('/add_data', AddDataHandler)],
                                     debug=True)

Deploy your new application by running the App Engine SDK appcfg.py --oauth2 update command, or by using the Google App Engine launcher or Eclipse plugin. After deployment is complete, generate a few ProductSalesData entities by visiting http://your_app_id.appspot.com/add_data .

Once you have created a few Datastore entities, visit http://your_app_id.appspot.com/start to start the Datastore mapper pipeline. The browser should redirect automatically to the MapReduce dashboard, which displays the status of the data pipeline. After the Datastore mapper pipeline is complete, the intermediate transformed CSV file should be available in your project's Google Cloud Storage datastore_csvoutput bucket.

As discussed above, the Datastore Mapper pipeline yields its output to the CloudStorageToBigQuery pipeline, which starts a BigQuery load job. BigQuery data loading may take a few minutes, even with small source files. The newly created BigQuery table will be available in the datastore_data dataset. To view newly created tables in the BigQuery browser UI, click on the Refresh option in the dropdown menu next to the project name.
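
If you would rather check on the load job programmatically than refresh the BigQuery UI, a minimal sketch is shown below. It assumes you capture the job ID from the insert response in CloudStorageToBigQuery (for example, job_id = result.execute()['jobReference']['jobId']) and reuse the same authorized bigquery_service client; the wait_for_load_job helper name is our own.

def wait_for_load_job(bigquery_service, job_id, poll_interval=5):
  """Polls a BigQuery load job until it completes; returns any error result."""
  jobs = bigquery_service.jobs()
  while True:
    job = jobs.get(projectId=PROJECT_ID, jobId=job_id).execute()
    if job['status']['state'] == 'DONE':
      # 'errorResult' is present only if the load job failed.
      return job['status'].get('errorResult')
    # Sleeping in a frontend request counts against the request deadline, so in
    # a real app this check would run from a task queue task or a client script.
    time.sleep(poll_interval)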

Complete Source Code and Further Steps

The complete source code for this sample is available here.

The Google App Engine MapReduce library allows developers to build more complex data workflows than the simple Datastore mapper defined here. For more information, visit the MapReduce Made Easy sample, and watch our Google I/O 2012 presentation "Building Data Pipelines at Google Scale".
