Working with Celery

Celery is an asynchronous task queue/job queue based on distributed message passing.

Set up a regular Celery project and import tropofy.celery. For example (as in the Celery docs), for a package proj containing a module proj.task that defines your tasks:

from __future__ import absolute_import
from celery import Celery
import tropofy.celery  # Need this import even though unused! Specifies helper Celery config to aid using tropofy with celery
from celery.result import AsyncResult


celery = Celery(
    'proj',
    include=['proj.task'],
)
celery.config_from_object('proj.celeryconfig')

# AsyncResult.app must be set to be able to fetch task results/status from the backend
AsyncResult.app = celery

if __name__ == '__main__':
    celery.start()

This configures your Celery app and adds helper functionality for use with Tropofy, including:

  • Ability to pass a tropofy.app.data_set.AppDataSet object to a celery Task
    • Pass a tropofy.app.data_set.AppDataSet as any argument to a Celery task and it will be accessible within the worker, even though the JSON serialiser is used.
    • All other parameters must be JSON serialisable (strings, numbers, booleans, None, dicts, lists, etc.). Note that Python tuples do not survive the round trip: the JSON serialiser converts them to lists.
    • Within your task, tropofy.app.data_set.AppDataSet can be used as it is elsewhere in Tropofy, with a few minor caveats:
      • The app attribute cannot be referenced. This shouldn’t be needed, however, as tropofy.app.data_set.AppDataSet has the attributes app_name, app_url_name and app_module_path.
      • tropofy.app.AppDataSet.set_param() cannot be used yet (although tropofy.app.AppDataSet.get_param() can).
  • Automatic SQLAlchemy session management:
    • The SQLAlchemy database connection is opened and closed automatically.
    • Changes are committed automatically when the task completes.
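The document does not show how Tropofy implements this session handling. As a rough illustration of the pattern only, the sketch below uses Python's built-in sqlite3 module in place of SQLAlchemy: a hypothetical task_session context manager opens a connection for one task, commits when the task body finishes, and always closes the connection. Rolling back when the task raises is a choice made in this sketch; what Tropofy does on failure is not described here.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager


@contextmanager
def task_session(db_path):
    """Hypothetical per-task session: connect, commit on success, always close."""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()      # The task body finished: persist its changes
    except Exception:
        conn.rollback()    # The task body raised: discard its changes (choice made in this sketch)
        raise
    finally:
        conn.close()       # Always release the connection


db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

# A task that succeeds: its changes are committed automatically
with task_session(db_path) as conn:
    conn.execute("CREATE TABLE result (value INTEGER)")
    conn.execute("INSERT INTO result VALUES (42)")

# A task that fails: its changes are rolled back
try:
    with task_session(db_path) as conn:
        conn.execute("INSERT INTO result VALUES (99)")
        raise RuntimeError("task failed")
except RuntimeError:
    pass

with task_session(db_path) as conn:
    rows = conn.execute("SELECT value FROM result").fetchall()
print(rows)  # [(42,)]
```

The point of the pattern is that task code never calls commit or close itself; the wrapper around the task does it once, in one place.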

Config

For the above Celery app, create the module proj/celeryconfig.py (imported as proj.celeryconfig). An example is below:

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_ENABLE_UTC = True

Using the ‘json’ serialiser (as above) is recommended if you are using the tropofy.celery helper functionality.
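To see what the JSON serialiser does to task arguments, the sketch below round-trips a hypothetical argument dictionary through Python's standard json module. This is a stand-in for Celery's own JSON serialiser, which may accept a few extra types but should treat tuples and sets the same way: tuples come back as lists, and sets cannot be encoded at all.

```python
import json

# Hypothetical task arguments, as they might be passed to .delay(...)
args = {"name": "depot", "coords": (1.5, 2.5), "tags": ["a", "b"]}

# Encode and decode, as happens when a task message travels through the broker
received = json.loads(json.dumps(args))
print(received["coords"])  # [1.5, 2.5]: the tuple arrives as a list

# Some types, such as sets, cannot be encoded at all
try:
    json.dumps({"ids": {1, 2, 3}})
    set_is_serialisable = True
except TypeError:
    set_is_serialisable = False
print(set_is_serialisable)  # False
```

If a task really needs a tuple, convert it back inside the task, e.g. tuple(coords).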

Note

For CELERY_RESULT_BACKEND it is possible to use a database. This can work well in a Tropofy app, as the same database used for app data can also serve as the Celery result backend. Note that if you use PostgreSQL for the Celery result backend with default settings, Celery will create its tables in the public schema. Do not delete this schema, or Celery will not work.
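A sketch of the example config above switched to a PostgreSQL result backend. Celery's database result backend takes an SQLAlchemy connection URL prefixed with db+, so SQLAlchemy and a PostgreSQL driver (such as psycopg2) must be installed where the worker runs. The user name, password, host and database name below are placeholders, not values from this document.

```python
# proj/celeryconfig.py (sketch): PostgreSQL, via SQLAlchemy, as the result backend.
# "user", "password", "localhost" and "app_db" are placeholders for your own settings.
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'db+postgresql://user:password@localhost/app_db'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_ENABLE_UTC = True
```

Pointing this URL at the same database the Tropofy app uses is what allows one database to serve both purposes.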

Running Code with Celery

Note

The following instructions have been tested using PostgreSQL as both the Celery result backend and the main database. Other configurations should also be possible, but have not yet been tested.

Tropofy provides an easy-to-use framework for calling Celery tasks within a tropofy.widgets.ExecuteFunction. This requires a small amount of configuration:

  1. Create a Celery task that accepts app_session as a parameter. Additional parameters may be passed, as long as they are JSON serialisable. For example:
# Import your celery app definition
from my_celery_app import celery

@celery.task(track_started=True)
def run_my_task(app_session):
    app_session.task_manager.send_progress_message("Starting task execution...")
    # Execute some code with a Celery worker
    app_session.task_manager.send_progress_message("Finished task execution...")
  2. Add the module containing this Celery task to the include list passed when constructing your Celery app.
  3. Ensure that, throughout your Tropofy app, every import of a given module uses the same import path. If it does not, Python (and therefore the Celery worker) treats them as separate modules, which is likely to cause problems with tasks or SQLAlchemy. For example:
# Do not import from different relative paths throughout your app like in the following example:
from widgets import *
from my_app.widgets import *
  4. Remove any reference to tropofy.app.AppDataSet.set_param(). It cannot yet be used within a Celery task (although tropofy.app.AppDataSet.get_param() and tropofy.app.AppDataSet.set_var() can).

  5. Specify the schema name (if using PostgreSQL schemas). The Tropofy framework dynamically assigns a default schema name to an app when loading it. Because a Celery worker initialises differently, the schema must be set explicitly for the app. Add the following to your_app/__init__.py, replacing “<app_schema_name>” with the schema name you would like to use.

    from tropofy.database.tropofy_orm import DynamicSchemaName
    DynamicSchemaName.schema_name = "<app_schema_name>"
    

    Note

    If an app imports tables from another schema, DynamicSchemaName.schema_name must be set before that import, so that the SQLAlchemy metadata places the table in the correct schema.
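The import-path pitfall in step 3 can be reproduced without Tropofy or Celery. This sketch creates a hypothetical package my_app on disk, puts both the project root and the app directory on sys.path, and imports the same widgets.py under two names. Python caches modules by import name, so the file is executed twice and two distinct MyWidget classes exist. Anything keyed on class identity can then disagree between the copies; SQLAlchemy, for example, normally refuses to define the same table twice on one MetaData.

```python
import importlib
import os
import sys
import tempfile

# Build a throwaway package on disk: <root>/my_app/__init__.py and <root>/my_app/widgets.py
root = tempfile.mkdtemp()
pkg_dir = os.path.join(root, "my_app")
os.makedirs(pkg_dir)
open(os.path.join(pkg_dir, "__init__.py"), "w").close()
with open(os.path.join(pkg_dir, "widgets.py"), "w") as f:
    f.write("class MyWidget:\n    pass\n")

# Put both the project root and the app directory on sys.path, so the same
# file can be imported under two different names.
sys.path.insert(0, root)
sys.path.insert(0, pkg_dir)
importlib.invalidate_caches()

short = importlib.import_module("widgets")         # like "from widgets import *"
full = importlib.import_module("my_app.widgets")   # like "from my_app.widgets import *"

# Modules are cached by name, so the one file has been executed twice
print(short is full)                    # False
print(short.MyWidget is full.MyWidget)  # False: two distinct classes
```

Using one form consistently (for example, always my_app.widgets) means only one module object and one set of classes exist in the worker.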

Calling a Celery task from within a tropofy.widgets.ExecuteFunction

tropofy.widgets.ExecuteFunction has helper functionality for running asynchronous tasks with Celery and communicating progress back to the client.

# Import your celery task
from my_celery_task_def import run_my_task
from tropofy.widgets import ExecuteFunction

class MyCeleryExecuteFunction(ExecuteFunction):
    def get_button_text(self):
        return "Execute task with Celery"

    def execute_function(self, app_session):
        # .delay() queues the task on a Celery worker and returns its AsyncResult immediately
        return run_my_task.delay(app_session)

    def execute_tasks_with_celery(self, app_session):
        # Tells Tropofy that execute_function dispatches its work to Celery
        return True

Note

To run the same code without Celery, call the task directly (without .delay) and return False from tropofy.widgets.ExecuteFunction.execute_tasks_with_celery().

Additional Task Configuration

  1. Stopping a running task: Celery tasks have a red stop button that lets the user stop them while they are processing. For all Celery tasks, pressing it stops the task from the user's perspective. However, it does not automatically stop the Celery worker on the server. It is up to the developer to check whether a task has been stopped using tropofy.app.task.TaskManager.current_celery_task_terminated(), and then raise an exception to stop the task.
  2. Calling a subprocess within a task: This is a common pattern for Celery tasks in Tropofy apps; for example, use it to call and communicate with compiled C++ code. Use tropofy.app.task.TaskManager.communicate_with_subprocess() both to send messages from the subprocess to the GUI and to check whether the user has stopped the task, terminating the subprocess if so. The following example shows how to call a subprocess and communicate with it:
import subprocess
from my_celery_app import celery

@celery.task(track_started=True)
def run_my_task(app_session):
    app_session.task_manager.send_progress_message("Starting task execution...")

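    # "process_details" is a placeholder: replace it with your own executable and arguments
    p = subprocess.Popen(["process_details"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Relays the subprocess's output to the GUI and terminates it if the user presses stop
    app_session.task_manager.communicate_with_subprocess(p)

    data_set = app_session.data_set  # Assumption: the task's data set is available as app_session.data_set
    app_session.task_manager.send_progress_message("Finished task execution successfully.", status=data_set.message_status.SUCCESS)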
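Item 1 above leaves the stop check to the developer. Below is a minimal sketch of a long-running task body that checks between chunks of work and raises to abort. It assumes current_celery_task_terminated() is reached through app_session.task_manager, takes no arguments and returns True once the user has pressed stop; TaskStopped, process_in_chunks and the Fake* classes are hypothetical stand-ins so the sketch runs without Tropofy or a worker. In a real app the function would carry the @celery.task(track_started=True) decorator.

```python
class TaskStopped(Exception):
    """Hypothetical exception used to abort the task when the user presses stop."""


def process_in_chunks(app_session, chunks):
    """Long-running task body that checks for a stop request before each chunk."""
    totals = []
    for i, chunk in enumerate(chunks):
        # Assumed to return True once the user has pressed the stop button
        if app_session.task_manager.current_celery_task_terminated():
            raise TaskStopped("Stopped by user after %d chunk(s)" % i)
        totals.append(sum(chunk))  # Placeholder for the real work
        app_session.task_manager.send_progress_message(
            "Processed chunk %d of %d" % (i + 1, len(chunks)))
    return totals


# Hypothetical stand-ins so the sketch runs without Tropofy or a Celery worker
class FakeTaskManager:
    def __init__(self, stop_after):
        self.stop_after = stop_after  # Number of checks before "stop" is pressed
        self.checks = 0
        self.messages = []

    def current_celery_task_terminated(self):
        self.checks += 1
        return self.checks > self.stop_after

    def send_progress_message(self, message, status=None):
        self.messages.append(message)


class FakeAppSession:
    def __init__(self, task_manager):
        self.task_manager = task_manager


stopped_session = FakeAppSession(FakeTaskManager(stop_after=2))
stop_message = None
try:
    process_in_chunks(stopped_session, [[1, 2], [3, 4], [5, 6]])
except TaskStopped as exc:
    stop_message = str(exc)
print(stop_message)  # Stopped by user after 2 chunk(s)

finished_session = FakeAppSession(FakeTaskManager(stop_after=10))
print(process_in_chunks(finished_session, [[1, 2], [3, 4], [5, 6]]))  # [3, 7, 11]
```

Checking once per chunk keeps the overhead low; the trade-off is that a stop request only takes effect at the next chunk boundary.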

Warning

By default, tropofy.app.task.TaskManager.communicate_with_subprocess() assumes that the task has failed if the subprocess writes any text to stderr. See the tropofy.app.task.TaskManager.communicate_with_subprocess() docs for further options.
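The sketch below shows how that default could behave, using only the standard subprocess module. run_and_relay is a hypothetical helper, not Tropofy's implementation: it forwards each stdout line to a progress callback and raises if anything was written to stderr. It uses communicate(), which avoids pipe deadlocks but relays messages only after the process exits, and it does not check for a stop request.

```python
import subprocess
import sys


def run_and_relay(cmd, send_progress_message):
    """Hypothetical helper: relay stdout lines, treat any stderr output as failure."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    out, err = proc.communicate()  # Waits for exit; reads both pipes without deadlocking
    for line in out.splitlines():
        send_progress_message(line)
    if err.strip():
        raise RuntimeError("Subprocess wrote to stderr: " + err.strip())
    return proc.returncode


messages = []
code = run_and_relay([sys.executable, "-c", "print('step 1'); print('step 2')"],
                     messages.append)
print(code, messages)  # 0 ['step 1', 'step 2']

try:
    run_and_relay([sys.executable, "-c", "import sys; sys.stderr.write('boom')"],
                  messages.append)
    stderr_failed = False
except RuntimeError:
    stderr_failed = True
print(stderr_failed)  # True
```

Because of this default, a compiled program that writes warnings or debug output to stderr will be reported as failed unless that output is redirected or the relevant option is changed.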

Sending Synchronous tasks to a Celery Worker

Helper functionality for async Celery tasks is built into tropofy.widgets.ExecuteFunction; however, using Celery for async task execution elsewhere in the framework is likely to have unexpected results. Synchronous Celery task execution is a way of using the computing power of a Celery worker anywhere in the Tropofy framework.

Example: synchronous loading of example data

For example, if loading example data with tropofy.app.AppWithDataSets.get_examples() uses a large amount of computing power (e.g. importing large data sets from another data source), putting this load on your web server may not scale. By instead sending this work to a Celery worker and waiting for it synchronously, the web server only has to wait for a response from the Celery worker (a light load), while the Celery worker does the heavy processing.

from tropofy.app import AppWithDataSets
from my_celery_app import celery
import transaction

class DOTDayStartApp(AppWithDataSets):
    def get_examples(self):
        # Map each example's display name to the function that loads it
        return {"Test - big example data": load_example_data}

    # The rest of the app definition...

def load_example_data(app_session):
    # Commit first, so the new data set that the example data will be loaded into
    # has been written to the database and is visible to the Celery worker.
    transaction.commit()
    # .delay() sends load_data_synchronously to a Celery worker; .get() blocks
    # until the worker has finished, so the overall call behaves synchronously.
    load_data_synchronously.delay(app_session).get()

@celery.task(track_started=True)
def load_data_synchronously(app_session):
    # Load data, respecting the restrictions on Celery tasks
    # (no reference to data_set.app, and no data_set.set_param())
    pass
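Calling .delay() returns an AsyncResult straight away, and calling .get() on it blocks until the worker has stored the result. Python's standard concurrent.futures module has the same shape, so the sketch below uses a thread pool purely as an analogy: it is not Celery, and the work runs in a thread of the same process rather than on a separate worker. Here submit() plays the role of .delay() and Future.result() the role of .get(); heavy_load is a hypothetical placeholder for expensive work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def heavy_load(n):
    """Hypothetical placeholder for expensive work, e.g. building a large example data set."""
    time.sleep(0.1)
    return sum(range(n))


with ThreadPoolExecutor(max_workers=1) as worker:
    future = worker.submit(heavy_load, 1000)  # Like .delay(): returns a handle immediately
    result = future.result(timeout=10)        # Like .get(): block until the result is ready
print(result)  # 499500
```

With Celery, passing a timeout to .get() (for example .get(timeout=300)) prevents the web server from waiting forever if no worker picks up the task.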