The data analyzed is fake customer data held in a local PostgreSQL database. The goal is to replicate this customer data in the BigQuery warehouse using an ETL (extract, transform, load) pipeline.
For this demo, we will use the architecture diagram below. The data source is a local PostgreSQL database, from which data is transferred to Google Cloud Platform, where it undergoes ETL using Apache Spark and is stored in BigQuery.
Apache Airflow is an open-source data workflow management platform. It started at Airbnb in October 2014 as a solution to manage the company’s increasingly complex workflows. Creating Airflow allowed Airbnb to programmatically author and schedule their data workflows and monitor them via the built-in Airflow user interface.
Airflow uses directed acyclic graphs (DAGs) to manage workflow orchestration. Tasks and their dependencies are defined in Python, and Airflow then manages the scheduling and execution. DAGs can run either on a defined schedule (e.g. hourly or daily) or in response to external event triggers.
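To make the idea of a DAG concrete, here is a minimal sketch in plain Python that derives a valid run order from task dependencies. It uses the standard library's graphlib module rather than Airflow itself, so it only illustrates the concept and is not how Airflow's scheduler is implemented. The task names mirror the DAG used in this post; the dependencies mapping and the execution_order helper are assumptions made for illustration.

```python
from graphlib import TopologicalSorter

# Each key is a task; the set holds the tasks it depends on (its upstream tasks).
# Task names mirror the DAG in this post; the mapping itself is illustrative.
dependencies = {
    "extract_data": set(),
    "upload_data_to_gcs": {"extract_data"},
    "submit_job_to_spark": {"upload_data_to_gcs"},
    "copy_single_file": {"submit_job_to_spark"},
    "rename_parquet": {"copy_single_file"},
    "gcs_to_bigquery": {"rename_parquet"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because the graph must be acyclic, a task can never end up waiting on itself, directly or indirectly. Adding an edge that makes extract_data depend on gcs_to_bigquery would make execution_order raise a CycleError.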
Airflow DAG
import datetime

from airflow import DAG
# Airflow 2 import path; airflow.operators.bash_operator is the deprecated Airflow 1 location.
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
PROJECT_ID = 'finaldemo-349008'
GCS_BUCKET = 'landingbucket0694'
DEST_BUCKET = 'cleanbucket0694'
FILENAME = "test_file.json"
SQL_QUERY = "select * from customers limit 10 ;"
PYTHON_FILE = "gs://sparkbucketbucket0694/spark_script.py"
REGION = "europe-west2"
CLUSTER_NAME = "cluster-c2a9"
DATASET_NAME = 'final_demo_data'
DATA_EXPORT_BUCKET_NAME = 'cleanbucket0694'
TABLE = "customer"

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": PYTHON_FILE},
}
with DAG(
    dag_id="postgres_to_bigquery_dag",
    start_date=datetime.datetime(2022, 5, 14),
    schedule_interval="@once",
    catchup=False,
) as dag:
    # Export the query result from Postgres to the landing bucket as JSON.
    extract_data = PostgresToGCSOperator(
        task_id="extract_data",
        sql=SQL_QUERY,
        bucket=GCS_BUCKET,
        filename=FILENAME,
        gzip=False,
        postgres_conn_id="postgres_remote",
    )

    # Same export again, this time streaming rows through a server-side cursor.
    # It overwrites the object written by extract_data.
    upload_data_to_gcs = PostgresToGCSOperator(
        task_id="upload_data_to_gcs",
        sql=SQL_QUERY,
        bucket=GCS_BUCKET,
        filename=FILENAME,
        gzip=False,
        use_server_side_cursor=True,
        postgres_conn_id="postgres_remote",
    )

    # Run the PySpark transformation on the Dataproc cluster.
    submit_job_to_spark = DataprocSubmitJobOperator(
        task_id="submit_job_to_spark",
        job=PYSPARK_JOB,
        region=REGION,  # named `location` in older releases of the Google provider
        project_id=PROJECT_ID,
    )

    # Keep a backup copy of the raw JSON export inside the landing bucket.
    copy_single_file = GCSToGCSOperator(
        task_id="copy_single_file",
        source_bucket=GCS_BUCKET,
        source_objects=[FILENAME],
        destination_bucket=GCS_BUCKET,
        destination_object='BACKUP/dest.json',
    )

    # Copy the Spark part file to the fixed object name file.parquet.
    rename_parquet = BashOperator(
        task_id='rename_parquet',
        bash_command='python /opt/airflow/dags/rename_script.py',
    )

    # Load file.parquet into BigQuery, replacing the current table contents.
    gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="gcs_to_bigquery",
        bucket=DATA_EXPORT_BUCKET_NAME,
        source_objects=["file.parquet"],
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE}",
        source_format='PARQUET',
        write_disposition='WRITE_TRUNCATE',
    )

    (
        extract_data
        >> upload_data_to_gcs
        >> submit_job_to_spark
        >> copy_single_file
        >> rename_parquet
        >> gcs_to_bigquery
    )
The pipeline steps are:
Transfer the extracted data file (JSON) to Google Cloud Storage.
ETL is performed on the data using Cloud Dataproc, a managed Hadoop and Apache Spark service. The result is then saved to another bucket for cleaned data.
Dataproc is a fully managed, scalable service for running Apache Spark, Apache Flink, Presto, and more than 30 other open source tools and frameworks. Typical uses are data lake modernization, ETL, and data science workloads, with integration into the rest of Google Cloud.
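from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('Final_demo').getOrCreate()
# Read the JSON export that the Airflow DAG placed in the landing bucket.
data = spark.read.json("gs://landingbucket0694/test_file.json")
# Derive the total sale value of each row from the amount and the quantity.
data = data.withColumn("sales", col("sale_amt") * col("qty"))
# Write a single Parquet part file under the CLEAN prefix of the clean bucket.
data.repartition(1).write.mode("overwrite").parquet("gs://cleanbucket0694/CLEAN")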
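The Spark job applies a small transformation (it adds a sales column computed as sale_amt multiplied by qty) and writes the result to a separate bucket meant for cleaned data. The output is saved in Parquet, a columnar format that is optimised for analytical reads in a warehouse.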
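For readers without a Spark cluster at hand, the same column derivation can be sketched in plain Python. This is only an illustration of what the transformation computes, not a replacement for the Spark job. It assumes the export is newline-delimited JSON (one object per line); the sample rows, the customer_id field and the add_sales helper are made up for the example, and only the column names sale_amt, qty and sales come from the Spark script.

```python
import json

# Hypothetical sample rows in newline-delimited JSON. Only the column names
# sale_amt and qty come from the Spark script; the values are made up.
SAMPLE_NDJSON = """\
{"customer_id": 1, "sale_amt": 10.0, "qty": 3}
{"customer_id": 2, "sale_amt": 4.5, "qty": 2}
"""


def add_sales(ndjson_text):
    """Parse newline-delimited JSON and add sales = sale_amt * qty to every row."""
    rows = []
    for line in ndjson_text.splitlines():
        if not line.strip():
            continue  # ignore blank lines
        row = json.loads(line)
        row["sales"] = row["sale_amt"] * row["qty"]
        rows.append(row)
    return rows


rows = add_sales(SAMPLE_NDJSON)
for row in rows:
    print(row)
```

Spark performs the same per-row arithmetic, but distributes it across the cluster and writes the result as Parquet instead of keeping it in memory.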
A shell command runs a Python script that copies the transformed Parquet part file to a fixed name, file.parquet. The original part file is left in place and serves as a backup.
import os

from google.cloud import storage

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/credentials/keys.json'

bucket_name = "cleanbucket0694"
destination_bucket_name = "cleanbucket0694"
destination_blob_name = "file.parquet"


def list_blobs(bucket_name):
    """Return the name of the first Parquet part file in the bucket, or None."""
    storage_client = storage.Client()
    for blob in storage_client.list_blobs(bucket_name):
        # Skip the copy made by a previous run so a rerun picks up the Spark output.
        if blob.name.endswith(".parquet") and blob.name != destination_blob_name:
            return blob.name
    return None


def copy_blob(bucket_name, blob_name, destination_bucket_name, destination_blob_name):
    """Copy a blob to a new name; the source blob is left in place."""
    storage_client = storage.Client()
    source_bucket = storage_client.bucket(bucket_name)
    source_blob = source_bucket.blob(blob_name)
    destination_bucket = storage_client.bucket(destination_bucket_name)
    source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)


blob_name = list_blobs(bucket_name)
if blob_name is None:
    raise SystemExit(f"No Parquet file found in bucket {bucket_name}")
copy_blob(bucket_name, blob_name, destination_bucket_name, destination_blob_name)
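The copy step is needed because Spark chooses the name of the part file it writes, so the DAG cannot know it in advance. The selection logic can be tried without access to Cloud Storage, as in the sketch below. The object names in listing are hypothetical examples of what a Spark write typically leaves behind, and pick_parquet is an illustrative helper rather than part of the script above; excluding the previously copied file.parquet is an extra safeguard assumed here.

```python
def pick_parquet(object_names, exclude="file.parquet"):
    """Return the first object name ending in .parquet, ignoring the fixed-name copy."""
    for name in sorted(object_names):
        if name.endswith(".parquet") and name != exclude:
            return name
    return None


# Hypothetical bucket listing after the Spark job and one earlier pipeline run.
listing = [
    "CLEAN/_SUCCESS",
    "CLEAN/part-00000-1234abcd-c000.snappy.parquet",
    "file.parquet",
]

chosen = pick_parquet(listing)
print(chosen)
```

Returning None when nothing matches lets the caller fail with a clear message instead of passing an empty name to the copy call.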
Load the data file into Google BigQuery, a serverless, fully managed data warehouse platform on Google Cloud.
BigQuery is a fully-managed, serverless data warehouse that enables scalable analysis over petabytes of data. It is a Platform as a Service that supports querying using ANSI SQL. It also has built-in machine learning capabilities.
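Once the load has finished, a quick SQL check can confirm that the table was populated. The sketch below builds such a query for the project, dataset and table names used in this post; the sales column is the one added by the Spark job. The helpers build_check_query and run_check are hypothetical, and run_check assumes the google-cloud-bigquery client library is installed and credentials are configured, so treat it as a starting point rather than a tested part of the pipeline.

```python
def build_check_query(project_id, dataset, table):
    """Build a query that counts the rows and sums the sales column of a table."""
    return (
        "SELECT COUNT(*) AS row_count, SUM(sales) AS total_sales "
        f"FROM `{project_id}.{dataset}.{table}`"
    )


def run_check(project_id, dataset, table):
    """Run the check in BigQuery and return (row_count, total_sales)."""
    # Imported lazily so the query builder works without the client library.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    rows = list(client.query(build_check_query(project_id, dataset, table)).result())
    return rows[0]["row_count"], rows[0]["total_sales"]


query = build_check_query("finaldemo-349008", "final_demo_data", "customer")
print(query)
```

With the query in this post limited to 10 customers, a row_count of 10 from run_check would indicate that the whole extract reached the warehouse.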