As more and more services and devices produce large amounts of data, companies across the world are trying to analyze it and generate insights as quickly as possible.
In some cases, such as IoT, social media monitoring or financial transactions, data is generated continuously and has to be processed as it arrives; this kind of data is called streaming data.
In this article, we briefly describe methods of streaming data processing and provide an example of a data pipeline that processes tweets using the Twitter API and saves the data to Google Cloud BigQuery using Pub/Sub and Dataflow.
What is streaming data and how can we process it?
Streaming data is a continuous flow of information, usually generated in high volumes and at high velocity. It can come from many sources that log events as they occur, e.g. clicks recorded on a website, IoT sensors measuring temperature, or new posts appearing on Instagram.
Unlike traditional batch processing, which loads large amounts of already stored data all at once, streaming data needs to be processed continuously as it appears at the source. When data arrives, the streaming pipeline should process it right away, without separately downloading, storing and processing it in batches.
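The difference can be illustrated with a toy Python sketch (illustrative only; the event shape and functions here are made up, and real pipelines use frameworks like the ones listed below):

```python
def batch_process(stored_events):
    # Batch: the data is already stored; process it all in one go.
    return sum(e["clicks"] for e in stored_events)

def event_stream():
    # Streaming: events become available one at a time, as they occur.
    for i in range(3):
        yield {"clicks": i + 1}

# Batch mode: one result computed over the whole stored dataset.
print(batch_process([{"clicks": 1}, {"clicks": 2}, {"clicks": 3}]))

# Streaming mode: a running result, updated as each event arrives.
total = 0
for event in event_stream():
    total += event["clicks"]
print(total)
```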
To process the streaming data, we can choose one of the many available tools such as:
- Google Cloud Dataflow
- Amazon Kinesis
- Azure Stream Analytics
- Apache Beam
- Apache Kafka
- Apache Storm
Streaming pipeline example
Imagine you want to monitor posts that appear on Twitter connected to a certain tag, e.g. ‘Google’. Of course you can go to Twitter, type the word into the search bar and get results, but if you want to do this reliably and quickly generate insights, it would be better to create an automated pipeline that does the job.
To demonstrate the process, we created a streaming data processing pipeline.
Its purpose is to download tweets that appear on a certain topic and ingest them into Google Cloud.
The process can be described in the following steps:
- Download tweets using Twitter API (https://developer.twitter.com/en/docs/twitter-api).
- Send each tweet as a message to Google Pub/Sub.
- Process the tweets using Apache Beam pipeline and aggregate the data.
- Save both raw tweets and aggregated tweet data to BigQuery tables.
Accessing Twitter API
Twitter provides a reliable way to programmatically access its contents via the API. It enables developers to access core elements of Twitter such as Tweets, Users or Messages.
In the presented example, the API is accessed using the tweepy package (https://docs.tweepy.org/en/stable/), which is a very convenient way of accessing the Twitter API from Python.
Before we can access the API, we need to create a Twitter account and sign in to the developer portal – https://developer.twitter.com/en/docs/developer-portal/overview. Once we receive the Bearer Token for authentication, we can start accessing the API.
To test that it’s working, you can make a simple request by running a cURL command. Just replace $ACCESS_TOKEN and $USERNAME with your Bearer Token and Twitter username:
curl "https://api.twitter.com/2/users/by/username/$USERNAME" -H "Authorization: Bearer $ACCESS_TOKEN"
Sending tweets to Pub/Sub
With Twitter access set up, we can proceed to create the Python script that watches a tag on Twitter and sends any tweets connected to that tag to a Google Cloud Pub/Sub topic.
Google Cloud Pub/Sub is a messaging service for exchanging event data among applications and services. It allows communication between the services by decoupling senders and receivers.
Before creating the script, we need to create a Pub/Sub topic and a service account on Google Cloud. To create a topic, just go to the Pub/Sub section in the Google Cloud console (https://console.cloud.google.com/cloudpubsub) and click the CREATE TOPIC button. In our example, the topic is named tweets-test.
Next we need to create the service account. To do that, go to IAM & Admin -> Service Accounts -> Create service account.
Once you set the name (in the example it’s twitter-test) click CREATE AND CONTINUE.
Next, we specify the roles needed for the SA: set the Pub/Sub Editor and BigQuery Data Editor roles and click DONE.
Next, we go to our new service account, select keys and add a new JSON key so we can access our SA.
Then we just download the key and set the GOOGLE_APPLICATION_CREDENTIALS environment variable on our local machine:
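For example (the path below is an assumption; point it at wherever you saved the downloaded key file):

```shell
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/keys/twitter-test-key.json"
```

Client libraries such as google-cloud-pubsub pick up this variable automatically to authenticate as the service account.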
Now our Service account and Pub/Sub topic should be ready to go, so we can proceed with creating the python script for the tweet streaming.
First install the necessary libraries:
pip install --upgrade pip
pip install google-cloud-pubsub
pip install tweepy
pip install apache-beam[gcp]
Next, create the Python script named stream_to_pubsub.py:
# stream_to_pubsub.py
import argparse
import json

from google.cloud import pubsub_v1
import tweepy


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--bearer_token', type=str, required=True)
    parser.add_argument('--stream_rule', type=str, required=True)
    parser.add_argument('--project_id', type=str, required=True)
    parser.add_argument('--topic_id', type=str, required=True)
    return parser.parse_args()


def write_to_pubsub(data, stream_rule):
    # Attach the matched rule and publish the tweet as a JSON-encoded
    # Pub/Sub message with id and author_id attributes.
    data["stream_rule"] = stream_rule
    data_formatted = json.dumps(data).encode("utf-8")
    id = data["id"].encode("utf-8")
    author_id = data["author_id"].encode("utf-8")
    future = publisher.publish(
        topic_path, data_formatted, id=id, author_id=author_id
    )
    print(future.result())


class Client(tweepy.StreamingClient):
    def __init__(self, bearer_token, stream_rule):
        super().__init__(bearer_token)
        self.stream_rule = stream_rule

    def on_response(self, response):
        # Merge the tweet payload with the expanded author data
        # (includes['users'] is a list; the first entry is the author).
        tweet_data = response.data.data
        user_data = response.includes['users'][0].data
        result = tweet_data
        result["user"] = user_data
        write_to_pubsub(result, self.stream_rule)


if __name__ == "__main__":
    tweet_fields = ['id', 'text', 'author_id', 'created_at', 'lang']
    user_fields = ['description', 'created_at', 'location']
    expansions = ['author_id']

    args = parse_args()
    streaming_client = Client(args.bearer_token, args.stream_rule)
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(args.project_id, args.topic_id)

    # remove existing rules
    rules = streaming_client.get_rules().data
    if rules is not None:
        existing_rules = [rule.id for rule in rules]
        streaming_client.delete_rules(ids=existing_rules)

    # add new rules and run stream
    streaming_client.add_rules(tweepy.StreamRule(args.stream_rule))
    streaming_client.filter(
        tweet_fields=tweet_fields,
        expansions=expansions,
        user_fields=user_fields
    )
Then the script can be run using the following command:
python3 stream_to_pubsub.py --bearer_token "<Bearer-token>" --stream_rule Google --project_id "<your-project-id>" --topic_id "<your-topic-id>"
While it’s running, we can go to GCP to check whether the messages are arriving. Go to Pub/Sub -> Subscriptions -> our new subscription (here tweets-test-sub) -> Messages and click PULL. You should see messages corresponding to newly created tweets that match the keyword set in the stream_rule argument of the script.
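Each pulled message body is the JSON document published by the script, so it can also be inspected locally. A quick sketch (the payload below is a made-up example shaped like what stream_to_pubsub.py publishes):

```python
import json

# Hypothetical message body, as published by stream_to_pubsub.py.
payload = b'{"id": "1", "text": "Hello from Google", "lang": "en", "stream_rule": "Google"}'

tweet = json.loads(payload.decode("utf-8"))
print(tweet["text"], tweet["lang"])
```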
How to process streaming data with Apache Beam?
To process the streaming data, we’re going to use Apache Beam, a unified programming model for executing data processing pipelines, both batch and streaming.
The created pipeline uses the Pub/Sub topic as input and outputs the data to two BigQuery tables:
- raw_tweets – tweet text with the metadata and author information
- minute_level_counts – total number of tweets grouped by minute and tweet language
The pipeline code can be accessed on Github: https://github.com/klepacz/tweet-streaming/blob/main/pipeline.py
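The actual Beam pipeline lives in the repository above; the minute-level aggregation it performs can be sketched in plain Python (a simplified model of the logic, with assumed field names, not the real pipeline code):

```python
from collections import Counter

def minute_level_counts(tweets, window_size=60):
    # Assign each tweet to a fixed window of `window_size` seconds,
    # then count tweets per (window start, language) pair, mirroring
    # the minute_level_counts output table.
    counts = Counter()
    for tweet in tweets:
        window_start = tweet["timestamp"] - tweet["timestamp"] % window_size
        counts[(window_start, tweet["lang"])] += 1
    return counts

tweets = [
    {"timestamp": 30, "lang": "en"},
    {"timestamp": 45, "lang": "en"},
    {"timestamp": 70, "lang": "pl"},
]
print(minute_level_counts(tweets))
# Counter({(0, 'en'): 2, (60, 'pl'): 1})
```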
Before running a pipeline we need to create a new dataset in BigQuery to store our output. To do that, go to BigQuery in your Google Cloud console, then click on three dots next to the project name and hit Create dataset. Name it twitter_data and click Create dataset.
Once you have the pipeline module and the BQ dataset created you can test the solution by running the pipeline.py script in one terminal using the following command:
python3 pipeline.py \
  --project_id "<your-project-id>" \
  --input_topic "projects/<your-project-id>/topics/<your-topic-id>"
and stream_to_pubsub.py in another terminal.
After a couple of minutes we can check the output tables to see the results. We should have two BigQuery tables created: minute_level_counts and raw_tweets. You can keep the scripts running for a few more minutes to see how the new data is appended to the tables.
The minute_level_counts table should look like this; we can see how many tweets appeared each minute in each language:
Notice that the minute_level_counts table is updated every minute and the timestamp column advances in one-minute intervals. This is caused by the windowing configuration in the Beam pipeline, which uses fixed windows of 60 seconds. You can change the window_size value in the pipeline code to adjust the aggregation interval.
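The effect of window_size boils down to simple arithmetic (a sketch of fixed windowing, not the Beam implementation itself):

```python
def window_start(timestamp, window_size):
    # Fixed windows: each event falls into the window whose start is
    # its timestamp rounded down to a multiple of window_size seconds.
    return timestamp - timestamp % window_size

timestamps = [10, 70, 130]
print([window_start(t, 60) for t in timestamps])   # three 60-second windows
print([window_start(t, 120) for t in timestamps])  # two 120-second windows
```

With a larger window_size, more events share a window start, so the aggregated table gets fewer, coarser rows.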
Deploying the solution to Google Cloud
Now our solution should be working correctly, but what if we want to run the tweet streaming for a longer period of time, e.g. a week? We can use Google Cloud for the deployment and further explore its capabilities.
Deploying Pub/Sub streaming service
To deploy the module for sending tweets to Pub/Sub we can simply create the VM on the Compute Engine and run our stream_to_pubsub.py script there.
To do that, go to Compute Engine -> VM Instances -> Create Instance, then choose the machine name and size. Here the instance is named tweet-streaming, and the e2-micro machine type was chosen since we’re not expecting a high workload in our case.
Next scroll down to Identity and API access and select the same service account that you created before (in the example it’s twitter-test). We need to specify that because the default service account doesn’t have access to Pub/Sub and we won’t be able to run our script.
Now we can go ahead and hit Create and after a couple of minutes we can connect to the VM by clicking SSH under the connect column.
Then we need to install the dependencies by running the following commands on the machine:
sudo apt-get update
sudo apt-get -y install python3-pip
pip install tweepy
pip install google-cloud-pubsub
Now we just need to create the stream_to_pubsub.py module using the code presented before (you can use nano or upload the file from your local machine using the settings in the SSH window) and run it in the background, so it keeps running even if we log out from the VM:
nohup python3 stream_to_pubsub.py --bearer_token "<Bearer-token>" --stream_rule Google --project_id "<your-project-id>" --topic_id "<your-topic-id>" &
Deployment on a VM is certainly easy, but may not be the optimal solution for a production environment. For those cases, you can consider creating a Docker container and deploying it to Google Kubernetes Engine or Cloud Run.
Deploying Apache Beam pipeline
For the data processing pipeline deployment we can use Dataflow, which is a fully managed Google Cloud service for running Apache Beam pipelines.
Dataflow needs a storage bucket to store temporary data, so we need to create one first. Go to Cloud Storage in the GCP console and create a new bucket with a unique name; here we named it dataflow-twitter-stg.
We also need to grant additional permissions to our service account so it can run Dataflow jobs. In the IAM & Admin section, edit your service account (here it’s twitter-test) and add the following roles: Storage Object Admin and Cloud Dataflow Service Agent.
Now we can run the pipeline.py module created before, but instead of the DirectRunner, which runs Beam on our local machine, we use the DataflowRunner, which uses Google Cloud infrastructure as the runtime.
python3 pipeline.py \
  --project_id "<your-project-id>" \
  --input_topic "projects/<your-project-id>/topics/<your-topic-id>" \
  --runner DataflowRunner \
  --staging_location "gs://<bucket-name>/stg" \
  --temp_location "gs://<bucket-name>/temp" \
  --region us-west1
While it’s running, we can check the job status in the Dataflow section on Google Cloud console. Under Job Graph we should see the running pipeline:
Here we can also check numerous Job Metrics such as element throughput, data freshness and autoscaling information, which can be really useful for troubleshooting any problems with the streaming pipelines.
Streaming data processing is becoming common as companies try to keep up with the business requirements to quickly generate insights from large amounts of data flowing from different services and devices. Different cloud-based tools such as Google Dataflow or Amazon Kinesis help to create the streaming pipelines without worrying about the infrastructure.
We presented an example pipeline that continuously downloads posts from Twitter and processes them into BigQuery tables, and we showed how the solution can be deployed to Google Cloud.