Recently, I read a post about streaming Twitter data into a MySQL database to filter and store tweets for further analysis.

Today, I want to share how to do the same with a LeanXcale database instead. LeanXcale provides a simple key-value interface designed to ingest large volumes of data (such as a Twitter stream), as well as an SQL interface better suited to complex analysis.

The complete code is listed at the end of the post and is also available in our git repository (https://gitlab.com/leanxcale_public/twitter2lx).

PREREQUISITES

As described in the original post, we need to have a Twitter developer account and a database (in this case, LeanXcale). Several sites explain how to obtain developer access to Twitter, and you can access a LeanXcale instance from this website.

Because our project is developed with Java and Maven, we also need two libraries. The first, Twitter4j, connects to the Twitter stream. The second connects to the LeanXcale key-value interface, called the KiVi interface, so we'll use the KiVi client library, which can be downloaded from the LeanXcale website.

CONFIGURE PROJECT

To configure the project, we must define the Maven dependencies for both libraries. For Twitter4j, we point to a public Maven repository:

    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>4.0.6</version>
    </dependency>

As the KiVi library is not available from a public repository, we first install the all-dependencies jar in our local repository:

mvn install:install-file -Dfile=kivi-api-0.96-SNAPSHOT-direct-client.jar -DgroupId=com.leanxcale -DartifactId=kivi-api -Dversion=0.96-SNAPSHOT -Dpackaging=jar

Then we define the dependency as follows:

    <dependency>
      <groupId>com.leanxcale</groupId>
      <artifactId>kivi-api</artifactId>
      <version>0.96-SNAPSHOT</version>
    </dependency>

CONFIGURE THE TWITTER CONNECTION

Twitter4j provides several ways to configure the credentials needed to connect to its API, and we use the properties file approach. We create a file called twitter4j.properties in our resources root folder with the following content:

oauth.consumerKey = // Your consumer key
oauth.consumerSecret = // Your consumer secret
oauth.accessToken = // Your access token
oauth.accessTokenSecret = //Your access token secret

This method is simple and allows you to change the credentials without touching your code, so it’s sufficient for our example.
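To make the expected keys explicit, here is a minimal sketch that checks whether a properties text defines the four OAuth keys shown above. The class name CredentialsCheck and its missingKeys helper are hypothetical and not part of Twitter4j; the sketch relies only on java.util.Properties from the JDK.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class CredentialsCheck {
    // The four keys used in twitter4j.properties above.
    static final String[] REQUIRED_KEYS = {
        "oauth.consumerKey", "oauth.consumerSecret",
        "oauth.accessToken", "oauth.accessTokenSecret"
    };

    // Returns the required keys that are absent or blank in the given properties text.
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical sample content with two of the four keys defined.
        String sample = "oauth.consumerKey = abc\noauth.consumerSecret = def\n";
        System.out.println("Missing keys: " + missingKeys(sample));
    }
}
```

A check like this could run at startup, before the stream is opened, so that an incomplete file is reported with a clear message.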

CONFIGURE THE LEANXCALE CONNECTION

Configuring and opening a session to LeanXcale through its key-value interface is straightforward, and we only need to define the session settings to create a new Session object using the endpoint URL (replace localhost with your server address in the following code):

Settings settings = Settings.parse("lx://localhost:9876/twitterdb@APP");
Session session = SessionFactory.newSession(settings);

Note that the database is called twitterdb and the user is defined as APP.
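To show how the pieces of that endpoint string fit together, the following sketch splits it into host, port, database, and user with java.net.URI. It is only an illustration of the URL layout used in this post: the class LxUrlParts is hypothetical, and the real parsing is done by Settings.parse(), whose internals are not shown here.

```java
import java.net.URI;

class LxUrlParts {
    final String host;
    final int port;
    final String database;
    final String user; // null when the URL carries no "@user" suffix

    LxUrlParts(String host, int port, String database, String user) {
        this.host = host;
        this.port = port;
        this.database = database;
        this.user = user;
    }

    // Splits a URL of the form lx://host:port/database@user into its parts.
    static LxUrlParts parse(String url) {
        URI uri = URI.create(url);
        // The path looks like "/twitterdb@APP"; drop the leading slash first.
        String[] dbAndUser = uri.getPath().substring(1).split("@", 2);
        String user = dbAndUser.length > 1 ? dbAndUser[1] : null;
        return new LxUrlParts(uri.getHost(), uri.getPort(), dbAndUser[0], user);
    }

    public static void main(String[] args) {
        LxUrlParts parts = parse("lx://localhost:9876/twitterdb@APP");
        System.out.println(parts.host + " " + parts.port + " " + parts.database + " " + parts.user);
    }
}
```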

CREATE THE TABLE

We store the tweets in a table called TWEETS (not so original) that includes the following fields:

  • ID (BIGINT): The primary key.
  • USERNAME (VARCHAR): Tweet’s author username.
  • CREATED (TIMESTAMP): Timestamp when the tweet was created.
  • TEXT (VARCHAR): Content of the tweet.
  • RT (INTEGER): Number of retweets.
  • LAT (DOUBLE): Position latitude from where the tweet was created (if defined).
  • LON (DOUBLE): Position longitude from where the tweet was created (if defined).
  • PLACE (VARCHAR): Name of the place from where the tweet was created (if defined).

We could use the SQL interface to create the table that stores the retrieved tweets through a standard CREATE query. To make our example self-contained, we do this instead using the KiVi interface as follows:

private static Table getOrCreateTable(Session session){
    Database database = session.database();
    String tableName = "TWEETS";
    // Create the table only if it does not exist yet
    if(!database.tableExists(tableName)){
      return database.createTable(
          tableName,
          // Primary key fields
          Arrays.asList(new Field("ID", Type.LONG)),
          // Remaining fields
          Arrays.asList(
              new Field("USERNAME", Type.STRING),
              new Field("CREATED", Type.TIMESTAMP),
              new Field("TEXT", Type.STRING),
              new Field("RT", Type.INT),
              new Field("LAT", Type.DOUBLE),
              new Field("LON", Type.DOUBLE),
              new Field("PLACE", Type.STRING)
          )
      );
    }
    return database.getTable(tableName);
  }
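
For comparison, the SQL route mentioned above could look roughly like the statement built by the following sketch. The column types come from the field list above. The class name TweetsDdl is hypothetical, and the exact DDL syntax accepted by LeanXcale (for example, whether VARCHAR needs a length or TEXT needs quoting) is an assumption that should be checked against its SQL documentation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class TweetsDdl {
    // Builds a CREATE TABLE statement from the column list described in the post.
    static String createTableSql() {
        // LinkedHashMap keeps the columns in declaration order.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("ID", "BIGINT");
        columns.put("USERNAME", "VARCHAR");
        columns.put("CREATED", "TIMESTAMP");
        columns.put("TEXT", "VARCHAR");
        columns.put("RT", "INTEGER");
        columns.put("LAT", "DOUBLE");
        columns.put("LON", "DOUBLE");
        columns.put("PLACE", "VARCHAR");

        StringJoiner sql = new StringJoiner(", ", "CREATE TABLE TWEETS (", ", PRIMARY KEY (ID))");
        for (Map.Entry<String, String> column : columns.entrySet()) {
            sql.add(column.getKey() + " " + column.getValue());
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(createTableSql());
    }
}
```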




CONVERT THE TWEET TO A TUPLE

Once we define the table structure to store the tweets, we next define a function for converting from the Twitter4j class representing a tweet (Status) to a table row represented by the Tuple class.

The complete reference of the Status object can be reviewed on the Twitter4j website, but for our needs, the code should be:

private static Tuple tweetToTuple(Status status, Table table){
    // This ensures that the tuple has the table format
    Tuple tuple = table.createTuple();
    tuple.putLong("ID", status.getId());
    tuple.putString("USERNAME", status.getUser().getName());
    // Tuple expects java.sql.Timestamp, so convert from java.util.Date
    tuple.putTimestamp("CREATED", new Timestamp(status.getCreatedAt().getTime()));
    tuple.putString("TEXT", status.getText());
    tuple.putInteger("RT", status.getRetweetCount());
    // Location data is optional in a tweet
    if(status.getGeoLocation() != null){
      tuple.putDouble("LAT", status.getGeoLocation().getLatitude());
      tuple.putDouble("LON", status.getGeoLocation().getLongitude());
    }
    if(status.getPlace() != null){
      tuple.putString("PLACE", status.getPlace().getFullName());
    }
    return tuple;
  }

Tuple acts as a typed map for the tuple fields, so the only notable detail in this code is that Tuple works with java.sql.Timestamp (because we defined the CREATED field as a TIMESTAMP column). We must therefore convert the createdAt attribute from java.util.Date to java.sql.Timestamp.

Because GeoLocation and Place are not mandatory attributes in the Status object, we must check if they are null.
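
The two points above (the Date-to-Timestamp conversion and the optional location fields) can be tried in isolation with the JDK alone. In this sketch a plain Map stands in for the KiVi Tuple, and the RowSketch class and its method names are hypothetical.

```java
import java.sql.Timestamp;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

class RowSketch {
    // java.sql.Timestamp can be built from the epoch milliseconds of a java.util.Date.
    static Timestamp toTimestamp(Date date) {
        return new Timestamp(date.getTime());
    }

    // Builds a row, leaving LAT and LON unset when no location is available.
    static Map<String, Object> toRow(long id, Date createdAt, Double lat, Double lon) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ID", id);
        row.put("CREATED", toTimestamp(createdAt));
        if (lat != null && lon != null) {
            row.put("LAT", lat);
            row.put("LON", lon);
        }
        return row;
    }

    public static void main(String[] args) {
        System.out.println(toRow(1L, new Date(), null, null));
        System.out.println(toRow(2L, new Date(), 40.4, -3.7));
    }
}
```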

STREAMING FROM TWITTER

Now that we have configured the Twitter and database connections, defined the database table, and created a conversion function to convert from tweet to table row, it’s time to open the stream to receive data.

This is accomplished through the TwitterStream class, which controls the stream, and the StatusListener interface, which receives the data events.

First, we must implement the listener class. To avoid implementing many methods that we won’t use for this example, we’re going to extend the StatusAdapter class and override only the onStatus() method, instead of implementing the complete StatusListener interface.

private static class LeanXcaleListener extends StatusAdapter {
    Table table;
    Session session;

    public LeanXcaleListener(Table table, Session session) {
      this.table = table;
      this.session = session;
    }

    @Override
    public void onStatus(Status status) {
      Tuple tuple = tweetToTuple(status, table);
      // Here we have to insert the tuple into the database
    }
  }
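
The adapter idea used here is a general Java pattern: a class implements every method of a wide interface with an empty body, so subclasses override only what they need. The sketch below shows it with hypothetical types (EventListener, EventAdapter, CollectingListener) that merely mirror the relationship between StatusListener and StatusAdapter; they are not Twitter4j classes.

```java
import java.util.ArrayList;
import java.util.List;

class AdapterSketch {
    // A listener interface with several callbacks, most of which we do not need.
    interface EventListener {
        void onMessage(String text);
        void onDelete(long id);
        void onWarning(String warning);
    }

    // The adapter provides empty implementations for every callback.
    static class EventAdapter implements EventListener {
        @Override public void onMessage(String text) { }
        @Override public void onDelete(long id) { }
        @Override public void onWarning(String warning) { }
    }

    // Our listener overrides only the callback it cares about.
    static class CollectingListener extends EventAdapter {
        final List<String> received = new ArrayList<>();

        @Override
        public void onMessage(String text) {
            received.add(text);
        }
    }

    public static void main(String[] args) {
        CollectingListener listener = new CollectingListener();
        listener.onMessage("hello");
        listener.onDelete(42L);          // ignored by the adapter's empty method
        listener.onWarning("slow down"); // ignored as well
        System.out.println(listener.received);
    }
}
```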

Second, we configure and create the TwitterStream object:

ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
 TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
twitterStream.addListener(new LeanXcaleListener(table, session));

Then, we can start sampling and filtering the tweets:

twitterStream.sample();
twitterStream.filter(new FilterQuery("datascience").language("en"));

In this case, to avoid receiving tweets on every topic, we define a filter using the expression "datascience". With the data processing planned for the next post in mind, we also restrict the stream to tweets in English.

The Twitter4j API documentation describes in detail how to define other filters and options.

INSERTING IN LEANXCALE

As shown above, the onStatus() method in the listener is so far defined as follows:

@Override
    public void onStatus(Status status) {
         Tuple tuple = tweetToTuple(status, table);
        // Here we’ve to insert tuples  into the database
    }

Tuples are inserted into the database through the table object as follows:

 table.insert(tuple);

Alternatively, we can use a more efficient version of the method that does not check the uniqueness of the key and overwrites the existing tuple if one already exists:

table.upsert(tuple);
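
The difference between the two calls can be illustrated with a plain map standing in for the table. This is only a model of the semantics described above: the UpsertSketch class is hypothetical, and how the real insert() reports a duplicate key is not covered in this post, so throwing an exception here is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

class UpsertSketch {
    private final Map<Long, String> rows = new HashMap<>();

    // Insert checks key uniqueness first (assumed here to fail with an exception).
    void insert(long key, String value) {
        if (rows.putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Duplicate key: " + key);
        }
    }

    // Upsert skips the check and overwrites whatever is stored under the key.
    void upsert(long key, String value) {
        rows.put(key, value);
    }

    String get(long key) {
        return rows.get(key);
    }

    public static void main(String[] args) {
        UpsertSketch table = new UpsertSketch();
        table.insert(1L, "first version");
        table.upsert(1L, "second version"); // silently replaces the row
        System.out.println(table.get(1L));
    }
}
```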

So, the onStatus() code would look like:

@Override
    public void onStatus(Status status) {
         Tuple tuple = tweetToTuple(status, table);
        table.upsert(tuple);
    }

LeanXcale is a transactional database, so we need to begin a transaction and then commit it (or roll it back on error) for the data to be stored. A first approach looks like this:

@Override
    public void onStatus(Status status) {
       Tuple tuple = tweetToTuple(status, table);
       System.out.println("Inserting: " + tuple);
        try {
            session.beginTransaction();        
            table.upsert(tuple);
            session.commit();
      }
      catch (Exception e){
        e.printStackTrace();
        try {
          session.rollback();
        }
        catch(Exception e1){
          e1.printStackTrace();
        }
      }
    }

The code above creates a new transaction for every tweet, which is inefficient. The full code at the end of this post takes a more efficient approach: it groups insertions into a single transaction and commits once its counter exceeds five, that is, every six tweets.
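
As a rough illustration of the batching idea, the following sketch groups writes into transactions of a configurable size. It does not use the KiVi API: the Tx interface is a hypothetical stand-in for the begin, commit, and rollback calls on the session, and CountingTx only counts those calls so the behaviour can be observed.

```java
import java.util.ArrayList;
import java.util.List;

class BatchingSketch {
    // Hypothetical stand-in for the transaction calls of a session.
    interface Tx {
        void begin();
        void commit();
        void rollback();
    }

    // Counts the calls instead of talking to a database.
    static class CountingTx implements Tx {
        int begins, commits, rollbacks;
        @Override public void begin() { begins++; }
        @Override public void commit() { commits++; }
        @Override public void rollback() { rollbacks++; }
    }

    static class BatchWriter {
        private final Tx tx;
        private final int batchSize;
        private final List<String> written = new ArrayList<>();
        private int pending = 0;

        BatchWriter(Tx tx, int batchSize) {
            this.tx = tx;
            this.batchSize = batchSize;
        }

        void write(String row) {
            try {
                if (pending == 0) {
                    tx.begin();          // first row of a new batch
                }
                written.add(row);        // stands in for table.upsert(tuple)
                pending++;
                if (pending >= batchSize) {
                    tx.commit();         // batch is full
                    pending = 0;
                }
            } catch (RuntimeException e) {
                pending = 0;             // the next write starts a fresh transaction
                tx.rollback();
            }
        }

        int pending() {
            return pending;
        }
    }

    public static void main(String[] args) {
        CountingTx tx = new CountingTx();
        BatchWriter writer = new BatchWriter(tx, 10);
        for (int i = 0; i < 25; i++) {
            writer.write("tweet " + i);
        }
        System.out.println("commits=" + tx.commits + " pending=" + writer.pending());
    }
}
```

Rows written after the last commit stay uncommitted until the batch fills up, so a real implementation would also want to commit the remainder when shutting down.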

CLOSING CONNECTIONS

The TwitterStream object creates a new thread to receive the events from the unending stream. To close the stream and the database connection cleanly, we add a shutdown hook to the JVM that closes both when the process is terminated (e.g., with Ctrl+C).

Runtime.getRuntime().addShutdownHook(
          new Thread(() -> {
            twitterStream.shutdown();
            session.close();
          }));
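
One detail worth considering is that the second close call is skipped if the first one throws. The sketch below shows a hook body that closes each resource in order and keeps going after a failure. The ShutdownSketch class and its closeAll helper are hypothetical, and the two lambdas in main only stand in for the stream and the session.

```java
import java.util.ArrayList;
import java.util.List;

class ShutdownSketch {
    // Builds a task that closes the resources in order, recording failures in the log
    // instead of letting one failure prevent the remaining resources from closing.
    static Runnable closeAll(List<String> log, AutoCloseable... resources) {
        return () -> {
            for (AutoCloseable resource : resources) {
                try {
                    resource.close();
                } catch (Exception e) {
                    log.add("failed: " + e.getMessage());
                }
            }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Stand-ins for twitterStream.shutdown() and session.close().
        AutoCloseable stream = () -> System.out.println("stream closed");
        AutoCloseable session = () -> System.out.println("session closed");

        Runtime.getRuntime().addShutdownHook(new Thread(closeAll(log, stream, session)));
        System.out.println("running; the hook fires when the JVM exits");
    }
}
```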

RUNNING THE CODE

Because we implemented the example using a Java class (Twitter2Lx), the main method looks like:

public static void main(String[] args){

Settings settings = Settings.parse("lx://localhost:9876/twitterdb@APP");
     try{
      Session session = SessionFactory.newSession(settings);
      Table table = getOrCreateTable(session);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
      twitterStream.addListener(new LeanXcaleListener(table, session));

      twitterStream.sample();
      twitterStream.filter(new FilterQuery("datascience"));

      Runtime.getRuntime().addShutdownHook(
          new Thread(() -> {
            twitterStream.shutdown();
            session.close();
          }));
    }
    catch (Exception e){
      e.printStackTrace();
    }
  }

To invoke it from the command line:

mvn clean install exec:java -Dexec.mainClass="Twitter2Lx"

Since onStatus() prints each tuple with System.out.println(), we can watch the tweets being processed in real time:

Inserting: TupleImpl{ ID:LONG:1166750906664333313 USER:STRING:Babbel Digital CREATED:TIMESTAMP:2019-08-28 18:34:27.0 TEXT:STRING:RT @schmarzo: How can your #datascience team become more productive working as a team? Get results with these 5 essential lessons 🚀 👩 @Data… RT:INT:0 LAT:DOUBLE:null LON:DOUBLE:null PLACE:STRING:null }

Inserting: TupleImpl{ ID:LONG:1166750927249969152 USER:STRING:Christopher Burnette CREATED:TIMESTAMP:2019-08-28 18:34:32.0 TEXT:STRING:RT @ProgrammerBooks: Exploring Data Science ==> https://t.co/tXW2T1TcfI

 #python #javascript #angular #reactjs #vuejs #perl #ruby #Csharp #… RT:INT:0 LAT:DOUBLE:null LON:DOUBLE:null PLACE:STRING:null }

If you don’t see any activity, then you can modify the filter terms to receive more popular topics.

CHECKING DATA FROM SQL

Although we will dig deeper into the data analysis in future posts, for now, you can connect to your LeanXcale instance through the JDBC interface to view the stored tweets.

Follow the SQuirreL tutorial on the LeanXcale website and configure the connection using the following URL:

jdbc:leanxcale://localhost:1522/twitterdb

Then execute some SQL queries, such as:

 select count(*) from TWEETS
 select username, count(*) from TWEETS group by username

CONCLUSION

This post demonstrates how to configure a simple pipeline that stores data from a stream source, such as Twitter, in a LeanXcale database. This is the first step toward more complex data analysis (like NLP). In the next posts, we will discuss how the LeanXcale dual interface helps us run analytical queries using SQL and some standard libraries.

In an advanced step, we will do this analysis with live data and build a complete example of real-time analytics.

If you have any questions about running this example or working with LeanXcale, you can contact me using the information below.

FULL CODE

src/main/java/Twitter2Lx.java

import com.leanxcale.kivi.database.Database;
import com.leanxcale.kivi.database.Field;
import com.leanxcale.kivi.database.Table;
import com.leanxcale.kivi.database.Type;
import com.leanxcale.kivi.session.Session;
import com.leanxcale.kivi.session.SessionFactory;
import com.leanxcale.kivi.session.Settings;
import com.leanxcale.kivi.tuple.Tuple;
import java.sql.Timestamp;
import java.util.Arrays;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
public class Twitter2Lx {
  private static final String SERVER_URL = "lx://localhost:2181";
  private static final String DATABASE_USER = "APP";
  private static final String DATABASE_NAME = "twitterdb";
  private static final String TABLE_NAME = "TWEETS";
  private static final String ID_FIELD = "id";
  private static final String USER_FIELD = "username";
  private static final String CREATED_FIELD = "created";
  private static final String TEXT_FIELD = "text";
  private static final String RT_FIELD = "rt";
  private static final String LAT_FIELD = "lat";
  private static final String LON_FIELD = "lon";
  private static final String PLACE_FIELD = "place";
  public static void main(String[] args){
    Settings settings = new Settings();
    settings.setUrl(SERVER_URL);
     settings.setDatabase(DATABASE_NAME);
     settings.setUser(DATABASE_USER);
     try{
       Session session = SessionFactory.newSession(settings);
      Table table = getOrCreateTable(session);
      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true);
      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
      twitterStream.addListener(new LeanXcaleListener(table, session));

      twitterStream.sample();
      twitterStream.filter(new FilterQuery("datascience"));

      Runtime.getRuntime().addShutdownHook(
          new Thread(() -> {
            twitterStream.shutdown();
            session.close();
          }));
    }
    catch (Exception e){
      e.printStackTrace();
    }
  }
  private static Tuple tweetToTuple(Status status, Table table){
    Tuple tuple = table.createTuple();
    tuple.putLong(ID_FIELD, status.getId());
    tuple.putString(USER_FIELD, status.getUser().getName());
    tuple.putTimestamp(CREATED_FIELD, new Timestamp(status.getCreatedAt().getTime()));
    tuple.putString(TEXT_FIELD, status.getText());
    tuple.putInteger(RT_FIELD, status.getRetweetCount());
    if(status.getGeoLocation() != null){
      tuple.putDouble(LAT_FIELD, status.getGeoLocation().getLatitude());
      tuple.putDouble(LON_FIELD, status.getGeoLocation().getLongitude());
    }
    if(status.getPlace() != null){
      tuple.putString(PLACE_FIELD, status.getPlace().getFullName());
    }
    return tuple;
  }
  private static Table getOrCreateTable(Session session){
    Database database = session.database();
    if(!database.tableExists(TABLE_NAME)) {
      return database.createTable(TABLE_NAME, Arrays.asList(new Field(ID_FIELD, Type.LONG)),
          Arrays.asList(new Field(USER_FIELD, Type.STRING), new Field(CREATED_FIELD, Type.TIMESTAMP),
              new Field(TEXT_FIELD, Type.STRING), new Field(RT_FIELD, Type.INT),
              new Field(LAT_FIELD, Type.DOUBLE), new Field(LON_FIELD, Type.DOUBLE),
              new Field(PLACE_FIELD, Type.STRING)));
    }
    return database.getTable(TABLE_NAME);
  }
  private static class LeanXcaleListener extends StatusAdapter {
    int count;
    Table table;
    Session session;
    public LeanXcaleListener(Table table, Session session) {
      this.table = table;
      this.session = session;
      this.count = 0;
    }
    @Override
    public void onStatus(Status status) {
      try {
        if(count == 0){
          session.beginTransaction();
        }
        Tuple tuple = tweetToTuple(status, table);
        table.upsert(tuple);
        System.out.println("Inserting: " + tuple);
        count++;
        if(count > 5){
          count = 0;
          session.commit();
          System.out.println("Commit");
        }
      }
      catch (Exception e){
        e.printStackTrace();
        // Reset the counter so that the next tweet begins a new transaction
        count = 0;
        try {
          System.out.println("Rollback");
          session.rollback();
        }
        catch (Exception e1){
          e1.printStackTrace();
        }
      }
    }
  }
}

src/main/resources/twitter4j.properties

oauth.consumerKey = // Your consumer key
oauth.consumerSecret = // Your consumer secret
oauth.accessToken = // Your access token
oauth.accessTokenSecret = //Your access token secret

/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.leanxcale</groupId>
  <artifactId>twitterClient</artifactId>
  <version>0.1-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <properties>
    <kivi-api.version>0.96-SNAPSHOT</kivi-api.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>4.0.6</version>
    </dependency>
    <dependency>
      <groupId>com.leanxcale</groupId>
      <artifactId>kivi-api</artifactId>
      <version>${kivi-api.version}</version>
    </dependency>
  </dependencies>
</project>

WRITTEN BY

Javier López Moratalla

Software Engineer at LeanXcale

After more than 15 years developing, designing and deploying enterprise software at companies like Hewlett Packard Enterprise or Gemalto, I’m now part of the LeanXcale team, researching and working on the next generation of databases.

javier@leanxcale.com

https://www.linkedin.com/in/javier-lopez-moratalla/