SERVERLESS STREAMING PIPELINE

Leander Leitao


—————————– LINK TO DASHBOARD———————–


The data analyzed will be tweets on the topic of Bitcoin (a subject of constant interest, ensuring a fresh inflow of data). The goal is to visualize daily engagement metrics for Bitcoin tweets, such as retweets, likes, and the number of followers of the people tweeting about Bitcoin.



Stream data processing performs analytics on data in motion, generated by events such as IoT devices sending readings or clickstream activity on a website. Unlike traditional batch data, data streams are unbounded, which makes them a better fit for real-time analytics.

For this demo, we will use the path and services in Google Cloud Platform traced in red below. Image credit: SATISH CHANDRA GUPTA twitter.com/scgupta



An equivalent in Amazon Web Services would look like the image below. Image credit: SATISH CHANDRA GUPTA twitter.com/scgupta



A Google Cloud Function will act as the publisher of events to our event stream. These functions scale automatically with the volume of incoming requests.


Google Cloud Functions is a serverless execution environment for building and connecting cloud services. With Cloud Functions you write simple, single-purpose functions that are attached to events emitted from your cloud infrastructure and services.


Google Cloud Function call


# CLOUD FUNCTION CODE
import configparser
import os
import time
from datetime import datetime

import tweepy
from google.cloud import pubsub_v1

# Service-account credentials for the Pub/Sub client
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'streaming-project-XXXXX-e38bde08661f.json'


def get_tweets(request):
    publisher = pubsub_v1.PublisherClient()
    # Pub/Sub topic the tweet records will be published to
    topic_path = 'projects/streaming-project-xxxxx/topics/twitter-topic'

    # Read the Twitter API credentials from pipeline.conf
    parser = configparser.ConfigParser()
    parser.read("pipeline.conf")
    ACCESS_TOKEN = parser.get("twitter_config", "ACCESS_TOKEN")
    ACCESS_SECRET = parser.get("twitter_config", "ACCESS_SECRET")
    CONSUMER_KEY = parser.get("twitter_config", "CONSUMER_KEY")
    CONSUMER_SECRET = parser.get("twitter_config", "CONSUMER_SECRET")

    # Authenticate with the Twitter API
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    api = tweepy.API(auth)

    bitcoin_tweets = api.search_tweets(q='Bitcoin', count=1)  # request a single tweet
    for tweet in bitcoin_tweets:
        # Build a comma-delimited record: retweets, followers, likes, date, unix timestamp
        data = (str(tweet.retweet_count) + "," + str(tweet.user.followers_count) + ","
                + str(tweet.favorite_count) + "," + datetime.now().strftime("%Y-%m-%d")
                + "," + str(int(time.time())))
        data = data.encode("utf-8")  # Pub/Sub messages must be bytestrings
        future = publisher.publish(topic_path, data)  # push to the Pub/Sub topic

    return 'OK'  # HTTP-triggered functions should return a response


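Before deploying, the function can be smoke-tested locally by calling it directly, since the request argument is never inspected. A minimal sketch, appended to the bottom of the same file, assuming pipeline.conf and the service-account key are available in the working directory:

# LOCAL SMOKE TEST (appended to the bottom of the same file)
# Assumes pipeline.conf and the service-account key are in the working directory.
if __name__ == '__main__':
    print(get_tweets(None))  # the request argument is not used by the function
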
The tweet data (retweets, followers, likes, date, unix timestamp) is pushed to Cloud Pub/Sub, a serverless asynchronous message queue well suited to handling continuous streams of data. Pub/Sub is a highly available, high-throughput, and massively scalable service.
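
As a sanity check that the function is actually publishing, a small standalone subscriber can pull messages straight from the topic. This is a minimal sketch, assuming a pull subscription named twitter-topic-sub already exists (the same subscription the Beam pipeline below reads from):

# QUICK SUBSCRIBER TO VERIFY THAT TWEET RECORDS ARE ARRIVING IN PUB/SUB
# Assumes GOOGLE_APPLICATION_CREDENTIALS is set and the subscription exists.
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

subscription_path = 'projects/streaming-project-xxxxx/subscriptions/twitter-topic-sub'

def callback(message):
    # Each message body is the comma-delimited string built by the Cloud Function
    print("Received:", message.data.decode("utf-8"))
    message.ack()

subscriber = pubsub_v1.SubscriberClient()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

with subscriber:
    try:
        streaming_pull_future.result(timeout=30)  # listen for 30 seconds, then stop
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()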



The architecture of the publisher-subscriber model used by Pub/Sub is shown below.



The messages from the Pub/Sub queue are read into Cloud Dataflow, a serverless data processing service that can run both batch and real-time analytics jobs. The distinguishing feature of Dataflow is its unified programming model for batch and stream processing: Dataflow executes pipelines written with the Apache Beam programming model.



Apache Beam is an open-source, unified programming model for defining and executing data processing pipelines, including ETL, batch, and stream (continuous) processing. Beam pipelines are defined using one of the provided SDKs and executed on one of Beam's supported runners (distributed processing back-ends), including Apache Flink, Apache Samza, Apache Spark, and Google Cloud Dataflow.
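
Because the model is unified, the same transforms used in the streaming pipeline below can be exercised locally on a small bounded dataset with the DirectRunner, which is a handy way to debug the parsing logic before deploying to Dataflow. A minimal sketch, assuming records have the same comma-delimited shape the Cloud Function publishes:

# LOCAL BEAM TEST ON BOUNDED DATA (DirectRunner)
import apache_beam as beam

# Sample records in the format the Cloud Function publishes:
# retweets,followers,likes,date,unix timestamp
sample = [
    "3,1200,15,2022-01-05,1641340800",
    "0,85,2,2022-01-05,1641340815",
]

with beam.Pipeline() as p:  # DirectRunner by default
    (p
     | 'CreateSample' >> beam.Create(sample)
     | 'SplitRow' >> beam.Map(lambda row: row.split(','))
     | 'ToDict' >> beam.Map(lambda e: {"RETWEETS": int(e[0]),
                                       "FOLLOWERS": int(e[1]),
                                       "HEARTS": int(e[2]),
                                       "DATE": e[3]})
     | 'Print' >> beam.Map(print))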


#BEAM PIPELINE CODE

import os

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'streaming-project-335816-1f.json'
sub = 'projects/streaming-project-xxxxx/subscriptions/twitter-topic-sub'
output_topic = 'projects/streaming-project-xxxxx/topics/output-topic'
table_name = 'streaming-project-xxxxxx:twitterdata11.TWEETTABLE'

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)


def custom_timestamp(elements):
    # The last field of each record is the unix timestamp added by the Cloud Function
    unix_timestamp = elements[4]
    return window.TimestampedValue(elements, int(unix_timestamp))


table_schema = 'RETWEETS:INTEGER,FOLLOWERS:INTEGER,HEARTS:INTEGER,DATE:DATE'

lines = (
    p
    | 'ReadData' >> beam.io.ReadFromPubSub(subscription=sub).with_output_types(bytes)  # READ FROM PUBSUB
    | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))  # DECODE BYTESTRING
    | 'Remove extra chars' >> beam.Map(lambda data: data.strip())  # STRIP LEADING/TRAILING WHITESPACE
    | 'Split Row' >> beam.Map(lambda row: row.split(','))  # PARSE COMMA-DELIMITED MESSAGE INTO AN ARRAY
    | 'timestamp' >> beam.Map(custom_timestamp)  # USE THE UNIX TIMESTAMP AS THE EVENT TIME
    | 'window' >> beam.WindowInto(window.FixedWindows(10))  # PARTITION THE STREAM INTO 10-SECOND WINDOWS
    | 'dropcol' >> beam.Map(lambda elements: {"RETWEETS": int(elements[0]),
                                              "FOLLOWERS": int(elements[1]),
                                              "HEARTS": int(elements[2]),
                                              "DATE": elements[3]})
    # pass all columns in a message except the timestamp as a dictionary to be written to BigQuery
    | 'write to bigquery' >> beam.io.WriteToBigQuery(  # write each message as a row in a BigQuery table
        table_name,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
)

result = p.run()
result.wait_until_finish()
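
To run the same pipeline on Dataflow rather than locally, the runner and project details are passed through PipelineOptions. A minimal sketch, with placeholder project, region, and bucket values that would need to be replaced:

# LAUNCHING THE PIPELINE ON DATAFLOW (project, region, and bucket are placeholders)
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=streaming-project-xxxxx',
    '--region=us-central1',                  # assumed region
    '--temp_location=gs://your-bucket/tmp',  # assumed staging bucket
    '--job_name=twitter-streaming-pipeline',
])
options.view_as(StandardOptions).streaming = True
# The rest of the pipeline definition is unchanged; only the options differ.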



The transformed messages from Dataflow are written into a BigQuery table, where they can be analyzed using SQL or other BI tools.

BigQuery is a fully-managed, serverless data warehouse that enables scalable analysis over petabytes of data.
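
For example, the daily engagement metrics behind the dashboard can be computed with a simple aggregation query. A minimal sketch using the BigQuery Python client, with the redacted project ID left as a placeholder:

# DAILY ENGAGEMENT METRICS FROM BIGQUERY (project ID is a placeholder)
from google.cloud import bigquery

client = bigquery.Client()

query = """
    SELECT
      DATE,
      SUM(RETWEETS)  AS total_retweets,
      SUM(HEARTS)    AS total_likes,
      AVG(FOLLOWERS) AS avg_followers
    FROM `streaming-project-xxxxxx.twitterdata11.TWEETTABLE`
    GROUP BY DATE
    ORDER BY DATE
"""

for row in client.query(query).result():
    print(row.DATE, row.total_retweets, row.total_likes, row.avg_followers)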




The data in BigQuery is visualised using Google Data Studio.


—————————– LINK TO DASHBOARD———————–