PolarSPARC

Introduction to Apache Calcite


Bhaskar S 08/18/2024


Overview

Ever wondered what the one common ingredient is behind most of the popular Java big data frameworks, such as Apache Drill, Apache Druid, Apache Flink, Apache Phoenix, Dremio, and others ???

Well, the "ONE" common ingredient amongst all these big data processing frameworks is - Apache Calcite !!!

Apache Calcite is a popular open source Java framework for building data management systems that access and manage data sources using the SQL language. The framework includes an SQL parser/validator, API constructs for building Relational Algebra expressions, and a query planning/optimization engine.
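To make the "Relational Algebra expressions" part more concrete, the following is a minimal sketch (not part of the original listings) that uses Calcite's RelBuilder API to build the relational-algebra equivalent of SELECT name FROM hr.emps WHERE deptno = 10 directly, without any SQL text, and prints the resulting operator tree. The Hr/Emp classes, the hr schema name and the data are hypothetical, and the sketch assumes the calcite-core dependency listed in the pom.xml of the Setup section.

```java
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;

public class RelBuilderSketch {
    // Hypothetical in-memory schema; ReflectiveSchema exposes the public array field as table "emps"
    public static class Hr {
        public final Emp[] emps = {
            new Emp(1, "Alice", 10),
            new Emp(2, "Bob", 20)
        };
    }

    public static class Emp {
        public final int empid;
        public final String name;
        public final int deptno;

        public Emp(int empid, String name, int deptno) {
            this.empid = empid;
            this.name = name;
            this.deptno = deptno;
        }
    }

    // Builds project(name) on top of filter(deptno = 10) on top of scan(hr.emps)
    public static String buildPlan() {
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        rootSchema.add("hr", new ReflectiveSchema(new Hr()));

        FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema)
                .build();

        RelBuilder builder = RelBuilder.create(config);
        RelNode node = builder
                .scan("hr", "emps")
                .filter(builder.equals(builder.field("deptno"), builder.literal(10)))
                .project(builder.field("name"))
                .build();

        // Render the logical plan tree as indented text
        return RelOptUtil.toString(node);
    }

    public static void main(String[] args) {
        System.out.println(buildPlan());
    }
}
```

Each builder call stacks one relational operator on top of the previous one, so the printed tree is expected to show a LogicalProject over a LogicalFilter over a LogicalTableScan. This is the kind of tree that Calcite's planner later rewrites and optimizes.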

Setup

The setup will be on an Ubuntu 22.04 LTS based Linux desktop. Ensure Java 21 or above is installed and set up, since the pom.xml below compiles for Java 21 (the code also uses records and Stream.toList(), which need Java 16 or above). Also, ensure Apache Maven is installed and set up.

To set up the Java directory structure for the demonstrations in this article, execute the following commands:


$ cd $HOME

$ mkdir -p $HOME/java/Calcite

$ cd $HOME/java/Calcite

$ mkdir -p src/main/resources/customers

$ mkdir -p src/main/java/com/polarsparc/calcite


The following is the listing for the Maven project file pom.xml that will be used:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.polarsparc</groupId>
    <artifactId>Calcite</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <slf4j.version>2.0.16</slf4j.version>
        <calcite.version>1.37.0</calcite.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.calcite/calcite-core -->
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-core</artifactId>
            <version>${calcite.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.calcite/calcite-file -->
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-file</artifactId>
            <version>${calcite.version}</version>
        </dependency>
    </dependencies>

</project>

The following is the listing for the slf4j-simple logger properties file simplelogger.properties located in the directory src/main/resources:


simplelogger.properties
#
### SLF4J Simple Logger properties
#

org.slf4j.simpleLogger.defaultLogLevel=info
org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS
org.slf4j.simpleLogger.showThreadName=true

Hands-on with Apache Calcite

Before we get started, just one note on the code samples - every piece of code will be self-contained, using static nested classes (and, in one case, a record) for simplicity.

For the first example, we will begin with a demonstration of the SQL parser capability of the Apache Calcite framework.

The following is the Java code for parsing (and syntax checking) SQL statements:


Listing.1
/*
 * Name:   SimpleSqlParser
 * Author: Bhaskar S
 * Date:   08/17/2024
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.calcite;

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParseException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleSqlParser {
    public static final Logger LOGGER = LoggerFactory.getLogger(SimpleSqlParser.class);

    public static void main(String[] args) {
        String goodQuery = "SELECT f2, f3, f5 FROM tbl WHERE f1 = 1 AND f4 = 'N'";
        String badQuery = "SELECT f2, f3, f5 FROM tbl WHERE f1 = ";

        parseSelectStatement(goodQuery);
        parseSelectStatement(badQuery);
    }

    static void parseSelectStatement(String sql) {
        LOGGER.info("------------------------- [ Start ] -------------------------");

        try {
            SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
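            SqlNode node = parser.parseStmt();

            // parseStmt() yields a SqlSelect only for a plain SELECT; a query with ORDER BY,
            // for example, is wrapped in a SqlOrderBy node instead
            if (!(node instanceof SqlSelect select)) {
                LOGGER.warn("Not a plain SELECT statement: {}", node.getKind());
                return;
            }

            LOGGER.info("Query statement:");
            LOGGER.info(select.toString());

            // Java assertions are disabled by default, so check for a missing FROM clause explicitly
            if (select.getFrom() != null) {
                LOGGER.info("Select from Table: {}", select.getFrom());
            }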

            LOGGER.info("List of selected fields:");
            select.getSelectList().forEach(sn -> LOGGER.info("-> {}", sn.toString()));
        }
        catch (SqlParseException e) {
            LOGGER.error(e.getMessage());
        }
        finally {
            LOGGER.info("------------------------- [  End  ] -------------------------");
        }
    }
}

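The class org.apache.calcite.sql.parser.SqlParser implements the SQL parsing functionality. Note that it only checks the syntax of a statement; semantic validation (for example, checking that the table tbl and its columns actually exist) is performed by a separate component, org.apache.calcite.sql.validate.SqlValidator, which needs a schema to validate against.

The class SqlParser.Config encapsulates the SQL parser configuration, such as how to handle case sensitivity, the maximum length of an identifier name, and which SQL conformance to follow (ANSI, Oracle, MySQL, etc.).

The default SQL parser configuration instance SqlParser.Config.DEFAULT uses Calcite's default settings, which stay close to standard SQL. For example, unquoted identifiers are converted to upper case, which is why the identifiers appear as F2, TBL, etc. in Output.1 below.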
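As an illustration of how the configuration affects parsing, the following minimal sketch parses the same query once with SqlParser.Config.DEFAULT and once with a copy switched to Java-like lexical rules via withLex(Lex.JAVA). The class and method names are hypothetical; the sketch assumes calcite-core on the classpath.

```java
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class ParserConfigSketch {
    // Parses a plain SELECT and returns the first item of its select list as text
    public static String firstSelectItem(String sql, SqlParser.Config config) {
        try {
            SqlSelect select = (SqlSelect) SqlParser.create(sql, config).parseQuery();
            return select.getSelectList().get(0).toString();
        } catch (SqlParseException e) {
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        String sql = "SELECT f2 FROM tbl";

        // Default settings: unquoted identifiers are converted to upper case
        System.out.println(firstSelectItem(sql, SqlParser.Config.DEFAULT));                    // F2

        // Lex.JAVA: unquoted identifiers keep their case (and back-ticks quote identifiers)
        System.out.println(firstSelectItem(sql, SqlParser.Config.DEFAULT.withLex(Lex.JAVA)));  // f2
    }
}
```

Since Config is immutable, withLex() returns a modified copy and leaves SqlParser.Config.DEFAULT untouched.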

The method parser.parseStmt() on the SQL parser returns the root node of the SQL parse tree, represented by the class org.apache.calcite.sql.SqlNode.

The class org.apache.calcite.sql.SqlSelect is a subclass of SqlNode that corresponds to a SQL SELECT statement.

The method select.getFrom() on an instance of SqlSelect returns the node for the FROM clause. For a single-table query like ours, this is the table identifier (for a join, it would be a SqlJoin node).

The method select.getSelectList() on an instance of SqlSelect returns the list of expressions in the SELECT clause (here, the table columns we want to query) as a SqlNodeList.
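Beyond getFrom() and getSelectList(), the parse tree can also be walked generically. The following is a minimal sketch, assuming calcite-core on the classpath, in which a hypothetical IdentifierCollector extends Calcite's SqlBasicVisitor (whose default behavior visits every operand of every call) and records the identifiers found in the WHERE clause returned by getWhere().

```java
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.util.SqlBasicVisitor;

import java.util.ArrayList;
import java.util.List;

public class WhereColumnsSketch {
    // Visitor that records every identifier it encounters while walking a parse tree
    static class IdentifierCollector extends SqlBasicVisitor<Void> {
        final List<String> names = new ArrayList<>();

        @Override
        public Void visit(SqlIdentifier id) {
            names.add(id.toString());
            return null;
        }
    }

    // Returns the identifiers referenced in the WHERE clause (empty if there is no WHERE clause)
    public static List<String> whereColumns(String sql) {
        try {
            SqlSelect select = (SqlSelect) SqlParser.create(sql, SqlParser.Config.DEFAULT).parseQuery();
            SqlNode where = select.getWhere();
            IdentifierCollector collector = new IdentifierCollector();
            if (where != null) {
                where.accept(collector);
            }
            return collector.names;
        } catch (SqlParseException e) {
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        // prints [F1, F4]
        System.out.println(whereColumns("SELECT f2, f3, f5 FROM tbl WHERE f1 = 1 AND f4 = 'N'"));
    }
}
```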

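To execute the code from Listing.1, open a terminal window and run the following commands (note that exec:java does not compile the project, so run mvn compile first if the classes have not been built yet):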


$ cd $HOME/java/Calcite

$ mvn exec:java -Dexec.mainClass="com.polarsparc.calcite.SimpleSqlParser"


The following would be the typical output:


Output.1

[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------------< com.polarsparc:Calcite >-----------------------
[INFO] Building Calcite 1.0
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec:3.2.0:java (default-cli) @ Calcite ---
2024-08-18 10:44:39:022 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - ------------------------- [ Start ] -------------------------
2024-08-18 10:44:39:144 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - Query statement:
2024-08-18 10:44:39:158 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - SELECT `F2`, `F3`, `F5`
FROM `TBL`
WHERE `F1` = 1 AND `F4` = 'N'
2024-08-18 10:44:39:159 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - Select from Table: TBL
2024-08-18 10:44:39:159 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - List of selected fields:
2024-08-18 10:44:39:159 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - -> F2
2024-08-18 10:44:39:159 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - -> F3
2024-08-18 10:44:39:159 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - -> F5
2024-08-18 10:44:39:159 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - ------------------------- [  End  ] -------------------------
2024-08-18 10:44:39:159 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - ------------------------- [ Start ] -------------------------
2024-08-18 10:44:39:168 [com.polarsparc.calcite.SimpleSqlParser.main()] ERROR com.polarsparc.calcite.SimpleSqlParser - Encountered "= <EOF>" at line 1, column 37.
Was expecting one of:
    <EOF> 
    "EXCEPT" ...
    "FETCH" ...
    "GROUP" ...
    "HAVING" ...
    
    [ ... TRIMMED OUTPUT ... ]
    
    "=" "ROW" ...
    "=" "(" ...
    "[" ...
    "IS" ...
    "FORMAT" ...
    "(" ...
2024-08-18 10:44:39:169 [com.polarsparc.calcite.SimpleSqlParser.main()] INFO com.polarsparc.calcite.SimpleSqlParser - ------------------------- [  End  ] -------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  0.520 s
[INFO] Finished at: 2024-08-18T10:44:39-04:00
[INFO] ------------------------------------------------------------------------

Note that the exception in Output.1 above was caused by the bad SQL query, which ends abruptly after the = operator.
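A SqlParseException also carries the error location in structured form, which is handy for pointing at the offending token in a UI. The following is a minimal sketch, assuming calcite-core on the classpath (the class and method names are hypothetical), that returns the line and column reported by getPos():

```java
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Arrays;

public class ParseErrorPosSketch {
    // Returns {line, column} of the parse error, or null if the statement parses cleanly
    public static int[] errorPosition(String sql) {
        try {
            SqlParser.create(sql, SqlParser.Config.DEFAULT).parseStmt();
            return null;
        } catch (SqlParseException e) {
            SqlParserPos pos = e.getPos();   // may be null for some errors
            return pos == null
                    ? new int[] {-1, -1}
                    : new int[] {pos.getLineNum(), pos.getColumnNum()};
        }
    }

    public static void main(String[] args) {
        int[] pos = errorPosition("SELECT f2, f3, f5 FROM tbl WHERE f1 = ");
        System.out.println("Parse error at [line, column]: " + Arrays.toString(pos));
    }
}
```

For the bad query, the position is expected to match the "line 1, column 37" (the dangling =) reported in the error message of Output.1.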

For the second example, we will demonstrate the ability to query an in-memory collection of Java objects using SQL statements.

The following is the Java code for SQL querying of a collection of Java objects:


Listing.2
/*
 * Name:   SimpleMemDbSql
 * Author: Bhaskar S
 * Date:   08/17/2024
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.calcite;

import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Properties;

public class SimpleMemDbSql {
    public static final Logger LOGGER = LoggerFactory.getLogger(SimpleMemDbSql.class);

    static ContactSchema contactSchema = new ContactSchema();

    public static void main(String[] args) {
        CalciteConnection connection = getCalciteConnection();
        if (connection != null) {
            dataSetup();

            schemaSetup(connection, contactSchema);

            String sql = "SELECT c.lastName, c.email, c.mobile FROM cs.contacts AS c";
            queryAll(connection, sql);

            String sql2 = "SELECT c.email, c.mobile FROM cs.contacts AS c WHERE c.lastName = 'Doctor'";
            queryWhere(connection, sql2);

            try {
                connection.close();
            }
            catch (SQLException ignored) {}
        }
    }

    static void dataSetup() {
        contactSchema.contacts = new Contact[] {
            new Contact("Alice", "Doctor", "alice.d@space.com", "123-321-1000"),
            new Contact("Bob", "Carpenter", "bob.c@space.com", "234-432-2000"),
            new Contact("Charlie", "Painter", "charlie_p@space.com", "345-543-3000"),
            new Contact("Donna", "Plumber", "donna.p@space.com", "456-654-4000"),
            new Contact("Eve", "Dentist", "eve_d@space.com", "567-765-5000")
        };
    }

    static CalciteConnection getCalciteConnection() {
        Properties props = new Properties();
        props.setProperty("lex", "JAVA");

        CalciteConnection calciteConn = null;

        try {
            Connection conn = DriverManager.getConnection("jdbc:calcite:", props);
            calciteConn = conn.unwrap(CalciteConnection.class);

            // Log success only when the connection was actually obtained
            LOGGER.info("Calcite connection created !!!");
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }

        return calciteConn;
    }

    static void schemaSetup(CalciteConnection connection, ContactSchema contactSchema) {
        SchemaPlus rootSchema = connection.getRootSchema();
        Schema schema = new ReflectiveSchema(contactSchema);

        LOGGER.info("Tables names: {}", schema.getTableNames());

        rootSchema.add("cs", schema);

        LOGGER.info("Calcite schema setup !!!");
    }

    static void queryAll(CalciteConnection connection, String sql) {
        LOGGER.info("Executing queryAll: {}", sql);

        // try-with-resources closes the ResultSet and Statement even when an exception is thrown
        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                LOGGER.info("Last name: {}, Email: {}, Mobile: {}",
                        rs.getString(1),
                        rs.getString(2),
                        rs.getString(3));
            }
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }
    }

    static void queryWhere(CalciteConnection connection, String sql) {
        LOGGER.info("Executing queryWhere: {}", sql);

        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                LOGGER.info("Email: {}, Mobile: {}",
                        rs.getString(1),
                        rs.getString(2));
            }
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }
    }

    // The ReflectiveSchema requires that all the fields are public

    static public class ContactSchema {
        public Contact[] contacts;
    }

    static public class Contact {
        public String firstName;
        public String lastName;
        public String email;
        public String mobile;

        public Contact(String firstName, String lastName, String email, String mobile) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.email = email;
            this.mobile = mobile;
        }
    }
}

The interface org.apache.calcite.schema.Schema defines the SQL namespace for storing tables.

The interface org.apache.calcite.schema.SchemaPlus extends the interface Schema to enable users to add and access user-defined tables.

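The class org.apache.calcite.adapter.java.ReflectiveSchema is a concrete implementation of the interface Schema that uses Java reflection on a target object: each public field of the object that holds an array (contacts in our case) is exposed as a table, and the public fields of the array's element type (firstName, lastName, email, and mobile) become the columns of that table.

The interface org.apache.calcite.jdbc.CalciteConnection is the framework's extension of the standard JDBC java.sql.Connection; its getRootSchema() method allows one to dynamically add schemas at runtime. The connection property lex=JAVA selects Java-like lexical rules, under which identifiers are case-sensitive and unquoted identifiers are not converted to upper case. This is why mixed-case names such as lastName can be used as-is in the queries.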
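Since a CalciteConnection is a regular JDBC connection, the usual JDBC idioms apply. As a minimal sketch (hypothetical class names and trimmed-down data), the following binds the last name through a PreparedStatement placeholder instead of concatenating it into the SQL text, and uses try-with-resources so that the connection, statement, and result set are always closed:

```java
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class PreparedMemDbSql {
    // Hypothetical data holder; ReflectiveSchema needs public classes with public fields
    public static class ContactSchema {
        public Contact[] contacts = {
            new Contact("Alice", "Doctor", "alice.d@space.com"),
            new Contact("Bob", "Carpenter", "bob.c@space.com")
        };
    }

    public static class Contact {
        public String firstName;
        public String lastName;
        public String email;

        public Contact(String firstName, String lastName, String email) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.email = email;
        }
    }

    // Returns the emails of all contacts with the given last name
    public static List<String> emailsByLastName(String lastName) {
        Properties props = new Properties();
        props.setProperty("lex", "JAVA");   // case-sensitive, mixed-case identifiers
        List<String> emails = new ArrayList<>();

        try (Connection conn = DriverManager.getConnection("jdbc:calcite:", props)) {
            CalciteConnection calciteConn = conn.unwrap(CalciteConnection.class);
            calciteConn.getRootSchema().add("cs", new ReflectiveSchema(new ContactSchema()));

            String sql = "SELECT c.email FROM cs.contacts AS c WHERE c.lastName = ?";
            try (PreparedStatement ps = calciteConn.prepareStatement(sql)) {
                ps.setString(1, lastName);   // value is bound, never spliced into the SQL text
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        emails.add(rs.getString(1));
                    }
                }
            }
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return emails;
    }

    public static void main(String[] args) {
        System.out.println(emailsByLastName("Doctor"));
    }
}
```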

To execute the code from Listing.2, open a terminal window and run the following commands:


$ cd $HOME/java/Calcite

$ mvn exec:java -Dexec.mainClass="com.polarsparc.calcite.SimpleMemDbSql"


The following would be the typical output:


Output.2

[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------------< com.polarsparc:Calcite >-----------------------
[INFO] Building Calcite 1.0
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec:3.2.0:java (default-cli) @ Calcite ---
2024-08-18 12:17:51:983 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Calcite connection created !!!
2024-08-18 12:17:51:989 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Tables names: [contacts]
2024-08-18 12:17:51:989 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Calcite schema setup !!!
2024-08-18 12:17:51:989 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Executing queryAll: SELECT c.lastName, c.email, c.mobile FROM cs.contacts AS c
2024-08-18 12:17:52:708 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Last name: Doctor, Email: alice.d@space.com, Mobile: 123-321-1000
2024-08-18 12:17:52:708 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Last name: Carpenter, Email: bob.c@space.com, Mobile: 234-432-2000
2024-08-18 12:17:52:708 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Last name: Painter, Email: charlie_p@space.com, Mobile: 345-543-3000
2024-08-18 12:17:52:708 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Last name: Plumber, Email: donna.p@space.com, Mobile: 456-654-4000
2024-08-18 12:17:52:708 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Last name: Dentist, Email: eve_d@space.com, Mobile: 567-765-5000
2024-08-18 12:17:52:708 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Executing queryWhere: SELECT c.email, c.mobile FROM cs.contacts AS c WHERE c.lastName = 'Doctor'
2024-08-18 12:17:52:781 [com.polarsparc.calcite.SimpleMemDbSql.main()] INFO com.polarsparc.calcite.SimpleMemDbSql - Email: alice.d@space.com, Mobile: 123-321-1000
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.317 s
[INFO] Finished at: 2024-08-18T12:17:52-04:00
[INFO] ------------------------------------------------------------------------

BAM !!! We have successfully demonstrated the capability of accessing a collection of Java objects in memory using the SQL syntax.

For the third example, we will demonstrate the ability to join and query collections of Java objects of two different types using SQL statements.

The following is the Java code for the SQL join query on collections of two different Java object types in memory:


Listing.3
/*
 * Name:   SimpleMemDbJoinSql
 * Author: Bhaskar S
 * Date:   08/17/2024
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.calcite;

import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Properties;

public class SimpleMemDbJoinSql {
    public static final Logger LOGGER = LoggerFactory.getLogger(SimpleMemDbJoinSql.class);

    static PublicationSchema publicationSchema = new PublicationSchema();

    public static void main(String[] args) {
        CalciteConnection connection = getCalciteConnection();
        if (connection != null) {
            dataSetup();

            schemaSetup(connection, publicationSchema);

            String sql = "SELECT a.lastName, b.title, b.price FROM " +
                    "pub.authors AS a JOIN pub.books AS b ON a.id = b.authorId WHERE a.id = 2";
            queryJoin(connection, sql);

            try {
                connection.close();
            }
            catch (SQLException ignored) {}
        }
    }

    static void dataSetup() {
        publicationSchema.authors = new Author[] {
            new Author(1, "Alice", "Doctor"),
            new Author(2, "Bob", "Carpenter"),
            new Author(3, "Charlie", "Painter"),
        };

        publicationSchema.books = new Book[] {
            new Book("111", "Guide to Common Cold", 19.99f, 1),
            new Book("222", "How to Build Decks", 15.49f, 2),
            new Book("333", "Awesome Room Colors", 9.99f, 3),
            new Book("444", "How to Treat Cough", 17.49f, 1),
            new Book("555", "Fixing Wall Cracks", 11.99f, 2)
        };
    }

    static CalciteConnection getCalciteConnection() {
        Properties props = new Properties();
        props.setProperty("lex", "JAVA");

        CalciteConnection calciteConn = null;

        try {
            Connection conn = DriverManager.getConnection("jdbc:calcite:", props);
            calciteConn = conn.unwrap(CalciteConnection.class);

            // Log success only when the connection was actually obtained
            LOGGER.info("Calcite connection created !!!");
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }

        return calciteConn;
    }

    static void schemaSetup(CalciteConnection connection, PublicationSchema publicationSchema) {
        SchemaPlus rootSchema = connection.getRootSchema();
        Schema schema = new ReflectiveSchema(publicationSchema);

        LOGGER.info("Tables names: {}", schema.getTableNames());

        rootSchema.add("pub", schema);

        LOGGER.info("Calcite schema setup !!!");
    }

    static void queryJoin(CalciteConnection connection, String sql) {
        LOGGER.info("Executing queryAll: {}", sql);

        // try-with-resources closes the ResultSet and Statement even when an exception is thrown
        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                LOGGER.info("Author last name: {}, Book title: {}, Book price: {}",
                        rs.getString(1),
                        rs.getString(2),
                        rs.getString(3));
            }
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }
    }

    // The ReflectiveSchema requires that all the fields are public

    static public class PublicationSchema {
        public Author[] authors;
        public Book[] books;
    }

    static public class Book {
        public String isbn;
        public String title;
        public float price;
        public int authorId;

        public Book(String isbn, String title, float price, int authorId) {
            this.isbn = isbn;
            this.title = title;
            this.price = price;
            this.authorId = authorId;
        }
    }

    static public class Author {
        public int id;
        public String firstName;
        public String lastName;

        public Author(int id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }
}

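Note that the Book and Author objects are related through a common ID: the authorId field of a Book refers to the id field of its Author, which is the column pair used in the JOIN condition.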
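Because the joined in-memory collections behave like any other SQL tables, the usual SQL operators can be applied on top of the join. The following is a minimal sketch, with hypothetical class names and a trimmed-down copy of the data from Listing.3, that counts the books per author using GROUP BY:

```java
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;

import java.sql.*;
import java.util.*;

public class JoinAggregateSql {
    // Hypothetical, trimmed-down copy of the data from Listing.3 (public fields for ReflectiveSchema)
    public static class PublicationSchema {
        public Author[] authors = {
            new Author(1, "Doctor"), new Author(2, "Carpenter"), new Author(3, "Painter")
        };
        public Book[] books = {
            new Book("Guide to Common Cold", 1), new Book("How to Build Decks", 2),
            new Book("Awesome Room Colors", 3), new Book("How to Treat Cough", 1),
            new Book("Fixing Wall Cracks", 2)
        };
    }

    public static class Author {
        public int id;
        public String lastName;

        public Author(int id, String lastName) {
            this.id = id;
            this.lastName = lastName;
        }
    }

    public static class Book {
        public String title;
        public int authorId;

        public Book(String title, int authorId) {
            this.title = title;
            this.authorId = authorId;
        }
    }

    // Joins authors to books and counts the books per author last name
    public static Map<String, Long> bookCountByAuthor() {
        String sql = "SELECT a.lastName, COUNT(*) AS numBooks "
                + "FROM pub.authors AS a JOIN pub.books AS b ON a.id = b.authorId "
                + "GROUP BY a.lastName";
        Properties props = new Properties();
        props.setProperty("lex", "JAVA");
        Map<String, Long> counts = new TreeMap<>();   // sorted by last name for stable printing

        try (Connection conn = DriverManager.getConnection("jdbc:calcite:", props)) {
            CalciteConnection calciteConn = conn.unwrap(CalciteConnection.class);
            calciteConn.getRootSchema().add("pub", new ReflectiveSchema(new PublicationSchema()));
            try (Statement stmt = calciteConn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    counts.put(rs.getString(1), rs.getLong(2));
                }
            }
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(bookCountByAuthor());   // {Carpenter=2, Doctor=2, Painter=1}
    }
}
```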

To execute the code from Listing.3, open a terminal window and run the following commands:


$ cd $HOME/java/Calcite

$ mvn exec:java -Dexec.mainClass="com.polarsparc.calcite.SimpleMemDbJoinSql"


The following would be the typical output:


Output.3

[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------------< com.polarsparc:Calcite >-----------------------
[INFO] Building Calcite 1.0
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec:3.2.0:java (default-cli) @ Calcite ---
2024-08-18 12:57:00:974 [com.polarsparc.calcite.SimpleMemDbJoinSql.main()] INFO com.polarsparc.calcite.SimpleMemDbJoinSql - Calcite connection created !!!
2024-08-18 12:57:00:979 [com.polarsparc.calcite.SimpleMemDbJoinSql.main()] INFO com.polarsparc.calcite.SimpleMemDbJoinSql - Tables names: [authors, books]
2024-08-18 12:57:00:979 [com.polarsparc.calcite.SimpleMemDbJoinSql.main()] INFO com.polarsparc.calcite.SimpleMemDbJoinSql - Calcite schema setup !!!
2024-08-18 12:57:00:979 [com.polarsparc.calcite.SimpleMemDbJoinSql.main()] INFO com.polarsparc.calcite.SimpleMemDbJoinSql - Executing queryAll: SELECT a.lastName, b.title, b.price FROM pub.authors AS a JOIN pub.books AS b ON a.id = b.authorId WHERE a.id = 2
2024-08-18 12:57:01:704 [com.polarsparc.calcite.SimpleMemDbJoinSql.main()] INFO com.polarsparc.calcite.SimpleMemDbJoinSql - Author last name: Carpenter, Book title: How to Build Decks, Book price: 15.49
2024-08-18 12:57:01:704 [com.polarsparc.calcite.SimpleMemDbJoinSql.main()] INFO com.polarsparc.calcite.SimpleMemDbJoinSql - Author last name: Carpenter, Book title: Fixing Wall Cracks, Book price: 11.99
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.222 s
[INFO] Finished at: 2024-08-18T12:57:01-04:00
[INFO] ------------------------------------------------------------------------

BINGO !!! We have successfully demonstrated how one could perform a SQL join query on a collection of two different Java objects in memory.
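To peek at the query planning/optimization engine mentioned in the Overview, Calcite also accepts EXPLAIN PLAN FOR in front of a query and returns the chosen physical plan as text instead of executing the query. The following is a minimal sketch with hypothetical class names and a trimmed-down version of the publication data; the exact plan text depends on the Calcite version, so the sketch only prints it.

```java
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class ExplainJoinSql {
    // Hypothetical, trimmed-down version of the publication data from Listing.3
    public static class PublicationSchema {
        public Author[] authors = { new Author(1, "Doctor"), new Author(2, "Carpenter") };
        public Book[] books = {
            new Book("How to Build Decks", 2),
            new Book("Fixing Wall Cracks", 2),
            new Book("Guide to Common Cold", 1)
        };
    }

    public static class Author {
        public int id;
        public String lastName;

        public Author(int id, String lastName) {
            this.id = id;
            this.lastName = lastName;
        }
    }

    public static class Book {
        public String title;
        public int authorId;

        public Book(String title, int authorId) {
            this.title = title;
            this.authorId = authorId;
        }
    }

    // Returns the physical plan Calcite chooses for the join query, without running the query
    public static String explainJoin() {
        String sql = "EXPLAIN PLAN FOR SELECT a.lastName, b.title "
                + "FROM pub.authors AS a JOIN pub.books AS b ON a.id = b.authorId WHERE a.id = 2";
        Properties props = new Properties();
        props.setProperty("lex", "JAVA");

        try (Connection conn = DriverManager.getConnection("jdbc:calcite:", props)) {
            CalciteConnection calciteConn = conn.unwrap(CalciteConnection.class);
            calciteConn.getRootSchema().add("pub", new ReflectiveSchema(new PublicationSchema()));
            try (Statement stmt = calciteConn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                StringBuilder plan = new StringBuilder();
                while (rs.next()) {
                    plan.append(rs.getString(1));
                }
                return plan.toString();
            }
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(explainJoin());
    }
}
```

With the default JDBC setup, the printed operators are expected to be Enumerable* physical nodes (a join operator over two table scans, plus any filter or projection), but treat the exact shape as version dependent.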

One of the limitations of the previous approach of accessing in-memory objects is that the objects MUST expose their fields as public members. This conflicts with the object-oriented principle of encapsulation (and with how most real-world classes are written).

For the fourth example, we will demonstrate the ability to query a collection of Java objects using SQL statements by implementing a custom schema and table, which overcomes the above mentioned limitation.

The following is the Java code for the SQL query of a collection of Java objects through a custom schema and table that hosts the objects:


Listing.4
/*
 * Name:   SimpleCustomDbSql
 * Author: Bhaskar S
 * Date:   08/17/2024
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.calcite;

import org.apache.calcite.DataContext;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.*;
import java.util.stream.Stream;

public class SimpleCustomDbSql {
    public static final Logger LOGGER = LoggerFactory.getLogger(SimpleCustomDbSql.class);

    public static void main(String[] args) {
        CalciteConnection connection = getCalciteConnection();
        if (connection != null) {
            schemaSetup(connection);

            String sql = "SELECT d.item, d.store, d.discount FROM dl.deals AS d";
            queryAll(connection, sql);

            try {
                connection.close();
            }
            catch (SQLException ignored) {}
        }
    }

    static CalciteConnection getCalciteConnection() {
        Properties props = new Properties();
        props.setProperty("lex", "JAVA");

        CalciteConnection calciteConn = null;

        try {
            Connection conn = DriverManager.getConnection("jdbc:calcite:", props);
            calciteConn = conn.unwrap(CalciteConnection.class);

            // Log success only when the connection was actually obtained
            LOGGER.info("Calcite connection created !!!");
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }

        return calciteConn;
    }

    static void schemaSetup(CalciteConnection connection) {
        SchemaPlus rootSchema = connection.getRootSchema();
        Schema schema = new DealsCustomSchema();

        LOGGER.info("Tables names: {}", schema.getTableNames());

        rootSchema.add("dl", schema);

        LOGGER.info("Calcite schema setup !!!");
    }

    static void queryAll(CalciteConnection connection, String sql) {
        LOGGER.info("Executing queryAll: {}", sql);

        // try-with-resources closes the ResultSet and Statement even when an exception is thrown
        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                LOGGER.info("Item: {}, Store: {}, Discount: {}",
                        rs.getString(1),
                        rs.getString(2),
                        rs.getString(3));
            }
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }
    }

    // Represents the data table in-memory
    static class DealsCustomTable extends AbstractTable implements ScannableTable {
        private final List<String> fieldNames = new ArrayList<>();
        private final List<SqlTypeName> fieldTypes = new ArrayList<>();

        private final List<Deal> dealsData = new ArrayList<>();

        public DealsCustomTable() {
            fieldNames.add("item");
            fieldNames.add("store");
            fieldNames.add("price");
            fieldNames.add("discount");

            fieldTypes.add(SqlTypeName.VARCHAR);
            fieldTypes.add(SqlTypeName.VARCHAR);
            fieldTypes.add(SqlTypeName.REAL);     // Java float maps to SQL REAL (Calcite's FLOAT is 8 bytes, like DOUBLE)
            fieldTypes.add(SqlTypeName.INTEGER);

            dataSetup();
        }

        private void dataSetup() {
            dealsData.add(new Deal("iPhone 15 Pro", "Costco", 639.99f, 20));
            dealsData.add(new Deal("iPad 10.9 10th Gen", "BestBuy", 349.99f, 5));
            dealsData.add(new Deal("iWatch Series 9", "Target", 329.99f, 15));
        }

        // Return an enumerator to the stream of data rows (as individual column objects)
        @Override
        public Enumerable<Object[]> scan(DataContext context) {
            Stream<Object[]> fieldsStream = dealsData.stream().map(Deal::toObjectArray);
            return Linq4j.asEnumerable(fieldsStream.toList());
        }

        // Return the fields names and data types for a row in the table
        @Override
        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
            List<RelDataType> dataTypes = fieldTypes.stream()
                    .map(typeFactory::createSqlType)
                    .toList();
            return typeFactory.createStructType(dataTypes, fieldNames);
        }
    }

    // Represents the schema that holds table(s) in-memory
    static class DealsCustomSchema extends AbstractSchema {
        @Override
        protected Map<String, Table> getTableMap() {
            return Collections.singletonMap("deals", new DealsCustomTable());
        }
    }

    // Represents a row of the table in-memory
    record Deal(String item, String store, float price, int discount) {
        public Object[] toObjectArray() {
            Object[] fields = new Object[4];
            fields[0] = this.item;
            fields[1] = this.store;
            fields[2] = this.price;
            fields[3] = this.discount;
            return fields;
        }
    }
}

The class org.apache.calcite.schema.impl.AbstractTable is a convenient base class for implementing a custom table. A subclass describes the column names and their data types by overriding getRowType() and, by also implementing the interface org.apache.calcite.schema.ScannableTable, supplies the rows through scan() as an Enumerable of Object[] arrays (one array element per column, in the same order as the row type).

The class org.apache.calcite.schema.impl.AbstractSchema is the base class for implementing a custom schema. As indicated earlier, a schema is basically a namespace that hosts user tables; a subclass overrides getTableMap() to map each table name to its Table instance.

To execute the code from Listing.4, open a terminal window and run the following commands:


$ cd $HOME/java/Calcite

$ mvn exec:java -Dexec.mainClass="com.polarsparc.calcite.SimpleCustomDbSql"


The following would be the typical output:


Output.4

[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------------< com.polarsparc:Calcite >-----------------------
[INFO] Building Calcite 1.0
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec:3.2.0:java (default-cli) @ Calcite ---
2024-08-18 13:25:25:054 [com.polarsparc.calcite.SimpleCustomDbSql.main()] INFO com.polarsparc.calcite.SimpleCustomDbSql - Calcite connection created !!!
2024-08-18 13:25:25:059 [com.polarsparc.calcite.SimpleCustomDbSql.main()] INFO com.polarsparc.calcite.SimpleCustomDbSql - Tables names: [deals]
2024-08-18 13:25:25:059 [com.polarsparc.calcite.SimpleCustomDbSql.main()] INFO com.polarsparc.calcite.SimpleCustomDbSql - Calcite schema setup !!!
2024-08-18 13:25:25:059 [com.polarsparc.calcite.SimpleCustomDbSql.main()] INFO com.polarsparc.calcite.SimpleCustomDbSql - Executing queryAll: SELECT d.item, d.store, d.discount FROM dl.deals AS d
2024-08-18 13:25:25:721 [com.polarsparc.calcite.SimpleCustomDbSql.main()] INFO com.polarsparc.calcite.SimpleCustomDbSql - Item: iPhone 15 Pro, Store: Costco, Discount: 20
2024-08-18 13:25:25:722 [com.polarsparc.calcite.SimpleCustomDbSql.main()] INFO com.polarsparc.calcite.SimpleCustomDbSql - Item: iPad 10.9 10th Gen, Store: BestBuy, Discount: 5
2024-08-18 13:25:25:722 [com.polarsparc.calcite.SimpleCustomDbSql.main()] INFO com.polarsparc.calcite.SimpleCustomDbSql - Item: iWatch Series 9, Store: Target, Discount: 15
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.173 s
[INFO] Finished at: 2024-08-18T13:25:25-04:00
[INFO] ------------------------------------------------------------------------

WALLA !!! We have successfully demonstrated how one could perform an SQL query on an in-memory collection of Java objects exposed through a custom schema and table.

For the last example, we will demonstrate how one could access the data in a CSV file using an SQL query.

The following are the contents of the CSV file called CUSTOMERS.csv located in the directory src/main/resources/customers:


CUSTOMERS.csv
cid:int,name:string,email:string,mobile:string
1,"Alice Doctor","alice.d@space.com","123-321-1000"
2,"Bob Carpenter","bob.c@space.com","234-432-2000"
3,"Charlie Painter","charlie_p@space.com","345-543-3000"
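The first line of CUSTOMERS.csv is a header in which each column is written as name:type. As far as we know, the Calcite CSV/file adapter reads this header to derive the column names and SQL types of the table, and treats a column without a :type suffix as a string. The stdlib-only sketch below splits such a header into (name, type) pairs; the ColumnDef record and the parseHeader method are hypothetical illustrations, not the adapter's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only sketch of splitting a typed CSV header such as "cid:int,name:string"
final class TypedHeaderSketch {
    // Hypothetical holder for one column definition
    record ColumnDef(String name, String type) {}

    // Hypothetical helper: splits on commas, then on the first colon of each entry.
    // A column without a ":type" suffix defaults to "string" (assumed default).
    static List<ColumnDef> parseHeader(String header) {
        List<ColumnDef> columns = new ArrayList<>();
        for (String entry : header.split(",")) {
            String trimmed = entry.strip();
            int colon = trimmed.indexOf(':');
            if (colon < 0) {
                columns.add(new ColumnDef(trimmed, "string"));
            } else {
                columns.add(new ColumnDef(trimmed.substring(0, colon), trimmed.substring(colon + 1)));
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        // prints cid -> int, name -> string, email -> string, mobile -> string (one per line)
        for (ColumnDef column : parseHeader("cid:int,name:string,email:string,mobile:string")) {
            System.out.println(column.name() + " -> " + column.type());
        }
    }
}
```

Note that the column names keep the exact (lower) case written in the header, which matters when they are referenced from SQL.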

The following are the contents of the model file called model.json located in the directory src/main/resources:


model.json
{
  "version": "1.0",
  "defaultSchema": "customers",
  "schemas": [
    {
      "name": "customers",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.file.FileSchemaFactory",
      "operand": {
        "directory": "customers"
      }
    }
  ]
}
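Two details of this setup are worth spelling out. First, the program in Listing.5 below loads the model from target/classes/model.json, the copy Maven places there when it processes the resources, and our understanding is that the file adapter resolves a relative directory operand such as customers against the folder containing the model file, so the CSV files are read from target/classes/customers. Second, each file becomes a table named after the file with its extension removed, so CUSTOMERS.csv becomes the table CUSTOMERS. The stdlib-only sketch below mimics both rules with java.nio.file.Path; resolveDataDirectory and tableNameFor are hypothetical helpers for illustration only.

```java
import java.nio.file.Path;

// Stdlib-only sketch of how the model path, the "directory" operand and a CSV file name
// combine into a data directory and a table name
final class FileModelSketch {
    // Hypothetical helper: resolves a relative directory operand against the model file's folder
    static Path resolveDataDirectory(Path modelFile, String directoryOperand) {
        Path directory = Path.of(directoryOperand);
        if (directory.isAbsolute()) {
            return directory;
        }
        Path base = modelFile.getParent();
        // A model file given without a parent folder resolves against the current directory
        return base == null ? directory : base.resolve(directory).normalize();
    }

    // Hypothetical helper: strips a trailing ".csv" extension to obtain the table name
    static String tableNameFor(String fileName) {
        return fileName.endsWith(".csv")
                ? fileName.substring(0, fileName.length() - ".csv".length())
                : fileName;
    }

    public static void main(String[] args) {
        Path dataDir = resolveDataDirectory(Path.of("target/classes/model.json"), "customers");
        System.out.println("Data directory: " + dataDir);
        System.out.println("Table name: " + tableNameFor("CUSTOMERS.csv"));
    }
}
```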

The following is the Java code for accessing the contents of the CSV file using an SQL statement:


Listing.5
/*
 * Name:   SimpleFileSql
 * Author: Bhaskar S
 * Date:   08/17/2024
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.calcite;

import org.apache.calcite.jdbc.CalciteConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;

public class SimpleFileSql {
    public static final Logger LOGGER = LoggerFactory.getLogger(SimpleFileSql.class);

    public static void main(String[] args) {
        CalciteConnection connection = getCalciteConnection();
        if (connection != null) {
            // The column names *MUST* be enclosed in double quotes
            String sql = "SELECT \"name\", \"email\", \"mobile\" FROM customers";
            queryAll(connection, sql);

            try {
                connection.close();
            }
            catch (SQLException ignored) {}
        }
    }

    static CalciteConnection getCalciteConnection() {
        CalciteConnection calciteConn = null;

        try {
            Connection conn = DriverManager.getConnection("jdbc:calcite:model=target/classes/model.json");
            calciteConn = conn.unwrap(CalciteConnection.class);

            // Log success only when the connection was actually obtained
            LOGGER.info("Calcite connection created !!!");
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }

        return calciteConn;
    }

    static void queryAll(Connection connection, String sql) {
        LOGGER.info("Executing queryAll: {}", sql);

        // try-with-resources closes the ResultSet and the Statement even if an exception is thrown
        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                // Column indexes are 1-based and follow the order of the SELECT list
                LOGGER.info("Name: {}, Email: {}, Mobile: {}",
                        rs.getString(1),
                        rs.getString(2),
                        rs.getString(3));
            }
        }
        catch (SQLException e) {
            LOGGER.error(e.getMessage());
        }
    }
}
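The comment in main() about double quotes follows from Calcite's default lexical policy: an unquoted identifier is converted to upper case before it is looked up, a double-quoted identifier keeps its case exactly as written, and the lookup is case-sensitive. The column names in the CSV header are lower case (name, email, mobile), so they must be quoted, whereas the unquoted customers becomes CUSTOMERS, which matches the table name derived from CUSTOMERS.csv. The following stdlib-only sketch mimics that rule; normalizeIdentifier and resolves are hypothetical helpers, and the real resolution happens inside Calcite's parser and validator.

```java
import java.util.Locale;
import java.util.Set;

// Stdlib-only sketch of the default identifier rule: unquoted -> upper case, quoted -> unchanged
final class IdentifierCaseSketch {
    // Hypothetical helper mimicking how an identifier is normalized before lookup
    // (doubled quotes inside a quoted identifier are not handled in this sketch)
    static String normalizeIdentifier(String identifier) {
        if (identifier.length() >= 2 && identifier.startsWith("\"") && identifier.endsWith("\"")) {
            // Quoted: strip the quotes and keep the case exactly as written
            return identifier.substring(1, identifier.length() - 1);
        }
        // Unquoted: folded to upper case
        return identifier.toUpperCase(Locale.ROOT);
    }

    // Hypothetical lookup: case-sensitive match against the names known to the schema
    static boolean resolves(String identifier, Set<String> knownNames) {
        return knownNames.contains(normalizeIdentifier(identifier));
    }

    public static void main(String[] args) {
        Set<String> tables = Set.of("CUSTOMERS");                       // from the file name CUSTOMERS.csv
        Set<String> columns = Set.of("cid", "name", "email", "mobile"); // from the CSV header

        System.out.println(resolves("customers", tables));  // true: CUSTOMERS matches
        System.out.println(resolves("name", columns));      // false: NAME does not match name
        System.out.println(resolves("\"name\"", columns));  // true: quoted name keeps its case
    }
}
```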

To execute the code from Listing.5, open a terminal window and run the following commands:


$ cd $HOME/java/Calcite

$ mvn exec:java -Dexec.mainClass="com.polarsparc.calcite.SimpleFileSql"


The following would be the typical output:


Output.5

[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------------< com.polarsparc:Calcite >-----------------------
[INFO] Building Calcite 1.0
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec:3.2.0:java (default-cli) @ Calcite ---
2024-08-18 13:47:07:889 [com.polarsparc.calcite.SimpleFileSql.main()] INFO com.polarsparc.calcite.SimpleFileSql - Calcite connection created !!!
2024-08-18 13:47:07:889 [com.polarsparc.calcite.SimpleFileSql.main()] INFO com.polarsparc.calcite.SimpleFileSql - Executing queryAll: SELECT "name", "email", "mobile" FROM customers
2024-08-18 13:47:08:532 [com.polarsparc.calcite.SimpleFileSql.main()] INFO com.polarsparc.calcite.SimpleFileSql - Name: Alice Doctor, Email: alice.d@space.com, Mobile: 123-321-1000
2024-08-18 13:47:08:532 [com.polarsparc.calcite.SimpleFileSql.main()] INFO com.polarsparc.calcite.SimpleFileSql - Name: Bob Carpenter, Email: bob.c@space.com, Mobile: 234-432-2000
2024-08-18 13:47:08:532 [com.polarsparc.calcite.SimpleFileSql.main()] INFO com.polarsparc.calcite.SimpleFileSql - Name: Charlie Painter, Email: charlie_p@space.com, Mobile: 345-543-3000
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.235 s
[INFO] Finished at: 2024-08-18T13:47:08-04:00
[INFO] ------------------------------------------------------------------------

YAHOO !!! We have successfully demonstrated how one could access the contents of a CSV file using an SQL query.


References

GitHub :: Source Code

Apache Calcite

Apache Calcite API



© PolarSPARC