Module tiresias.client.storage

This module provides functions for storing and retrieving data from the underlying SQLite databases.

Expand source code
"""
This module provides functions for storing and retrieving data from the
underlying SQLite databases.
"""
import os
import sqlite3
from random import random

def dict_factory(cursor, row):
    """
    Row factory for `sqlite3` that converts a result row into a dictionary.

    The keys are the column names reported by `cursor.description` (the first
    element of each entry) and the values are the entries of `row` at the
    matching positions. If two columns share a name, the later one wins.
    """
    return {
        column[0]: row[position]
        for position, column in enumerate(cursor.description)
    }

def initialize(storage_dir):
    """
    Create the data directory if necessary and initialize the metadata database.

    Args:
        storage_dir: Directory that holds `metadata.db`. Missing directories
            (including parents) are created.

    The `apps`, `app_tables` and `app_columns` tables are created with
    `IF NOT EXISTS`, so calling this function repeatedly is safe.
    """
    # exist_ok avoids the check-then-create race when two callers initialize
    # the same directory at the same time.
    os.makedirs(storage_dir, exist_ok=True)
    connection = sqlite3.connect(os.path.join(storage_dir, "metadata.db"))
    try:
        connection.executescript("""
            CREATE TABLE IF NOT EXISTS apps (app_name);
            CREATE TABLE IF NOT EXISTS app_tables (
                app_name,
                table_name,
                table_description
            );
            CREATE TABLE IF NOT EXISTS app_columns (
                app_name,
                table_name,
                column_name,
                column_type,
                column_description
            );
        """)
    finally:
        # Close the connection even when the script fails.
        connection.close()

def app_columns(storage_dir):
    """
    Return a list of dictionaries describing the columns that are being collected.

    Args:
        storage_dir: Directory containing `metadata.db`.

    Returns:
        One dictionary per row of the `app_columns` table, keyed by column name.
    """
    connection = sqlite3.connect(os.path.join(storage_dir, "metadata.db"))
    connection.row_factory = dict_factory
    try:
        cursor = connection.cursor()
        # The cursor gets its own try/finally so that cleanup never touches a
        # name that was not bound yet.
        try:
            cursor.execute("SELECT * FROM app_columns")
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()

def execute_sql(storage_dir, sql):
    """
    Execute the given query on the database, using "<app_name>.<table_name>" to specify tables.

    Every app listed in the metadata `apps` table has its `<app_name>.db` file
    attached under the schema name `<app_name>` before `sql` is run.

    Args:
        storage_dir: Directory containing `metadata.db` and the app databases.
        sql: The SQL statement to run. It is executed verbatim, so it must
            come from a trusted caller.

    Returns:
        The fetched rows as a list of dictionaries keyed by column name.

    No commit is issued by this function.
    """
    connection = sqlite3.connect(os.path.join(storage_dir, "metadata.db"))
    connection.row_factory = dict_factory
    try:
        cursor = connection.cursor()
        try:
            cursor.execute("SELECT * FROM apps")
            for app in cursor.fetchall():
                app_name = app["app_name"]
                # The file path is bound as a parameter and the schema name is
                # quoted, so quotes in either cannot break the ATTACH statement.
                # NOTE(review): SQLite caps the number of attached databases
                # (10 by default) - confirm this is enough for the app count.
                schema_name = '"%s"' % app_name.replace('"', '""')
                cursor.execute(
                    "ATTACH DATABASE ? AS %s" % schema_name,
                    (os.path.join(storage_dir, app_name + ".db"),),
                )

            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()

def validate_schema(schema):
    """
    Check that `schema` has the structure expected by `register_app`.

    A valid schema maps table names (str) to dictionaries with a
    "description" entry and a "columns" dictionary; each column maps its name
    (str) to a dictionary holding a "type" in {int, string, float} and a
    string "description".

    Raises:
        AssertionError: If any part of the schema is malformed.
    """
    def require(condition, message):
        # Raised explicitly (instead of using `assert`) so the checks still
        # run when Python is started with -O.
        if not condition:
            raise AssertionError(message)

    require(isinstance(schema, dict), "Expected schema to be a dictionary.")
    for table_name, table in schema.items():
        require(isinstance(table_name, str), "Expected schema to be a str -> dict mapping")
        require(isinstance(table, dict), "Expected schema to be a str -> dict mapping")
        require("description" in table, "Expected table to contain a description")
        require("columns" in table, "Expected table to contain columns")
        require(isinstance(table["columns"], dict), "Expected columns to be a str -> dict mapping")
        for column_name, column in table["columns"].items():
            require(isinstance(column_name, str), "Expected each column to have a string name")
            require(isinstance(column, dict), "Expected each column to be a dictionary")
            require("type" in column, "Expected column to contain a type")
            require("description" in column, "Expected column to contain a description")
            require(column["type"] in ["int", "string", "float"], "Expected type to be in {int, string, float}")
            require(isinstance(column["description"], str), "Expected description to be a string")

def register_app(storage_dir, app_name, schema):
    """
    Register an application, which is defined by an app_name and a schema, and create the
    appropriate tables in the database. For example, a schema which defines two tables containing
    one and two columns, respectively, is shown here:

    ```
    schema = {
        "tableX": {
            "description": "This table contains X.",
            "columns": {
                "column1": {
                    "type": "float",
                    "description": "This column contains 1."
                }
            }
        },
        "tableY": {
            "description": "This table contains Y.",
            "columns": {
                "column1": {
                    "type": "float",
                    "description": "This column contains 1."
                },
                "column2": {
                    "type": "string",
                    "description": "This column contains 2."
                }
            }
        }
    }
    ```

    Args:
        storage_dir: Directory containing `metadata.db`; the app database is
            created there as `<app_name>.db`.
        app_name: Name of the application to register.
        schema: Table and column definitions, as shown above.

    Raises:
        AssertionError: If `validate_schema` rejects the schema.
        ValueError: If the app is already registered or the name is reserved.
    """
    # Validate the schema
    validate_schema(schema)

    # "<app_name>.db" would be the metadata database itself.
    if app_name == "metadata":
        raise ValueError("App name is reserved: %s" % app_name)

    def quote(identifier):
        # Double-quote an SQL identifier so that spaces, keywords or quotes in
        # table/column names cannot alter the CREATE TABLE statement.
        return '"%s"' % identifier.replace('"', '""')

    metadata = sqlite3.connect(os.path.join(storage_dir, "metadata.db"))
    try:
        # Stage the metadata rows; they are committed only after the app
        # database has been created successfully.
        cursor = metadata.cursor()
        cursor.execute("SELECT * FROM apps WHERE app_name = ?", (app_name,))
        if cursor.fetchone():
            raise ValueError("App already exists: %s" % app_name)
        cursor.execute("INSERT INTO apps VALUES (?)", (app_name,))
        for table_name, table in schema.items():
            cursor.execute(
                "INSERT INTO app_tables VALUES (?, ?, ?)",
                (app_name, table_name, table["description"]),
            )
            for column_name, column in table["columns"].items():
                cursor.execute(
                    "INSERT INTO app_columns VALUES (?, ?, ?, ?, ?)",
                    (app_name, table_name, column_name, column["type"], column["description"]),
                )
        cursor.close()

        # Create the app database
        app_database = sqlite3.connect(os.path.join(storage_dir, "%s.db" % app_name))
        try:
            for table_name, table in schema.items():
                column_list = ",".join(quote(name) for name in table["columns"])
                app_database.execute("CREATE TABLE %s (%s)" % (quote(table_name), column_list))
            app_database.commit()
        finally:
            app_database.close()

        # Closing without reaching this commit discards the staged metadata,
        # so a failed table creation does not leave a phantom registration.
        metadata.commit()
    finally:
        metadata.close()

def validate_payload(payload):
    """
    Check that `payload` maps table names (str) to lists of row dictionaries.

    Raises:
        AssertionError: If the payload does not have that structure.
    """
    def require(condition, message):
        # Raised explicitly (instead of using `assert`) so the checks still
        # run when Python is started with -O.
        if not condition:
            raise AssertionError(message)

    require(isinstance(payload, dict), "Expected payload to be a dictionary.")
    for table_name, rows in payload.items():
        require(isinstance(table_name, str), "Expected payload to be a str -> [] mapping")
        require(isinstance(rows, list), "Expected payload to be a str -> [] mapping")
        for row in rows:
            require(isinstance(row, dict), "Expected each row in the update to be a dictionary")

def insert_payload(storage_dir, app_name, payload):
    """
    The entire payload is considered a single transaction; for example, an update which inserts two
    rows into `tableX` and one row into `tableY` is shown below:

    ```
    payload = {
        "tableX": [
            {"column1": 0.0},
            {"column1": 0.0},
        ],
        "tableY": [
            {"column1": 0.0, "column2": "hello"}
        ]
    }
    ```

    Args:
        storage_dir: Directory containing the app database `<app_name>.db`.
        app_name: Name of the application whose database receives the rows.
        payload: Mapping of table name to a list of row dictionaries
            (column name -> value).

    Raises:
        AssertionError: If no database file exists for `app_name`.
        sqlite3.Error: If an insert fails; nothing is committed in that case.

    NOTE: the payload is not passed through `validate_payload` here.
    """
    database_path = os.path.join(storage_dir, "%s.db" % app_name)
    # Raised explicitly rather than with `assert`: under -O an assert is
    # removed and sqlite3.connect would silently create an empty database.
    if not os.path.exists(database_path):
        raise AssertionError("App not found.")

    def quote(identifier):
        # Double-quote an SQL identifier so table/column names taken from the
        # payload cannot inject SQL into the INSERT statement.
        return '"%s"' % identifier.replace('"', '""')

    connection = sqlite3.connect(database_path)
    try:
        cursor = connection.cursor()
        for table_name, rows in payload.items():
            for row in rows:
                columns = list(row)
                sql = "INSERT INTO {} ({}) VALUES ({})".format(
                    quote(table_name),
                    ", ".join(quote(column) for column in columns),
                    ", ".join("?" for _ in columns),
                )
                cursor.execute(sql, [row[column] for column in columns])
        # Single commit at the end: closing without it discards every insert.
        connection.commit()
    finally:
        connection.close()

Functions

def app_columns(storage_dir)

Return a list of dictionaries describing the columns that are being collected.

Expand source code
def app_columns(storage_dir):
    """
    Return a list of dictionaries describing the columns that are being collected.

    Args:
        storage_dir: Directory containing `metadata.db`.

    Returns:
        One dictionary per row of the `app_columns` table, keyed by column name.
    """
    connection = sqlite3.connect(os.path.join(storage_dir, "metadata.db"))
    connection.row_factory = dict_factory
    try:
        cursor = connection.cursor()
        # The cursor gets its own try/finally so that cleanup never touches a
        # name that was not bound yet.
        try:
            cursor.execute("SELECT * FROM app_columns")
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()
def dict_factory(cursor, row)

This function extracts the column names from the SQLite cursor and returns a dictionary version of the row.

Expand source code
def dict_factory(cursor, row):
    """
    Row factory for `sqlite3` that converts a result row into a dictionary.

    The keys are the column names reported by `cursor.description` (the first
    element of each entry) and the values are the entries of `row` at the
    matching positions. If two columns share a name, the later one wins.
    """
    return {
        column[0]: row[position]
        for position, column in enumerate(cursor.description)
    }
def execute_sql(storage_dir, sql)

Execute the given query on the database, using "<app_name>.<table_name>" to specify tables.

Expand source code
def execute_sql(storage_dir, sql):
    """
    Execute the given query on the database, using "<app_name>.<table_name>" to specify tables.

    Every app listed in the metadata `apps` table has its `<app_name>.db` file
    attached under the schema name `<app_name>` before `sql` is run.

    Args:
        storage_dir: Directory containing `metadata.db` and the app databases.
        sql: The SQL statement to run. It is executed verbatim, so it must
            come from a trusted caller.

    Returns:
        The fetched rows as a list of dictionaries keyed by column name.

    No commit is issued by this function.
    """
    connection = sqlite3.connect(os.path.join(storage_dir, "metadata.db"))
    connection.row_factory = dict_factory
    try:
        cursor = connection.cursor()
        try:
            cursor.execute("SELECT * FROM apps")
            for app in cursor.fetchall():
                app_name = app["app_name"]
                # The file path is bound as a parameter and the schema name is
                # quoted, so quotes in either cannot break the ATTACH statement.
                # NOTE(review): SQLite caps the number of attached databases
                # (10 by default) - confirm this is enough for the app count.
                schema_name = '"%s"' % app_name.replace('"', '""')
                cursor.execute(
                    "ATTACH DATABASE ? AS %s" % schema_name,
                    (os.path.join(storage_dir, app_name + ".db"),),
                )

            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()
def initialize(storage_dir)

This function creates the data directory if necessary and initializes the metadata database.

Expand source code
def initialize(storage_dir):
    """
    Create the data directory if necessary and initialize the metadata database.

    Args:
        storage_dir: Directory that holds `metadata.db`. Missing directories
            (including parents) are created.

    The `apps`, `app_tables` and `app_columns` tables are created with
    `IF NOT EXISTS`, so calling this function repeatedly is safe.
    """
    # exist_ok avoids the check-then-create race when two callers initialize
    # the same directory at the same time.
    os.makedirs(storage_dir, exist_ok=True)
    connection = sqlite3.connect(os.path.join(storage_dir, "metadata.db"))
    try:
        connection.executescript("""
            CREATE TABLE IF NOT EXISTS apps (app_name);
            CREATE TABLE IF NOT EXISTS app_tables (
                app_name,
                table_name,
                table_description
            );
            CREATE TABLE IF NOT EXISTS app_columns (
                app_name,
                table_name,
                column_name,
                column_type,
                column_description
            );
        """)
    finally:
        # Close the connection even when the script fails.
        connection.close()
def insert_payload(storage_dir, app_name, payload)

The entire payload is considered a single transaction; for example, an update which inserts two rows into tableX and one row into tableY is shown below:

payload = {
    "tableX": [
        {"column1": 0.0},
        {"column1": 0.0},
    ],
    "tableY": [
        {"column1": 0.0, "column2": "hello"}
    ]
}
Expand source code
def insert_payload(storage_dir, app_name, payload):
    """
    The entire payload is considered a single transaction; for example, an update which inserts two
    rows into `tableX` and one row into `tableY` is shown below:

    ```
    payload = {
        "tableX": [
            {"column1": 0.0},
            {"column1": 0.0},
        ],
        "tableY": [
            {"column1": 0.0, "column2": "hello"}
        ]
    }
    ```

    Args:
        storage_dir: Directory containing the app database `<app_name>.db`.
        app_name: Name of the application whose database receives the rows.
        payload: Mapping of table name to a list of row dictionaries
            (column name -> value).

    Raises:
        AssertionError: If no database file exists for `app_name`.
        sqlite3.Error: If an insert fails; nothing is committed in that case.

    NOTE: the payload is not passed through `validate_payload` here.
    """
    database_path = os.path.join(storage_dir, "%s.db" % app_name)
    # Raised explicitly rather than with `assert`: under -O an assert is
    # removed and sqlite3.connect would silently create an empty database.
    if not os.path.exists(database_path):
        raise AssertionError("App not found.")

    def quote(identifier):
        # Double-quote an SQL identifier so table/column names taken from the
        # payload cannot inject SQL into the INSERT statement.
        return '"%s"' % identifier.replace('"', '""')

    connection = sqlite3.connect(database_path)
    try:
        cursor = connection.cursor()
        for table_name, rows in payload.items():
            for row in rows:
                columns = list(row)
                sql = "INSERT INTO {} ({}) VALUES ({})".format(
                    quote(table_name),
                    ", ".join(quote(column) for column in columns),
                    ", ".join("?" for _ in columns),
                )
                cursor.execute(sql, [row[column] for column in columns])
        # Single commit at the end: closing without it discards every insert.
        connection.commit()
    finally:
        connection.close()
def random(...)

random() -> x in the interval [0, 1).

def register_app(storage_dir, app_name, schema)

Register an application, which is defined by an app_name and a schema, and create the appropriate tables in the database. For example, a schema which defines two tables containing one and two columns, respectively, is shown here:

schema = {
    "tableX": {
        "description": "This table contains X.",
        "columns": {
            "column1": {
                "type": "float",
                "description": "This column contains 1."
            }
        }
    },
    "tableY": {
        "description": "This table contains Y.",
        "columns": {
            "column1": {
                "type": "float",
                "description": "This column contains 1."
            },
            "column2": {
                "type": "string",
                "description": "This column contains 2."
            }
        }
    }
}
Expand source code
def register_app(storage_dir, app_name, schema):
    """
    Register an application, which is defined by an app_name and a schema, and create the
    appropriate tables in the database. For example, a schema which defines two tables containing
    one and two columns, respectively, is shown here:

    ```
    schema = {
        "tableX": {
            "description": "This table contains X.",
            "columns": {
                "column1": {
                    "type": "float",
                    "description": "This column contains 1."
                }
            }
        },
        "tableY": {
            "description": "This table contains Y.",
            "columns": {
                "column1": {
                    "type": "float",
                    "description": "This column contains 1."
                },
                "column2": {
                    "type": "string",
                    "description": "This column contains 2."
                }
            }
        }
    }
    ```

    Args:
        storage_dir: Directory containing `metadata.db`; the app database is
            created there as `<app_name>.db`.
        app_name: Name of the application to register.
        schema: Table and column definitions, as shown above.

    Raises:
        AssertionError: If `validate_schema` rejects the schema.
        ValueError: If the app is already registered or the name is reserved.
    """
    # Validate the schema
    validate_schema(schema)

    # "<app_name>.db" would be the metadata database itself.
    if app_name == "metadata":
        raise ValueError("App name is reserved: %s" % app_name)

    def quote(identifier):
        # Double-quote an SQL identifier so that spaces, keywords or quotes in
        # table/column names cannot alter the CREATE TABLE statement.
        return '"%s"' % identifier.replace('"', '""')

    metadata = sqlite3.connect(os.path.join(storage_dir, "metadata.db"))
    try:
        # Stage the metadata rows; they are committed only after the app
        # database has been created successfully.
        cursor = metadata.cursor()
        cursor.execute("SELECT * FROM apps WHERE app_name = ?", (app_name,))
        if cursor.fetchone():
            raise ValueError("App already exists: %s" % app_name)
        cursor.execute("INSERT INTO apps VALUES (?)", (app_name,))
        for table_name, table in schema.items():
            cursor.execute(
                "INSERT INTO app_tables VALUES (?, ?, ?)",
                (app_name, table_name, table["description"]),
            )
            for column_name, column in table["columns"].items():
                cursor.execute(
                    "INSERT INTO app_columns VALUES (?, ?, ?, ?, ?)",
                    (app_name, table_name, column_name, column["type"], column["description"]),
                )
        cursor.close()

        # Create the app database
        app_database = sqlite3.connect(os.path.join(storage_dir, "%s.db" % app_name))
        try:
            for table_name, table in schema.items():
                column_list = ",".join(quote(name) for name in table["columns"])
                app_database.execute("CREATE TABLE %s (%s)" % (quote(table_name), column_list))
            app_database.commit()
        finally:
            app_database.close()

        # Closing without reaching this commit discards the staged metadata,
        # so a failed table creation does not leave a phantom registration.
        metadata.commit()
    finally:
        metadata.close()
def validate_payload(payload)
Expand source code
def validate_payload(payload):
    """
    Check that `payload` maps table names (str) to lists of row dictionaries.

    Raises:
        AssertionError: If the payload does not have that structure.
    """
    def require(condition, message):
        # Raised explicitly (instead of using `assert`) so the checks still
        # run when Python is started with -O.
        if not condition:
            raise AssertionError(message)

    require(isinstance(payload, dict), "Expected payload to be a dictionary.")
    for table_name, rows in payload.items():
        require(isinstance(table_name, str), "Expected payload to be a str -> [] mapping")
        require(isinstance(rows, list), "Expected payload to be a str -> [] mapping")
        for row in rows:
            require(isinstance(row, dict), "Expected each row in the update to be a dictionary")
def validate_schema(schema)
Expand source code
def validate_schema(schema):
    """
    Check that `schema` has the structure expected by `register_app`.

    A valid schema maps table names (str) to dictionaries with a
    "description" entry and a "columns" dictionary; each column maps its name
    (str) to a dictionary holding a "type" in {int, string, float} and a
    string "description".

    Raises:
        AssertionError: If any part of the schema is malformed.
    """
    def require(condition, message):
        # Raised explicitly (instead of using `assert`) so the checks still
        # run when Python is started with -O.
        if not condition:
            raise AssertionError(message)

    require(isinstance(schema, dict), "Expected schema to be a dictionary.")
    for table_name, table in schema.items():
        require(isinstance(table_name, str), "Expected schema to be a str -> dict mapping")
        require(isinstance(table, dict), "Expected schema to be a str -> dict mapping")
        require("description" in table, "Expected table to contain a description")
        require("columns" in table, "Expected table to contain columns")
        require(isinstance(table["columns"], dict), "Expected columns to be a str -> dict mapping")
        for column_name, column in table["columns"].items():
            require(isinstance(column_name, str), "Expected each column to have a string name")
            require(isinstance(column, dict), "Expected each column to be a dictionary")
            require("type" in column, "Expected column to contain a type")
            require("description" in column, "Expected column to contain a description")
            require(column["type"] in ["int", "string", "float"], "Expected type to be in {int, string, float}")
            require(isinstance(column["description"], str), "Expected description to be a string")