Module tiresias.server.platform

Expand source code
import uuid
import threading
from time import time
from enum import Enum
from tiresias.core import b64_encode
from tiresias.server.handler import handle_task

class State:
    ERROR = 'ERROR'
    RUNNING = 'RUNNING'
    PENDING = 'PENDING'
    COMPLETE = 'COMPLETE'

class Platform(object):

    def __init__(self):
        """
        The Platform object is responsible for managing and executing tasks. It's designed to work
        with a multi-threaded web server and stores everything in-memory.
        """
        self._lock = threading.RLock()
        self._tasks = {}
        self._payloads = {}

    def gc(self, timeout=60):
        """
        Delete the data for any completed tasks and delete any completed tasks that have passed the
        timeout window.
        """
        with self._lock:
            for tid in set(self._tasks.keys()):
                if self._tasks[tid]["status"] == State.COMPLETE:
                    if time() - self._tasks[tid]["start"] > timeout:
                        del self._tasks[tid]
                        del self._payloads[tid]
    
    def run(self):
        """
        Examing the pending tasks and, if there's enough data, start processing the task. Note that
        the task handler will block the platform accepting new tasks and/or data; at some point, we 
        will need to move the task handler into its own process.
        """
        with self._lock:
            for tid in self.tasks(only_pending=True):
                if self._tasks[tid]["count"] < self._tasks[tid]["min_count"]:
                    continue
                self._tasks[tid]["status"] = State.RUNNING
                result, err = handle_task(self._tasks[tid], self._payloads[tid])
                if err:
                    self._tasks[tid]["status"] = State.ERROR
                    self._tasks[tid]["result"] = repr(err)
                else:
                    self._tasks[tid]["result"] = b64_encode(result)
                    self._tasks[tid]["status"] = State.COMPLETE
                    self._tasks[tid]["end"] = time()
    
    def tasks(self, only_pending=False):
        """
        Return a list of tasks.
        """
        with self._lock:
            if not only_pending:
                return self._tasks
            return {k: v for k, v in self._tasks.items() if v["status"] == State.PENDING}
    
    def create(self, task):
        """
        Create a new task.
        """
        with self._lock:
            task["id"] = str(uuid.uuid4())
            task["status"] = State.PENDING
            task["start"] = time()
            task["count"] = 0
            self._tasks[task["id"]] = task
            self._payloads[task["id"]] = []
            return task["id"]
    
    def fetch(self, task_id):
        """
        Fetch a specific task.
        """
        with self._lock:
            return self._tasks[task_id]

    def submit(self, task_id, payload):
        """
        Store the data for a specific task in-memory and update the counters.
        """
        with self._lock:
            task = self._tasks[task_id]
            if task["status"] != State.PENDING:
                return False
            self._payloads[task_id].append(payload)
            self._tasks[task_id]["count"] = len(self._payloads[task_id])
            return True

Classes

class Platform

The Platform object is responsible for managing and executing tasks. It's designed to work with a multi-threaded web server and stores everything in-memory.

Expand source code
class Platform(object):

    def __init__(self):
        """
        The Platform object is responsible for managing and executing tasks. It's designed to work
        with a multi-threaded web server and stores everything in-memory.
        """
        self._lock = threading.RLock()
        self._tasks = {}
        self._payloads = {}

    def gc(self, timeout=60):
        """
        Delete the data for any completed tasks and delete any completed tasks that have passed the
        timeout window.
        """
        with self._lock:
            for tid in set(self._tasks.keys()):
                if self._tasks[tid]["status"] == State.COMPLETE:
                    if time() - self._tasks[tid]["start"] > timeout:
                        del self._tasks[tid]
                        del self._payloads[tid]
    
    def run(self):
        """
        Examing the pending tasks and, if there's enough data, start processing the task. Note that
        the task handler will block the platform accepting new tasks and/or data; at some point, we 
        will need to move the task handler into its own process.
        """
        with self._lock:
            for tid in self.tasks(only_pending=True):
                if self._tasks[tid]["count"] < self._tasks[tid]["min_count"]:
                    continue
                self._tasks[tid]["status"] = State.RUNNING
                result, err = handle_task(self._tasks[tid], self._payloads[tid])
                if err:
                    self._tasks[tid]["status"] = State.ERROR
                    self._tasks[tid]["result"] = repr(err)
                else:
                    self._tasks[tid]["result"] = b64_encode(result)
                    self._tasks[tid]["status"] = State.COMPLETE
                    self._tasks[tid]["end"] = time()
    
    def tasks(self, only_pending=False):
        """
        Return a list of tasks.
        """
        with self._lock:
            if not only_pending:
                return self._tasks
            return {k: v for k, v in self._tasks.items() if v["status"] == State.PENDING}
    
    def create(self, task):
        """
        Create a new task.
        """
        with self._lock:
            task["id"] = str(uuid.uuid4())
            task["status"] = State.PENDING
            task["start"] = time()
            task["count"] = 0
            self._tasks[task["id"]] = task
            self._payloads[task["id"]] = []
            return task["id"]
    
    def fetch(self, task_id):
        """
        Fetch a specific task.
        """
        with self._lock:
            return self._tasks[task_id]

    def submit(self, task_id, payload):
        """
        Store the data for a specific task in-memory and update the counters.
        """
        with self._lock:
            task = self._tasks[task_id]
            if task["status"] != State.PENDING:
                return False
            self._payloads[task_id].append(payload)
            self._tasks[task_id]["count"] = len(self._payloads[task_id])
            return True

Methods

def create(self, task)

Create a new task.

Expand source code
def create(self, task):
    """
    Create a new task.
    """
    with self._lock:
        task["id"] = str(uuid.uuid4())
        task["status"] = State.PENDING
        task["start"] = time()
        task["count"] = 0
        self._tasks[task["id"]] = task
        self._payloads[task["id"]] = []
        return task["id"]
def fetch(self, task_id)

Fetch a specific task.

Expand source code
def fetch(self, task_id):
    """
    Fetch a specific task.
    """
    with self._lock:
        return self._tasks[task_id]
def gc(self, timeout=60)

Delete the data for any completed tasks and delete any completed tasks that have passed the timeout window.

Expand source code
def gc(self, timeout=60):
    """
    Delete the data for any completed tasks and delete any completed tasks that have passed the
    timeout window.
    """
    with self._lock:
        for tid in set(self._tasks.keys()):
            if self._tasks[tid]["status"] == State.COMPLETE:
                if time() - self._tasks[tid]["start"] > timeout:
                    del self._tasks[tid]
                    del self._payloads[tid]
def run(self)

Examing the pending tasks and, if there's enough data, start processing the task. Note that the task handler will block the platform accepting new tasks and/or data; at some point, we will need to move the task handler into its own process.

Expand source code
def run(self):
    """
    Examing the pending tasks and, if there's enough data, start processing the task. Note that
    the task handler will block the platform accepting new tasks and/or data; at some point, we 
    will need to move the task handler into its own process.
    """
    with self._lock:
        for tid in self.tasks(only_pending=True):
            if self._tasks[tid]["count"] < self._tasks[tid]["min_count"]:
                continue
            self._tasks[tid]["status"] = State.RUNNING
            result, err = handle_task(self._tasks[tid], self._payloads[tid])
            if err:
                self._tasks[tid]["status"] = State.ERROR
                self._tasks[tid]["result"] = repr(err)
            else:
                self._tasks[tid]["result"] = b64_encode(result)
                self._tasks[tid]["status"] = State.COMPLETE
                self._tasks[tid]["end"] = time()
def submit(self, task_id, payload)

Store the data for a specific task in-memory and update the counters.

Expand source code
def submit(self, task_id, payload):
    """
    Store the data for a specific task in-memory and update the counters.
    """
    with self._lock:
        task = self._tasks[task_id]
        if task["status"] != State.PENDING:
            return False
        self._payloads[task_id].append(payload)
        self._tasks[task_id]["count"] = len(self._payloads[task_id])
        return True
def tasks(self, only_pending=False)

Return a list of tasks.

Expand source code
def tasks(self, only_pending=False):
    """
    Return a list of tasks.
    """
    with self._lock:
        if not only_pending:
            return self._tasks
        return {k: v for k, v in self._tasks.items() if v["status"] == State.PENDING}
class State (*args, **kwargs)
Expand source code
class State:
    ERROR = 'ERROR'
    RUNNING = 'RUNNING'
    PENDING = 'PENDING'
    COMPLETE = 'COMPLETE'

Class variables

var COMPLETE

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

var ERROR

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

var PENDING

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

var RUNNING

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.