Apache Airflow is an open-source workflow management platform for data pipelines. It started at Airbnb in October 2014 as a solution for managing the company's increasingly complex workflows. Creating Airflow allowed Airbnb to programmatically author and schedule its data workflows and monitor them via the built-in Airflow user interface.
Airflow uses directed acyclic graphs (DAGs) to manage workflow orchestration. Tasks and their dependencies are defined in Python, and Airflow then handles the scheduling and execution. DAGs can be run either on a defined schedule (e.g. hourly or daily) or based on external event triggers.
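To make this concrete, here is a minimal, illustrative DAG (separate from the project pipeline shown later in this post): two placeholder tasks defined in Python, with the second depending on the first. It assumes Airflow 2.x, and the DAG id, task ids, and commands are made up for illustration.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical example DAG: names and commands are placeholders
with DAG(
    dag_id="example_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",   # run once a day
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    load = BashOperator(task_id="load", bash_command="echo load")
    extract >> load  # 'load' runs only after 'extract' succeeds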
STAGE 1: DATA GATHERING
The data has been gathered from FINNHUB.IO, an online financial data API that provides data on various US-listed companies.
I have gathered data on public sentiment, based on news media, for leading companies in the tech sector, including Google, Microsoft, Alibaba, Facebook, etc. The data is received in JSON format:
{'buzz': {'articlesInLastWeek': 61, 'buzz': 0.9682, 'weeklyAverage': 63},
'companyNewsScore': 0.8,
'sectorAverageBullishPercent': 0.605,
'sectorAverageNewsScore': 0.5115,
'sentiment': {'bearishPercent': 0.1429, 'bullishPercent': 0.8571},
'symbol': 'NFLX'}
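To get a feel for how this nested payload becomes a flat, table-structured record, here is a quick sketch with pandas using the sample response above; the dotted column names come from json_normalize's default separator.

import pandas as pd

sample = {
    "buzz": {"articlesInLastWeek": 61, "buzz": 0.9682, "weeklyAverage": 63},
    "companyNewsScore": 0.8,
    "sectorAverageBullishPercent": 0.605,
    "sectorAverageNewsScore": 0.5115,
    "sentiment": {"bearishPercent": 0.1429, "bullishPercent": 0.8571},
    "symbol": "NFLX",
}

# Nested keys are flattened into dotted column names such as
# 'buzz.articlesInLastWeek' and 'sentiment.bullishPercent', alongside
# top-level fields like 'symbol' and 'companyNewsScore'
flat = pd.json_normalize(sample)
print(flat.columns.tolist())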
STAGE 2: DATA TRANSFORMATION AND CAPTURE
The Python data analysis library pandas transforms the JSON responses into a table-structured dataframe, sorts the companies by bullish (upward-trending) sentiment, and saves the result in CSV format.
The data transformation is done in a Jupyter notebook whose execution will be orchestrated by Airflow. The final dataframe has one row per company, with its news score, buzz, and bearish/bullish sentiment columns.
Save a copy on my desktop along with the date using the Airflow BashOperator, which allows UNIX commands or a script file to be run.
Save the final data file to Google Cloud Storage, where it can be used further for querying, connecting to a BI tool, etc.
STAGE 3: DATA ANALYSIS
Load the data file into Google BigQuery, a serverless, fully managed data warehouse platform on Google Cloud.
The data is transferred to BigQuery using Google's BigQuery Data Transfer Service, which can be scheduled to run as required and supports a wide range of file formats.
Since the data transformations have already been done in the notebook, the file can be loaded into BigQuery directly. Google also provides services such as Dataflow and Dataproc for data transformation in the cloud.
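As a sketch of this load step, the same thing can also be done programmatically with the google-cloud-bigquery Python client instead of the Transfer Service. The project, dataset, and table names below are placeholders; the bucket and file name match the upload code later in this post.

from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,                 # the CSV has a header row
    autodetect=True,                     # let BigQuery infer the schema
    write_disposition="WRITE_TRUNCATE",  # replace the previous day's load
)

# 'my-project.stock_data.daily_sentiment' is a placeholder table id
load_job = client.load_table_from_uri(
    "gs://stock_data_bucket1/stock_data.csv",
    "my-project.stock_data.daily_sentiment",
    job_config=job_config,
)
load_job.result()  # block until the load job completes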
PIPELINE MONITORING
Send an email using the Airflow EmailOperator once Stage 2 has completed. We can also monitor the individual stages of the pipeline through Airflow's built-in user interface.
> The pipeline is scheduled to run at 11:00 AM GMT daily.
PYTHON CODE FOR THE DATA TRANSFORMATION/CLEANING
import os
import json
import requests
import pandas as pd
from datetime import datetime
from google.cloud import storage

# Fetch the news-sentiment payload for each symbol from the Finnhub API
data_json = []
symbols = ['NFLX', 'MSFT', 'GOOGL', 'FB', 'AMZN', 'BABA', 'TSLA']
for val in symbols:
    r = requests.get('https://finnhub.io/api/v1/news-sentiment?symbol=' + str(val) + '&token=bvv9c7f48v6r5v93tap0')
    data_json.append(r.json())

# Flatten the nested JSON records into a single dataframe
df = pd.json_normalize(data_json)
def make_frame(df):
    # Select the relevant columns by position; the positions depend on the
    # field order of the flattened Finnhub API response
    df_cols = df.iloc[:, [1, 3, 5, 6, 2, 7]].copy()
    df_cols.columns = ['NewsScore', 'Symbol', 'Buzz', 'WeekAvgBuzz', 'BearSentiment', 'BullSentiment']
    # Put the symbol first and stamp each row with today's date
    df_cols = df_cols[['Symbol', 'NewsScore', 'Buzz', 'WeekAvgBuzz', 'BearSentiment', 'BullSentiment']]
    df_cols['Date'] = datetime.now().strftime("%Y-%m-%d")
    # Rank companies by bullish sentiment and write the result out as CSV
    df_final = df_cols.sort_values(by=['BullSentiment'], ascending=False).reset_index()
    df_final.to_csv("/c/Users/leand/AirflowHome/dags/stocks.csv")
    return df_final

df_final = make_frame(df)
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    # Upload the local CSV to a Google Cloud Storage bucket
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

upload_blob('stock_data_bucket1', '/c/users/leand/AirflowHome/dags/stocks.csv', 'stock_data.csv')
PYTHON CODE FOR THE PIPELINE
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
import papermill as pm
default_args = {
'owner': 'leander',
'depends_on_past': False,
'start_date': datetime.today() - timedelta(days=5),
'email': ['leandersavio@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=10),
}
# Run the pipeline every day at 11:00 GMT
dag = DAG('MYDAG', default_args=default_args, schedule_interval='0 11 * * *')
def DataFetch_Transform_Clean():
    # Execute the transformation notebook with papermill; the executed copy is
    # saved as stock_out.ipynb so the run can be inspected afterwards
    pm.execute_notebook(
        '/mnt/c/users/leand/AirflowHome/dags/stock.ipynb',
        '/mnt/c/users/leand/AirflowHome/dags/stock_out.ipynb',
        parameters=dict(alpha=0.6, ratio=0.1)
    )
run_notebook = PythonOperator(
    task_id="run_notebook",
    python_callable=DataFetch_Transform_Clean,
    dag=dag
)
move_file = BashOperator(
task_id='move-file',
bash_command='echo $(date) >> /mnt/c/users/leand/desktop/top_picks.csv;cat /mnt/c/users/leand/AirflowHome/dags/stocks.csv>> /mnt/c/users/leand/desktop/top_picks.csv',
dag=dag
)
send_email = EmailOperator(
task_id='send_email',
to='leandersavio@gmail.com',
subject='Airflow Alert',
    html_content=""" <h3>HELLO,</h3><br><h3> PIPELINE SUCCESSFUL</h3> """,
dag=dag
)
run_notebook >> move_file >> send_email