In one of my previous articles, I uploaded 1.2 million records from a CSV file to Cloud Datastore. I read the file, built batches of 200 parsed lines, and posted each batch as JSON to a web server, sequentially (here is the full script). That was, of course, rather slow: the whole process took about 2.5 hours. Only afterwards did I remember that there is a faster way, namely Cloud Dataflow, so just for fun I wrote a pipeline to upload data from a CSV file into Datastore.
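For comparison, the earlier approach looked roughly like the sketch below (the endpoint, function name and batch handling are illustrative, not the original script): read the CSV, collect 200 parsed rows, and POST each batch as JSON to a web server which writes them to Datastore.

import csv
import requests  # assumed HTTP client

UPLOAD_URL = 'https://example-app.appspot.com/upload'  # hypothetical endpoint

def upload_csv_sequentially(path, batch_size=200):
    """Reads the CSV and POSTs batches of rows as JSON, one batch at a time."""
    with open(path) as f:
        batch = []
        for row in csv.DictReader(f):
            batch.append(row)
            if len(batch) == batch_size:
                requests.post(UPLOAD_URL, json=batch)
                batch = []
        if batch:  # send the leftover rows
            requests.post(UPLOAD_URL, json=batch)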
Dataflow is a serverless Google Cloud Platform service which runs data processing jobs written with the Apache Beam framework. You just write the data processing pipeline, define a few settings, and that's all. It has (among others) good integration with Google Cloud products like Cloud Storage, BigQuery, BigTable, Datastore and Pub/Sub. Dataflow creates the virtual machines which execute the job, and once the job is completed it shuts everything down. Easy peasy. I created a GitHub repository with the full code (Python) https://github.com/zdenulo/upload-data-datastore-dataflow and I will explain how it's done.
The most important file is upload.py, where everything is defined. Note: Apache Beam for Python supports only Python 2.7 (not 3.x) at the moment.
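upload.py imports a few constants from settings.py; a minimal sketch of that file (the values are placeholders, of course) could look like this:

# settings.py - placeholder values, adjust to your own project and bucket
PROJECT = 'my-gcp-project'        # GCP project id
BUCKET = 'my-gcs-bucket'          # GCS bucket used for staging/temp files and the input CSV
INPUT_FILENAME = 'products.csv'   # name of the CSV file uploaded to the bucket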
import csv
import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

from settings import PROJECT, BUCKET, INPUT_FILENAME


class CSVtoDict(beam.DoFn):
    """Converts line into dictionary"""
    def process(self, element, headers):
        rec = ""
        element = element.encode('utf-8')
        try:
            for line in csv.reader([element]):
                rec = line

            if len(rec) == len(headers):
                data = {header.strip(): val.strip() for header, val in zip(headers, rec)}
                return [data]
            else:
                print "bad: {}".format(rec)
        except Exception:
            pass


class CreateEntities(beam.DoFn):
    """Creates Datastore entity"""
    def process(self, element):
        entity = entity_pb2.Entity()
        sku = int(element.pop('sku'))
        element['regularPrice'] = float(element['regularPrice'])
        element['salePrice'] = float(element['salePrice'])
        element['name'] = unicode(element['name'].decode('utf-8'))
        element['type'] = unicode(element['type'].decode('utf-8'))
        element['url'] = unicode(element['url'].decode('utf-8'))
        element['image'] = unicode(element['image'].decode('utf-8'))
        element['inStoreAvailability'] = unicode(element['inStoreAvailability'])

        datastore_helper.add_key_path(entity.key, 'Productx', sku)
        datastore_helper.add_properties(entity, element)
        return [entity]


def dataflow(run_local):
    if run_local:
        input_file_path = 'sample.csv'
    else:
        input_file_path = 'gs://' + BUCKET + '/' + INPUT_FILENAME

    JOB_NAME = 'datastore-upload-{}'.format(datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S'))

    pipeline_options = {
        'project': PROJECT,
        'staging_location': 'gs://' + BUCKET + '/staging',
        'runner': 'DataflowRunner',
        'job_name': JOB_NAME,
        'disk_size_gb': 100,
        'temp_location': 'gs://' + BUCKET + '/temp',
        'save_main_session': True
    }

    if run_local:
        pipeline_options['runner'] = 'DirectRunner'

    options = PipelineOptions.from_dictionary(pipeline_options)
    with beam.Pipeline(options=options) as p:

        (p | 'Reading input file' >> beam.io.ReadFromText(input_file_path)
           | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), ['sku', 'name', 'regularPrice', 'salePrice',
                                                                       'type', 'url', 'image', 'inStoreAvailability'])
           | 'Create entities' >> beam.ParDo(CreateEntities())
           | 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
         )


if __name__ == '__main__':
    run_locally = False
    dataflow(run_locally)
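The pipeline passes the column names to CSVtoDict explicitly, so the input file is expected to contain plain data rows in that column order (sku, name, regularPrice, salePrice, type, url, image, inStoreAvailability). A couple of illustrative lines (with made-up values) could look like this:

1234567,Example Headphones,149.99,99.99,HardGood,http://www.example.com/p/1234567,http://images.example.com/1234567.jpg,true
7654321,Example Speaker,79.99,59.99,HardGood,http://www.example.com/p/7654321,http://images.example.com/7654321.jpg,false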
The dataflow function contains the main part: the pipeline options are defined there, as well as the pipeline itself, which does the job. It consists of 4 steps:
- reading the input file (from Cloud Storage, or locally) with ReadFromText
- converting each CSV line into a dictionary
- creating a Datastore entity from each dictionary
- writing the entities into Datastore with WriteToDatastore
There are 2 custom transformations:
- CSVtoDict, which parses a CSV line into a dictionary using the provided column headers
- CreateEntities, which casts the values to the proper types and creates a Datastore entity (kind Productx, keyed by sku) from each dictionary
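As a quick illustration (not part of upload.py), this is roughly what the two DoFns produce for a single made-up line:

headers = ['sku', 'name', 'regularPrice', 'salePrice', 'type', 'url', 'image', 'inStoreAvailability']
line = u'1234567,Example Headphones,149.99,99.99,HardGood,http://www.example.com/p/1234567,http://images.example.com/1234567.jpg,true'

record = CSVtoDict().process(line, headers)[0]
# {'sku': '1234567', 'name': 'Example Headphones', 'regularPrice': '149.99', ...}

entity = CreateEntities().process(record)[0]
# entity_pb2.Entity with key path ('Productx', 1234567) and typed properties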
With the run_locally variable you can define whether you want to run the pipeline locally (with the DirectRunner) or on Dataflow.
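Before running the script, the Apache Beam library needs to be installed; assuming pip and the gcp extra (which pulls in the Dataflow and GCP IO dependencies), the installation would typically be:

pip install apache-beam[gcp]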
Once the library is installed, the job can be executed with the command:
python upload.py
Here are some screenshots:
This is how the pipeline looks in the Dataflow UI.
And some stats.
As can be seen, the whole job took 16.5 minutes (which also includes provisioning of the VMs). The graph shows the number of workers (VMs) processing the data; it increased over time and peaked at 60 workers shortly before the job was done. The image below shows the consumed resources. With a bit of effort, it can be calculated that this execution cost ~$0.5, plus Datastore costs.
So there you have it: with a few lines of code it's possible to write a data pipeline that loads data from a CSV file into Datastore, and everything else is managed by Google Cloud. There is actually a ready-made Dataflow template which uploads text from Cloud Storage to Cloud Datastore, but in that case every line should be encoded as JSON, and the Datastore entity is created based on it.
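For illustration, a JSON-encoded entity line for that template would presumably follow the Datastore API's entity JSON format, something like this (wrapped here over several lines for readability, but it would be a single line in the file; the kind, key and properties are made up):

{"key": {"path": [{"kind": "Product", "id": "1234567"}]},
 "properties": {"name": {"stringValue": "Example Headphones"},
                "regularPrice": {"doubleValue": 149.99}}}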