I want to share how to build a simple loader for CSV files using statistics that show the COVID-19 evolution throughout Spain.

Once loaded, this data can be explored with a variety of BI tools, which I will demonstrate in future posts.

All the code for this post is available in our public git.

Prerequisites

To follow this tutorial, you need:

These datasets are made available from the team at Datadista who, among other cool things, is processing and organizing the data provided by the Healthcare Minister of Spain daily.

Setting up the project

I developed the loader as a Maven project in Java because it’s the language with which I am most comfortable. But, you can take advantage of the LeanXcale connectors to develop in the language of your choice.

I use the key-value interface to insert the data because it’s easier and faster compared to writing the code in SQL. So, we include the Maven dependency to the KiVi connector.

    <dependency>
      <groupId>com.leanxcale</groupId>
      <artifactId>kivi-api</artifactId>
      <version>1.4.0</version>
    </dependency>

You can configure the LeanXcale public artifactory to enable Maven to download the connectors automatically by adding this to your pom.xml file.

<repositories>
    <repository>
      <id>leanxcale-virtual</id>
      <name>leanxcale-virtual</name>
      <url>https://artifactory.ext.leanxcale.com/artifactory/leanxcale-repos</url>
    </repository>
  </repositories>

Dataset structure

The data to load consists of the total of confirmed cases, patients in hospitals, patients in ICUs, and deaths occurring every day since the beginning of the outbreak.

The data for the entire country is included in the file nacional_covid19.csv, which has one row for each day with the date in the format yyyy-mm-dd along with several metrics, separated by commas. The features we are interested in are “casos_pcr”, “fallecimientos”, “ingresos_uci” , “hospitalizados”.

Detailed data from the territories are included in the file ccaa_covid19_datos_isciii.csv, which has a similar structure, but with an additional field in each row to identify the corresponding territory.

Because we need to store two datasets, we use two tables, one for the national data and the second for the detailed territory information.

The national table has the following structure:

  • Day (Date): The day to which the data corresponds, which will also be the PK for the table.
  • Cases (integer): The total confirmed cases.
  • Hospital (integer): The number of patients admitted to hospitals.
  • ICU (integer): The number of patients admitted to an ICU.
  • Dead (integer): The number of patients who died.

The territory information table has the following structure:

  • Day (Date): The day to which the data corresponds.
  • CCAA (String): The name of the territory (autonomous to a community in Spanish terminology).
  • Cases (integer): The total confirmed cases.
  • Hospital (integer): The number of patients admitted to hospitals.
  • ICU (integer): The number of patients admitted to an ICU.
  • Dead (integer): The number of patients who died.

For this table, the PK will be the day and territory together because each day includes information from all territories.

Because all the numerical fields represent accumulated amounts for each metric, the expected behavior for each is they continuously increase.

In future posts, I will demonstrate how to extract a single day’s data from the accumulated data and how to use LeanXcale to perform this simply and quickly.

Project structure

As we are loading data from two datasets into two tables and do not want duplicate code, we develop a generic loader that iterates over a file for inserting tuples into a table, two transformers that convert the rows in the tuples (specific for each dataset), and a main class to orchestrate the entire process.

Transformers

To provide a common interface for the transformers, I define the interface Transformer with the following methods:

public interface Transformer {

  /**
   * Create a table with the format needed for this dataset 
   * 
   * @param session
   * @param tableName
   */
  void createTable(Session session, String tableName);

  /**
   * Fills a tuple with the date contained in the line
   * 
   * @param line
   * @param tuple
   */
  void fillTuple(String line, Tuple tuple);

  /**
   * Transforms a String to an int, convrting an emptu String into am integer
   * @param field
   * @return
   */
  default int convertField(String field){

    if(field == null || field.isEmpty()){
      return 0;
    }

    return Integer.valueOf(field);
  }
}

The reason for including the creation of the table within the transformer is that this is the only object that can know the dataset and table structure.

Creating the tables

The implementation of this method is straightforward because it uses the createTable method of the database object.

National
 @Override
  public void createTable(Session session, String tableName) {

    if(session.database().tableExists(tableName)){

      log.info("Dropping table {}", tableName);

      session.database().dropTable(tableName);
    }

    log.info("Creating table {}", tableName);

    session.database()
        .createTable(
            tableName,
            singletonList(new Field(DATE_FIELD, Type.DATE)),
            asList(
                new Field(CONFIRMED_FIELD, Type.INT),
                new Field(DEATHS_FIELD, Type.INT),
                new Field(ICU_FIELD, Type.INT),
                new Field(HOSPITAL_FIELD, Type.INT)
            )
        );

    log.info("Table {} created", tableName);
  }
Territories

 @Override
  public void createTable(Session session, String tableName) {

    if(session.database().tableExists(tableName)){

      log.info("Dropping table {}", tableName);

      session.database().dropTable(tableName);
    }

    log.info("Creating table {}", tableName);

    session.database()
        .createTable(
            tableName,
            asList(
                new Field(TERRITORY_FIELD, Type.STRING),
                new Field(DATE_FIELD, Type.DATE)
            ),
            asList(
                new Field(CONFIRMED_FIELD, Type.INT),
                new Field(DEATHS_FIELD, Type.INT),
                new Field(ICU_FIELD, Type.INT),
                new Field(HOSPITAL_FIELD, Type.INT)
            )
        );

    log.info("Table {} created", tableName);
  }

Transforming the rows into tuples

To avoid memory consumption, we don’t create a new tuple object for each row. Instead, we reuse the object and re-populate it with new data.

To convert the date string into a Date object, we use the default constructor because the string date format complies with the standard format. To convert the strings to numbers, we use a generic method that converts the empty values to 0, and we include this as a default method in the interface to make it available for all implementations.

National
 @Override
  public void fillTuple(String line, Tuple tuple) {

    String fields[] = line.split("," ,-1);

    tuple.putDate(DATE_FIELD, Date.valueOf(fields[0]));

    String cases = fields[2];

    String deaths = fields[5];
    String icu = fields[6];
    String hospital = fields[7];

    tuple.putInteger(CONFIRMED_FIELD, convertField(cases));
    tuple.putInteger(DEATHS_FIELD, convertField(deaths));
    tuple.putInteger(ICU_FIELD, convertField(icu));
    tuple.putInteger(HOSPITAL_FIELD, convertField(hospital));
  }

Territories
  @Override
  public void fillTuple(String line, Tuple tuple) {

    String fields[] = line.split("," ,-1);

    tuple.putString(TERRITORY_FIELD, fields[2]);
    tuple.putDate(DATE_FIELD, Date.valueOf(fields[0]));

    String cases = fields[4];

    String deaths = fields[8];
    String icu = fields[7];
    String hospital = fields[6];

    tuple.putInteger(CONFIRMED_FIELD, convertField(cases));
    tuple.putInteger(DEATHS_FIELD, convertField(deaths));
    tuple.putInteger(ICU_FIELD, convertField(icu));
    tuple.putInteger(HOSPITAL_FIELD, convertField(hospital));
  }

Loading the tuples

The process of loading the files consists of creating the table, iterating over the rows in the file, converting each row into a tuple, and inserting the tuple into the table.

As mentioned above and to avoid the creation of a new tuple object for each row, we create one tuple and populate it with the row data. To improve performance, I add a batch size to perform a commit periodically.

public void load(Session session)
      throws IOException{

    transformer.createTable(session,tableName);

    log.info("Starting to load file {}", file);

    try(BufferedReader reader = new BufferedReader(new
   InputStreamReader(getClass().getResourceAsStream(file)))){

      Table table = session.database().getTable(tableName);
      Tuple tuple = table.createTuple();

      int total = 0;
      int commitCount = 0;

      if(skipHeader)
        
      

      String line;
      while((line = reader.readLine())!=null){

        transformer.fillTuple(line, tuple);
        table.insert(tuple);

        total++;
        commitCount++;

        if(commitCount>=commitSize){

          log.info("Committing at line {}", total);

          session.commit();
          commitCount = 0;
        }
      }

      if(commitCount > 0){

        log.info("Committing last {} lines", commitCount);
        session.commit();
      }
    }

    log.info("Loaded file {}", file);
  }

The total variable lets us log the progress of the total file at each commit.

Putting it all together

The main class receives the URL of the LeanXcale endpoint as well as the paths to the national and territory files through command line arguments, which creates the session, invokes the table creation, and invokes the loader for each file to import the data.

public static void main(String args[]){

    String url = args[0];
    String fileNational = args[1];
    String fileTerritory = args[2];

    String database = "covid";
    String tableNameNat = "data_nat";
    String tableNameTerritory = "data_ccaa";

    Settings settings = new Settings();
    settings.credentials(new Credentials("APP", "APP".toCharArray(), database));

    Transformer transformerNational = new NationalTransformer();
    Loader loaderNational = new Loader(fileNational, tableNameNat, transformerNational);

    Transformer transformerTerritory = new TerritoryTransformer();
    Loader loaderTerritory = new Loader(fileTerritory, tableNameTerritory, transformerTerritory);

    try(Session session = SessionFactory.newSession(url, settings)){

      loaderNational.load(session);
      loaderTerritory.load(session);
    }
    catch (LeanxcaleException | IOException e) {

      log.error("Error loading data: {}",e.getMessage(), e);
    }
  }

We define the tables in the “covid” database within the LeanXcale environment.

Showing the data

You may use your favorite JDBC client to query the tables and verify the data loaded.

lx> select count(*) from DATA_NAT;
+--------+
| EXPR$0 |
+--------+
| 104    |
+--------+
1 row selected (0.889 seconds)

WRITTEN BY


Javier López Moratalla

Development Lead at LeanXcale

After more than 15 years developing, designing, and deploying enterprise software at companies such as Hewlett Packard Enterprise or Gemalto, I’m now part of the LeanXcale team, researching and working on the next generation of databases.

javier@leanxcale.com

https://www.linkedin.com/in/javier-lopez-moratalla/