In previous posts, we showed how to stream Twitter data into a LeanXcale database using a direct API. In today’s post, I will demonstrate how to run analytical SQL queries against that data and then apply natural language processing (NLP) techniques to perform sentiment analysis on the tweets. This post aims to provide a simple example of sentiment analysis using the LeanXcale database, and it is inspired by this blog post, where a similar analysis is done reading from a MySQL database.
The full code is listed at the end of the post and is also available in our git repository (https://gitlab.com/leanxcale_public/sentimentanalysislx).
PREREQUISITES
To follow this post, you will need a LeanXcale database populated with tweets in the same way as in our previous post. You can access a LeanXcale instance from our website.
You will also need Python 3.7 and the LeanXcale SQLAlchemy driver, which can be downloaded from here. pip for Python 3 is also required to install the remaining packages.
sudo apt update
sudo apt upgrade
sudo apt install python3.7 python3-pip
The requirements and the LeanXcale Python driver can be installed using pip by running:
pip3 install file.whl
pip3 install -r requirements.txt
In addition, stop words and wordnet from NLTK must be downloaded:
python3
import nltk
nltk.download('stopwords')
nltk.download('wordnet')
quit()
For the setup of a LeanXcale database and its population with a Twitter stream, please go through our post on streaming Twitter data into a LeanXcale database. If you get the error “ImportError: No module named _tkinter, please install the python-tk package” when running the code, install the tkinter package with: sudo apt install python3-tk.
LEANXCALE CONNECTION
To connect our Python code to the populated LeanXcale database, the previously installed SQLAlchemy driver must be imported. From SQLAlchemy, we’ll need create_engine, MetaData, and Table, as well as sessionmaker from sqlalchemy.orm.
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker
As you may recall from our previous post, the Twitter stream was stored in a database called ‘twitterdb’, in a table called ‘TWEETS’. To connect, we also need our username, which is ‘APP’. The connection string used by SQLAlchemy has the following format: ‘leanxcale://APP@10.64.21.1:1522/twitterdb’, where the IP address and port are the endpoint where the LeanXcale query engine is running. You will have to change the IP address to the endpoint of your own installation. With this connection string, an engine object from SQLAlchemy is created. Now, there are two options to connect to LeanXcale and load data into a pandas DataFrame.
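Since the endpoint differs from one installation to another, it may help to assemble the connection string from its parts. The following is a minimal sketch: build_lx_url is a hypothetical helper name (it does not appear in the original code), and it only reproduces the format described above.

```python
def build_lx_url(user, host, port, database):
    """Return a connection string in the leanxcale://user@host:port/database format.

    Hypothetical helper for illustration; not part of the LeanXcale driver.
    """
    return "leanxcale://{}@{}:{}/{}".format(user, host, port, database)


# Values taken from the post; replace host and port with your own endpoint
url = build_lx_url("APP", "10.64.21.1", 1522, "twitterdb")
print(url)
```

The resulting string can then be passed to create_engine in place of the concatenated literal.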
LOADING OPTION 1
The first alternative takes advantage of the SQLAlchemy Object Relational Mapper (ORM). With this ORM, we can associate Python classes with database tables. The ORM accesses the database through a session object bound to the engine already created. Then, we create a table object to load our LX table, binding it to our LX engine using a MetaData object. After that, we create a query that returns all rows and columns of our table. Finally, we use the pandas read_sql function to execute the query and build a DataFrame from the returned ResultSet. It is very important to close the connection opened by the engine to the LX database once the data is loaded, because we are not going to use that connection anymore.
import pandas as pd

def lx_to_df(user, database, table):
    # Change the IP address and port to your own LeanXcale endpoint
    engine = create_engine('leanxcale://' + user + '@10.64.21.1:1522/' + database)
    Session = sessionmaker(bind=engine)
    session = Session()
    # Reflect the table definition from the database
    meta = MetaData()
    meta.bind = engine
    tweetsTable = Table(table, meta, autoload=True)
    # Query all rows and columns and load them into a DataFrame
    query_all = session.query(tweetsTable)
    df = pd.read_sql(query_all.statement, query_all.session.bind)
    print('df loaded from LX DB')
    # Release the session and the connections held by the engine
    session.close()
    engine.dispose()
    return df
LOADING OPTION 2
The second alternative directly uses the pandas read_sql_query function, which only requires the SQLAlchemy engine bound to the LX database and the analytical SQL query to be performed. This approach is very flexible because it lets you write SQL queries just as you would against any relational database.
def lx_to_df_2(user, database, SQL):
    # Change the IP address and port to your own LeanXcale endpoint
    engine = create_engine('leanxcale://' + user + '@10.64.21.1:1522/' + database)
    df = pd.read_sql_query(SQL, engine)
    engine.dispose()
    return df
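To give an idea of the kind of analytical query that could be passed as the SQL argument, here is a small sketch of a GROUP BY aggregation. Two things are assumptions for illustration only: the LANG column is a hypothetical column name that is not taken from the post, and the query is run against an in-memory SQLite table as a stand-in so that it can be tried without a LeanXcale instance.

```python
import sqlite3

# Hypothetical analytical query: LANG is an assumed column name, not taken from the post
SQL = "SELECT LANG, COUNT(*) AS N FROM TWEETS GROUP BY LANG ORDER BY N DESC"

# In-memory SQLite stand-in for the LeanXcale table (illustration only)
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE TWEETS ("TEXT" TEXT, LANG TEXT)')
conn.executemany(
    "INSERT INTO TWEETS VALUES (?, ?)",
    [("great day", "en"), ("buen dia", "es"), ("so bad", "en")],
)

# Run the aggregation and fetch one row per language
rows = conn.execute(SQL).fetchall()
conn.close()
print(rows)
```

Against LeanXcale, the same string would be handed to the loader, for example lx_to_df_2('APP', 'twitterdb', SQL), provided the table really has such a column.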
CLEANING TWEETS
Up to this point, we have been able to connect to a LeanXcale database and retrieve all records from a table into a pandas DataFrame. Once the data is stored in a DataFrame, we can start applying NLP techniques to infer information from the tweets. In our case, we use the NLTK Python library to remove stop words and lemmatize the remaining tokens. URLs, retweet markers, and non-alphabetic characters are also removed using regular expressions.
import re
import numpy as np
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

def clean_tweets(df):
    stopwords_en = set(stopwords.words('english'))
    wordnet_lemmatizer = WordNetLemmatizer()
    print('cleaning tweets')
    cleaned = []
    for raw in df['TEXT']:
        # Drop whole URLs first, then everything that is not a letter
        text = re.sub(r'https?://\S+', ' ', str(raw))
        text = re.sub(r'[^a-zA-Z]', ' ', text).lower()
        # Drop the retweet marker and stop words as whole tokens, then lemmatize.
        # Matching 'rt' or 'co' as substrings would also cut them out of ordinary words.
        words = [wordnet_lemmatizer.lemmatize(word) for word in text.split()
                 if word != 'rt' and word not in stopwords_en]
        cleaned.append(' '.join(words))
    # Assign the whole column at once instead of using chained indexing
    df['clean_tweets'] = cleaned
    df['len'] = np.array([len(tweet) for tweet in df['clean_tweets']])
    return df
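The effect of the cleaning step on a single tweet can be sketched with the standard library alone. This is an illustration under stated assumptions, not the function used in the post: the STOPWORDS set is a tiny stand-in for the NLTK English stop-word list, lemmatization is left out, clean_text is a hypothetical name, and the sample tweet is invented. The sketch removes whole URLs and the standalone rt token rather than matching those fragments anywhere in the text.

```python
import re

# Tiny stand-in for the NLTK English stop-word list (assumption for illustration)
STOPWORDS = {"the", "is", "a", "at", "to", "and", "of", "in", "this"}


def clean_text(text):
    """Lowercase a tweet and strip URLs, non-letters, the 'rt' marker and stop words."""
    text = re.sub(r"https?://\S+", " ", text)  # drop whole URLs
    text = re.sub(r"[^a-zA-Z]", " ", text)     # keep letters only
    words = text.lower().split()
    return " ".join(w for w in words if w != "rt" and w not in STOPWORDS)


# Invented sample tweet
sample = "RT @user: The new release is GREAT!!! https://t.co/abc123 #python"
cleaned = clean_text(sample)
print(cleaned)
```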
SENTIMENT ANALYSIS
When it comes to calculating sentiment from tweets, TextBlob is a simple Python library that offers several NLP options. The sentiment property of a TextBlob object includes a polarity score, a decimal value from -1 to 1, where negative values reflect negative sentiment and positive values reflect positive sentiment. To simplify the classification, a hard decision is used to define the three sentiment types:
• Positive: If the polarity is greater than 0.
• Neutral: If the polarity is equal to 0.
• Negative: If the polarity is less than 0.
from textblob import TextBlob

def sentiment(tweet):
    # Compute the polarity once and map it to one of the three classes
    polarity = TextBlob(tweet).sentiment.polarity
    if polarity > 0:
        return 1
    elif polarity == 0:
        return 0
    else:
        return -1
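The hard decision itself does not depend on TextBlob and can be tried on plain numbers. The sketch below is an illustration only: polarity_to_label is a hypothetical name, and the neutral_band parameter is an assumption that is not in the original code. With its default of 0.0, the function follows the three rules listed above; a larger value would treat weak scores as neutral.

```python
def polarity_to_label(polarity, neutral_band=0.0):
    """Map a polarity score in [-1, 1] to 1 (positive), 0 (neutral) or -1 (negative).

    neutral_band is a hypothetical extension: scores whose absolute value
    does not exceed it are labelled neutral. The default reproduces the
    strict comparison against zero.
    """
    if polarity > neutral_band:
        return 1
    if polarity < -neutral_band:
        return -1
    return 0


# Invented polarity scores
scores = [0.8, 0.0, -0.25, 0.05]
labels = [polarity_to_label(s) for s in scores]
print(labels)
```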
The WordCloud library is also used to plot a cloud of words for all words in tweets grouped by sentiment (that is, one word cloud for every sentiment class).
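import matplotlib.pyplot as plt
from wordcloud import WordCloud

def word_cloud(df):
    # Build one cloud from all cleaned tweets in the given DataFrame
    plt.subplots(figsize=(12, 10))
    wordcloud = WordCloud(
        background_color='white',
        width=1000,
        height=800).generate(" ".join(df['clean_tweets']))
    plt.imshow(wordcloud)
    plt.axis('off')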
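A word cloud draws frequent words larger, so a quick way to check what a cloud should show is to count the words directly. The following sketch uses only the standard library; top_words is a hypothetical helper name and the sample tweets are invented. It is a plain frequency count and does not reproduce any additional processing that WordCloud applies.

```python
from collections import Counter


def top_words(clean_tweets, n=3):
    """Return the n most frequent words across an iterable of cleaned tweets."""
    counts = Counter(word for tweet in clean_tweets for word in tweet.split())
    return counts.most_common(n)


# Invented cleaned tweets
sample = ["great release great team", "great day", "bad day"]
top = top_words(sample, 2)
print(top)
```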
MAIN METHOD
In the main method, we sequentially call the already-explained methods and construct the lists containing tweets according to the sentiment classification.
if __name__ == "__main__":
    df = lx_to_df('APP', 'twitterdb', 'TWEETS')
    # Alternative: load the data with a plain SQL query
    # SQL = 'select * from TWEETS'
    # df = lx_to_df_2('APP', 'twitterdb', SQL)
    clean_tweets(df)
    df['Sentiment'] = np.array([sentiment(x) for x in df['clean_tweets']])
    pos_tweets = [tweet for index, tweet in enumerate(df["clean_tweets"])
                  if df["Sentiment"][index] > 0]
    neg_tweets = [tweet for index, tweet in enumerate(df["clean_tweets"])
                  if df["Sentiment"][index] < 0]
    neu_tweets = [tweet for index, tweet in enumerate(df["clean_tweets"])
                  if df["Sentiment"][index] == 0]
    print("percentage of positive tweets: {}%".format(
        100 * (len(pos_tweets) / float(len(df['clean_tweets'])))))
    print("percentage of negative tweets: {}%".format(
        100 * (len(neg_tweets) / float(len(df['clean_tweets'])))))
    print("percentage of neutral tweets: {}%".format(
        100 * (len(neu_tweets) / float(len(df['clean_tweets'])))))
    word_cloud(df[df.Sentiment == 1])
    word_cloud(df[df.Sentiment == -1])
    word_cloud(df[df.Sentiment == 0])
    plt.show()
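The percentage calculation can also be written as a single pass over the sentiment labels. This is a minimal sketch with the standard library: sentiment_shares is a hypothetical name, the sample labels are invented, and an empty input returns zeros instead of dividing by zero.

```python
from collections import Counter


def sentiment_shares(labels):
    """Return the percentage of positive (1), neutral (0) and negative (-1) labels."""
    total = len(labels)
    if total == 0:
        # No tweets loaded: avoid a division by zero
        return {1: 0.0, 0: 0.0, -1: 0.0}
    counts = Counter(labels)
    return {key: 100.0 * counts.get(key, 0) / total for key in (1, 0, -1)}


# Invented labels: two positive, one neutral, one negative
shares = sentiment_shares([1, 1, 0, -1])
print(shares)
```

With the DataFrame from the main method, the input would be list(df['Sentiment']).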
CONCLUSION
This post has shown how to connect to a LeanXcale database with Python using a SQLAlchemy driver and how to perform analytical SQL queries. In addition, some SQLAlchemy ORM features have been introduced, which add a new way of interacting with the database. On top of that, some simple NLP analyses have been conducted to classify tweets according to sentiment, made easy by the direct integration of LeanXcale with pandas DataFrames. If you have any trouble running the example or working with LeanXcale, you can contact me using the information below.
FULL CODE
SentimentAnalysis/sentiment_analysis_basic.py
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker
import pandas as pd
import re
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from wordcloud import WordCloud
import numpy as np
import matplotlib.pyplot as plt
from textblob import TextBlob


def lx_to_df(user, database, table):
    # Change the IP address and port to your own LeanXcale endpoint
    engine = create_engine('leanxcale://' + user + '@10.64.21.1:1522/' + database)
    Session = sessionmaker(bind=engine)
    session = Session()
    # Reflect the table definition from the database
    meta = MetaData()
    meta.bind = engine
    tweetsTable = Table(table, meta, autoload=True)
    # Query all rows and columns and load them into a DataFrame
    query_all = session.query(tweetsTable)
    df = pd.read_sql(query_all.statement, query_all.session.bind)
    print('df loaded from LX DB')
    # Release the session and the connections held by the engine
    session.close()
    engine.dispose()
    return df


def lx_to_df_2(user, database, SQL):
    engine = create_engine('leanxcale://' + user + '@10.64.21.1:1522/' + database)
    df = pd.read_sql_query(SQL, engine)
    engine.dispose()
    return df


def clean_tweets(df):
    stopwords_en = set(stopwords.words('english'))
    wordnet_lemmatizer = WordNetLemmatizer()
    print('cleaning tweets')
    cleaned = []
    for raw in df['TEXT']:
        # Drop whole URLs first, then everything that is not a letter
        text = re.sub(r'https?://\S+', ' ', str(raw))
        text = re.sub(r'[^a-zA-Z]', ' ', text).lower()
        # Drop the retweet marker and stop words as whole tokens, then lemmatize
        words = [wordnet_lemmatizer.lemmatize(word) for word in text.split()
                 if word != 'rt' and word not in stopwords_en]
        cleaned.append(' '.join(words))
    # Assign the whole column at once instead of using chained indexing
    df['clean_tweets'] = cleaned
    df['len'] = np.array([len(tweet) for tweet in df['clean_tweets']])
    return df


def sentiment(tweet):
    # Compute the polarity once and map it to one of the three classes
    polarity = TextBlob(tweet).sentiment.polarity
    if polarity > 0:
        return 1
    elif polarity == 0:
        return 0
    else:
        return -1


def word_cloud(df):
    plt.subplots(figsize=(12, 10))
    wordcloud = WordCloud(
        background_color='white',
        width=1000,
        height=800).generate(" ".join(df['clean_tweets']))
    plt.imshow(wordcloud)
    plt.axis('off')


if __name__ == "__main__":
    df = lx_to_df('APP', 'twitterdb', 'TWEETS')
    # Alternative: load the data with a plain SQL query
    # SQL = 'select * from TWEETS'
    # df = lx_to_df_2('APP', 'twitterdb', SQL)
    clean_tweets(df)
    df['Sentiment'] = np.array([sentiment(x) for x in df['clean_tweets']])
    pos_tweets = [tweet for index, tweet in enumerate(df["clean_tweets"])
                  if df["Sentiment"][index] > 0]
    neg_tweets = [tweet for index, tweet in enumerate(df["clean_tweets"])
                  if df["Sentiment"][index] < 0]
    neu_tweets = [tweet for index, tweet in enumerate(df["clean_tweets"])
                  if df["Sentiment"][index] == 0]
    print("percentage of positive tweets: {}%".format(
        100 * (len(pos_tweets) / float(len(df['clean_tweets'])))))
    print("percentage of negative tweets: {}%".format(
        100 * (len(neg_tweets) / float(len(df['clean_tweets'])))))
    print("percentage of neutral tweets: {}%".format(
        100 * (len(neu_tweets) / float(len(df['clean_tweets'])))))
    word_cloud(df[df.Sentiment == 1])
    word_cloud(df[df.Sentiment == -1])
    word_cloud(df[df.Sentiment == 0])
    plt.show()
WRITTEN BY
Jesús Manuel Gallego Romero
Software Engineer at LeanXcale
https://www.linkedin.com/in/jes%C3%BAs-manuel-gallego-romero-68a430134/