Analyzing Twitter Sentiments with AWS Comprehend: Artificial Intelligence and Observability

Gabriel Rios Belmiro
Jul 14, 2023

How relevant is public opinion about a brand, and is it worth monitoring closely? Brands have several ways to assess customer service and satisfaction, often through their own channels and platforms. However, I believe more effective monitoring happens on social media platforms such as Twitter. People tend to express themselves there freely and without filters, which makes it easier to spot reports of application bugs, customer service problems, and similar issues.

So far, so good? Now let’s give a brief introduction to the technologies used:

1 — AWS Comprehend:

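AWS Comprehend (officially Amazon Comprehend) is an AI-powered natural language processing service that, among other things, determines the sentiment of a text. For each text it analyzes, it returns one of the following sentiment labels, along with a confidence score for each of the four: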

Positive: The text expresses an overall positive sentiment.
Negative: The text expresses an overall negative sentiment.
Mixed: The text expresses both positive and negative sentiments.
Neutral: The text doesn’t express any positive or negative sentiments.
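
To make these values concrete, here is a minimal Python sketch of how a sentiment response might be read. It assumes the boto3 SDK and configured AWS credentials for the live call; the helper names `detect_sentiment` and `summarize` and the sample response values are illustrative and not part of the project.

```python
from typing import Any, Dict


def detect_sentiment(text: str, language_code: str = "pt") -> Dict[str, Any]:
    """Call Amazon Comprehend (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so the rest of the sketch runs without the SDK

    client = boto3.client("comprehend", region_name="us-east-1")
    return client.detect_sentiment(Text=text, LanguageCode=language_code)


def summarize(response: Dict[str, Any]) -> str:
    """Format the overall label together with its confidence score."""
    label = response["Sentiment"]  # POSITIVE, NEGATIVE, NEUTRAL, or MIXED
    # SentimentScore keys are capitalized: Positive, Negative, Neutral, Mixed
    score = response["SentimentScore"][label.capitalize()]
    return f"{label} ({score:.2f})"


# Hand-written sample in the shape of a DetectSentiment response (values are made up)
sample = {
    "Sentiment": "NEGATIVE",
    "SentimentScore": {"Positive": 0.02, "Negative": 0.91, "Neutral": 0.06, "Mixed": 0.01},
}
print(summarize(sample))  # prints: NEGATIVE (0.91)
```

The overall label is the one to count per tweet, while the four scores are useful when you want to store or chart how confident the classification was.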

2 — Prometheus:

Prometheus is an open-source monitoring system for applications and services. It collects metrics from configured targets at given intervals, evaluates rule expressions, displays the results, and can trigger alerts when specified conditions are observed.

3 — Grafana:

Grafana is an open-source interactive data visualization platform developed by Grafana Labs. It allows users to view data through unified tables and graphs in one or multiple dashboards, making interpretation and understanding easier.

With this overview, we can combine these technologies to build a solution that monitors a brand using Twitter.

About the project:

1 — Tweet collection service:

This service, written in Python, queries the Twitter API for the latest tweets matching a chosen keyword. Here the keyword is a brand name (Netflix in the code below), to simulate a real scenario.
After collecting the tweets, the service processes the Twitter API response and publishes the text of each new tweet to a Kafka topic.

import json
import os
from time import sleep

import tweepy
from kafka import KafkaProducer

# Assumption: the original snippet does not show where these values come from.
# Reading them from environment variables (names are illustrative) is one option.
consumer_key = os.environ.get("TWITTER_CONSUMER_KEY")
consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET")
access_token = os.environ.get("TWITTER_ACCESS_TOKEN")
access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET")
processed_tweets_file = "processed_tweets.txt"  # illustrative file name


def auth():
    """Authenticate against the Twitter API with OAuth 1.0a user credentials."""
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    return tweepy.API(handler)


def load_processed_tweets():
    """Return the IDs of tweets that were already sent to Kafka."""
    if not os.path.isfile(processed_tweets_file):
        return set()
    with open(processed_tweets_file, "r") as file:
        return set(file.read().splitlines())


def save_processed_tweets(processed_tweets):
    """Persist the processed tweet IDs, one per line."""
    with open(processed_tweets_file, "w") as file:
        file.write("\n".join(processed_tweets))


def task():
    kafka_server = os.environ.get("KAFKA_URL")
    print(f"kafka_server:::::>> {kafka_server}")
    topic = "producer-twitter"
    max_retries = 10
    retry_delay = 5
    retry_count = 0

    api = auth()
    processed_tweets = load_processed_tweets()
    max_id = None

    # Retry until the Kafka broker is reachable or the retry budget is spent
    while True:
        try:
            producer = KafkaProducer(
                bootstrap_servers=kafka_server,
                value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            )
            break  # connection succeeded, leave the retry loop
        except Exception:
            if retry_count >= max_retries:
                print("Exceeded the maximum number of connection attempts. Exiting...")
                raise
            print(f"Error connecting to the Kafka broker. Retrying in {retry_delay} seconds...")
            sleep(retry_delay)
            retry_count += 1

    while True:
        search_query = "'netflix' -filter:retweets AND -filter:replies AND -filter:links"
        tweets = api.search_tweets(q=search_query, count=1, tweet_mode="extended", max_id=max_id)

        for tweet in tweets:
            tweet_id = tweet.id_str

            if tweet_id not in processed_tweets:
                tweet_dict = tweet._json
                producer.send(topic, value=tweet_dict["full_text"])
                producer.flush()
                processed_tweets.add(tweet_id)
                print(f"Tweet with ID {tweet_id} sent to Kafka.")

        save_processed_tweets(processed_tweets)

        # Move max_id just below the oldest tweet returned, so the next
        # search pages back to older tweets instead of repeating this one
        if tweets:
            max_id = tweets[-1].id - 1

        sleep(30)


if __name__ == "__main__":
    task()
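
One detail worth seeing in isolation is what the `value_serializer` above actually puts on the topic. The sketch below uses only the standard library: `serialize` mirrors the lambda passed to `KafkaProducer`, while `deserialize` is a hypothetical counterpart that a consumer would need. The tweet text is made up.

```python
import json


def serialize(value) -> bytes:
    """Same transformation as the producer's value_serializer."""
    return json.dumps(value).encode("utf-8")


def deserialize(payload: bytes):
    """Hypothetical consumer-side counterpart: decode UTF-8, then parse JSON."""
    return json.loads(payload.decode("utf-8"))


tweet_text = "Adorei a nova série"
payload = serialize(tweet_text)

# The message is a JSON string literal, so it carries surrounding double quotes,
# and non-ASCII characters are escaped (json.dumps defaults to ensure_ascii=True).
print(payload)
print(deserialize(payload) == tweet_text)
```

A consumer that treats the bytes as plain text would pass the quotes and escape sequences on to the sentiment analysis, so parsing the JSON first is the safer choice. Whether the Go service does this is not shown in the article.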

2 — Communication between services:

The services communicate asynchronously through Apache Kafka: the collection service produces messages to a topic, and the sentiment analysis service consumes them.
The entire Kafka environment (ZooKeeper, a single broker, and the Kafdrop web UI) is set up using docker-compose.

services:
  zookeeper:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091"
    depends_on:
      - kafka1
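
The compose file advertises two listeners, so the address a client should use depends on where it runs. As a rough sketch (the function name and the `inside_docker` flag are illustrative, not part of the project), that choice could be expressed as:

```python
import os


def bootstrap_server(inside_docker: bool) -> str:
    """Pick the Kafka address that matches the listeners in the compose file."""
    if inside_docker:
        # LISTENER_DOCKER_INTERNAL: reachable by other containers on the compose network
        return "kafka1:19091"
    # LISTENER_DOCKER_EXTERNAL: published on the host; like ${DOCKER_HOST_IP:-127.0.0.1},
    # fall back to 127.0.0.1 when the variable is unset or empty
    host = os.environ.get("DOCKER_HOST_IP") or "127.0.0.1"
    return f"{host}:9091"


print(bootstrap_server(inside_docker=True))
print(bootstrap_server(inside_docker=False))
```

Kafdrop, for instance, runs inside the compose network and therefore connects to `kafka1:19091`, while a script started on the host would use port 9091.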

3 — Sentiment analysis service:

This service, written in Go, consumes the messages containing tweet text from the topic and sends them to AWS Comprehend, which analyzes the text and tells us whether the content is positive, negative, neutral, or mixed.
After this processing, a Prometheus counter is incremented to track how many tweets fall into each sentiment.

package awsClient

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/comprehend"
)

// AnaliseSentimento sends the text to AWS Comprehend and returns the
// detected sentiment together with its confidence scores.
func AnaliseSentimento(texto string) (*comprehend.DetectSentimentOutput, error) {
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("us-east-1"),
	})
	if err != nil {
		fmt.Println("Error creating AWS session:", err)
		return nil, err
	}

	comprehendClient := comprehend.New(sess)

	// The tweets are analyzed as Portuguese text
	input := &comprehend.DetectSentimentInput{
		Text:         aws.String(texto),
		LanguageCode: aws.String("pt"),
	}

	output, err := comprehendClient.DetectSentiment(input)
	if err != nil {
		fmt.Println("Error analyzing sentiment:", err)
		return nil, err
	}

	return output, nil
}

4 — Exposing metrics:

With the Prometheus client library, we can register these metrics and expose them on an HTTP route for Prometheus to scrape, then use Grafana to build dashboards that monitor the data.

package metric

import (
	"fmt"

	"app/awsClient"
	"app/repository"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// sentimentCount counts analysis results, labeled by sentiment
	sentimentCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "sentiment_count",
			Help: "Number of sentiment analysis results",
		},
		[]string{"sentiment"},
	)
)

func init() {
	// Register the metric
	prometheus.MustRegister(sentimentCount)
}

// GetMetricMessage analyzes the message, increments the counter for the
// detected sentiment, and returns the result ready to be stored.
func GetMetricMessage(message string) (*repository.Message, error) {
	output, err := awsClient.AnaliseSentimento(message)
	if err != nil {
		fmt.Println("Error analyzing sentiment:", err)
		return nil, err
	}

	fmt.Print(output)

	sentimentCount.WithLabelValues(*output.Sentiment).Inc()

	messageAnalyzed := &repository.Message{
		Sentiment: output.Sentiment,
		Tweet:     message,
		Mixed:     output.SentimentScore.Mixed,
		Negative:  output.SentimentScore.Negative,
		Neutral:   output.SentimentScore.Neutral,
		Positive:  output.SentimentScore.Positive,
	}

	return messageAnalyzed, nil
}
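
To show roughly what a scrape of this counter looks like, the following Python sketch tallies sentiment labels and renders them in the Prometheus text exposition format. It is a standard-library stand-in for what the Go client library does for you, `render_metrics` is an illustrative name, and the labels are made up.

```python
from collections import Counter
from typing import Iterable


def render_metrics(labels: Iterable[str]) -> str:
    """Render sentiment tallies as Prometheus text-format samples."""
    counts = Counter(labels)
    lines = [
        "# HELP sentiment_count Number of sentiment analysis results",
        "# TYPE sentiment_count counter",
    ]
    # One sample per label value, sorted for stable output
    for sentiment in sorted(counts):
        lines.append(f'sentiment_count{{sentiment="{sentiment}"}} {counts[sentiment]}')
    return "\n".join(lines) + "\n"


# Made-up labels standing in for a stream of Comprehend results
print(render_metrics(["POSITIVE", "NEGATIVE", "POSITIVE", "NEUTRAL"]), end="")
```

Each distinct value of the `sentiment` label becomes its own time series, which is what lets a Grafana panel plot positive, negative, neutral, and mixed tweets as separate lines.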

5 — Database:

After processing, the results are stored in MySQL so they can be queried later.
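
The article does not show the table layout, so the following is only a sketch of how an analyzed message could be persisted and queried. It uses Python's built-in sqlite3 as a stand-in for MySQL so that it runs without a server; the table name `messages` and its columns are assumptions modeled on the fields of `repository.Message`.

```python
import sqlite3

# In-memory SQLite stands in for MySQL here; the schema is hypothetical
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE messages (
        id INTEGER PRIMARY KEY,
        tweet TEXT NOT NULL,
        sentiment TEXT NOT NULL,
        positive REAL, negative REAL, neutral REAL, mixed REAL
    )
    """
)


def save_message(conn, tweet, sentiment, scores):
    """Insert one analyzed tweet using a parameterized query."""
    conn.execute(
        "INSERT INTO messages (tweet, sentiment, positive, negative, neutral, mixed) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (tweet, sentiment, scores["Positive"], scores["Negative"],
         scores["Neutral"], scores["Mixed"]),
    )
    conn.commit()


# Made-up example row
save_message(conn, "example tweet", "NEUTRAL",
             {"Positive": 0.1, "Negative": 0.1, "Neutral": 0.7, "Mixed": 0.1})

# Example query: how many tweets were stored per sentiment
rows = conn.execute(
    "SELECT sentiment, COUNT(*) FROM messages GROUP BY sentiment"
).fetchall()
print(rows)  # prints: [('NEUTRAL', 1)]
```

Keeping the raw tweet next to its scores makes it possible to go from a spike on the dashboard back to the individual tweets that caused it.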

6 — Grafana dashboard:

Here is one of the possibilities we can explore with Grafana. In this dashboard, we can track the brand’s impact.

Conclusion:

This monitoring solution lets us track the brand’s impact and, based on the collected data, create action plans to improve customer perception. Monitoring Twitter puts us in direct contact with customers where they express themselves without filters, giving a more authentic view of the brand.
Note that this project is only a demonstration, although the approach can be applied in a real-world scenario. Doing so requires weighing the cost of maintaining the resources used, and several improvements could make the solution more effective and better suited to a production environment.

Source Code:

If you liked this project, please give it a star and feel free to contribute.

https://github.com/GaberRB/tweetMetricsComprehend
