Friday, August 1, 2014

APScheduler 3.0 released

The first final version of APScheduler's 3.0 branch has been released.

For the uninitiated, APScheduler is a task scheduling and management system written in Python. Thinking of it as a cron/at daemon running inside your application is not far off, but APScheduler also provides management and monitoring of jobs, and much more. And of course it runs Python code instead of shell commands.

If one were to compare APScheduler with Celery, the difference could be summarized like this: Celery is a distributed task queue with basic scheduling capabilities, while APScheduler is a full featured scheduler with basic task queuing capabilities. Users tell me that APScheduler is easier to set up. I haven't personally used Celery, so I can't comment on that.

The 3.0 update brings many new features and enhancements, albeit at the cost of a backward-incompatible API. Virtually all of the feature requests from the 2.x series have been fulfilled. A migration guide is also provided to help 2.x users move to 3.0 smoothly.

Performance improvements

Probably the most important change in 3.0 concerns the job stores. In previous versions, every job store cached all of its jobs in memory to eliminate the overhead of fetching them from the backend (a file or a database). That was fine with a small number of jobs, but when use cases started popping up that required thousands upon thousands of jobs, it became a severe problem. Starting with 3.0, persistent job stores no longer keep their jobs in memory, but instead rely on backend specific mechanisms (such as indexes) to efficiently fetch the due jobs. This greatly reduces the memory footprint of applications that need to handle large numbers of jobs.
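
To give a rough idea, here is a minimal sketch of attaching a persistent job store; the SQLite URL, the cleanup function and the interval are just placeholders:

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler


def cleanup():
    print('cleaning up')


# The jobs live in the database; the scheduler queries the store for due jobs
# instead of caching everything in memory
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(cleanup, 'interval', hours=1)
scheduler.start()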

Time zone support

One of the most frequent complaints about APScheduler was that it always operated in the host's local time. Many users would have preferred it to use UTC instead. In 3.0, all datetimes are timezone aware. The scheduler has a configured time zone, which defaults to the local time zone but can easily be set to, say, UTC instead. Individual jobs can also be scheduled with different time zones if necessary.
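
As a quick sketch (the tick function and the specific time zones are just placeholders):

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler


def tick():
    print('tick')


# The scheduler operates in UTC instead of the host's local time
scheduler = BackgroundScheduler(timezone=utc)

# An individual job can still run in a different time zone
scheduler.add_job(tick, 'cron', hour=7, timezone='Europe/Helsinki')
scheduler.start()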

Integration with asynchronous event loops

APScheduler now integrates with several widely used asynchronous application frameworks. The integration involves, at a minimum, the use of the event loop's built-in delayed execution mechanism. This avoids the use of a dedicated thread for the scheduler. With some frameworks, the integration can even provide a custom default executor (more on those in the next section) that runs the jobs in a built-in thread pool or similar.
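
As an example, here is a minimal sketch of running the scheduler on top of an asyncio event loop (the tick function and the interval are placeholders):

import asyncio
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler


def tick():
    print('Tick! The time is %s' % datetime.now())


scheduler = AsyncIOScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass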

Pluggable executor system

The built-in thread pool from the previous versions has been replaced with a pluggable executor system. Each scheduler subclass can specify its own default executor. For example, GeventScheduler uses a gevent-specific executor that spawns jobs as greenlets. An executor based on the PEP 3148 (concurrent.futures) thread pool is used as the default on most scheduler subclasses.

While the thread pool in APScheduler 2.x was supposedly replaceable, using a process pool as a replacement didn't work in practice. This has been rectified by providing an officially supported ProcessPoolExecutor.
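
For example, a scheduler could be configured with both a thread pool and a process pool along these lines (the pool sizes and the crunch_numbers function are arbitrary placeholders):

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler


def crunch_numbers():
    return sum(i * i for i in range(10 ** 6))


executors = {
    'default': ThreadPoolExecutor(20),      # concurrent.futures based thread pool
    'processpool': ProcessPoolExecutor(4)   # runs jobs in separate processes
}
scheduler = BackgroundScheduler(executors=executors)

# CPU bound jobs can be directed to the process pool by name
scheduler.add_job(crunch_numbers, 'interval', minutes=10, executor='processpool')
scheduler.start()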

Although no remote execution executors are provided yet, the executor API allows for remote execution, much like Celery does.

Scheduler API improvements

With 3.0, all the parameters of the job (except for its ID) can now be modified. In the previous versions, you had to remove and recreate the job from scratch to change anything about it. You can also pause, resume or completely reschedule jobs. This avoids having to keep the job parameters around in order to recreate the job.

The scheduler API now operates on jobs based on their IDs. This removes a lot of pain when implementing a remote scheduler service based on APScheduler. All the job related methods are also proxied on the job instances returned by add_job(). You can also now retrieve a particular job instance from the scheduler based on its ID – something that was painful to do with older versions of APScheduler.
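
A rough sketch of what this looks like in practice (send_report is a placeholder function):

from apscheduler.schedulers.background import BackgroundScheduler


def send_report():
    print('report sent')


scheduler = BackgroundScheduler()
scheduler.start()
job = scheduler.add_job(send_report, 'interval', hours=1)

# Operate on the job through the scheduler, using only its ID...
scheduler.pause_job(job.id)
scheduler.modify_job(job.id, max_instances=2)
scheduler.resume_job(job.id)

# ...or through the Job instance, which proxies the same methods
job.reschedule('cron', hour=8)

# A job can also be looked up later by its ID
same_job = scheduler.get_job(job.id)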

The scheduler now allows you to schedule callables by a textual reference consisting of the fully qualified module name and a variable lookup path (for example, x.y.z:func_name). This is handy when you need to schedule a function for which a reference can't be automatically determined, such as a static method.

Finally, add_job() can now optionally replace an existing job (by its ID). This fixes a long-standing design flaw in APScheduler 2.x, in which adding a job in a persistent job store at application startup (usually using the scheduling decorators) would always add a new instance of the job without removing the old one. By supplying a static ID for the job, the user can ensure that there will be no duplicates of the job.
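
For instance (the jobs.sqlite URL, the module path and the job ID are made-up names for illustration):

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})

# Schedule a static method by textual reference; the fixed ID together with
# replace_existing=True means that running this again at startup replaces the
# old copy of the job instead of adding a duplicate
scheduler.add_job('myapp.reports:ReportGenerator.run_nightly', 'cron', hour=2,
                  id='nightly_report', replace_existing=True)
scheduler.start()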

What's next?

A couple of new features, contributed by other people, didn't make it into the 3.0 release: a job store class for RethinkDB, and support for getting the current number of running instances of each job. These will likely debut in the 3.1 release.

The rest will depend on user requirements and feedback. Happy scheduling :)

Wednesday, January 1, 2014

Unit testing SQLAlchemy apps, part 2: the universal method

The problem

In an earlier post I presented a method for unit testing SQLAlchemy applications cleanly, without leaving any trace of the tables created during the run of the test suite. However, the problem with this method was that it required the target RDBMS to support both transactional DDL and nested transactions. This requirement left two prominent vendors in the cold: MySQL and Oracle.

What didn't work

In search of a solution, I was reminded of the existence of a feature called temporary tables. This seemed like a perfect fit for the problem! Most RDBMSs can create both "local" and "global" temporary tables. Local temporary tables are only visible to the connection that created them, while global temporary tables are structurally visible to all connections, but the data still remains specific to each connection. Unfortunately, Oracle only supports global temporary tables, which are unusable for unit testing since they stick around after the test suite has finished. MySQL, however, does support local temporary tables, so I modified the original unit testing code to create all the tables as temporary. That didn't work either, because foreign keys between temporary tables apparently aren't allowed.

Since temporary tables didn't provide a working solution, I had to look into other alternatives. The most obvious one would of course be to reflect the metadata at the beginning and use metadata.drop_all(). This approach, however, has one subtle issue: circular relationships. If the tables are linked in a cyclic relationship, like A → B → C → A, then these tables can't be dropped without first dropping the foreign key constraints. But then I heard of the DROP TABLE ... CASCADE command, which supposedly drops the foreign keys along the way. This got my hopes up, even though SQLAlchemy didn't support it directly. Those hopes, however, died quickly when I looked at the MySQL documentation and saw this:
RESTRICT and CASCADE are permitted to make porting easier. In MySQL 5.6, they do nothing.

The solution

With this, I had to admit defeat and settle for the lowest common denominator. As such, the revised testing process goes as follows:
  1. Reflect the metadata from the database reserved for unit testing
  2. Drop all foreign key constraints using the reflected metadata
  3. Drop all tables using the reflected metadata
  4. Create all tables from the current model
  5. Add any base data (fixtures) and commit the session
  6. Prevent your application and framework from committing or closing the session
  7. Run the tests, rolling back the transaction at the end of each test
Step 1 is necessary because there's no telling how much your model has changed between test runs, so simply running drop_all() with your current model's metadata is not guaranteed to do the right thing.
Step 2 is necessary because of potential circular foreign key dependencies preventing some tables from being dropped (see the previous section for a deeper explanation).
Step 6 is necessary for two reasons:
  1. Allowing commit() would break test isolation by leaking database changes to other tests
  2. Allowing the session to close would mean that any changes made between requests within a single test would be rolled back
Finally, a couple reminders:
  • Remember to point the connection URI to a dedicated testing database so you don't lose development (not to mention production) data (a configuration sketch follows this list)
  • When testing on MySQL, use InnoDB tables since MyISAM doesn't support transactions
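
For example, the app factory could be pointed at a dedicated database along these lines; the TestConfig class and the idea that create_app() accepts a configuration object are assumptions about your application, not requirements of the code further below:

from yourapp import create_app


class TestConfig:
    TESTING = True
    # A database reserved for the test suite, never the development or
    # production one
    SQLALCHEMY_DATABASE_URI = 'mysql://user:password@localhost/myapp_test'


app = create_app(TestConfig)  # assuming your factory accepts a config object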

Putting it to practice with Flask and py.test

I developed this testing method to run unit tests on a new, mid-sized Flask-based web app of mine that had to use MySQL as its data store for compatibility with an older version. I've also recently migrated from nose to the wonderful py.test testing framework.

The following code is a generic testing example, adapted from my application's test suite. It works with a single-db configuration, but could be adapted to a multi-db configuration. I've tested it against MySQL 5.5, PostgreSQL 9.1 and SQLite 2.6.0.

It should be mentioned that I experimented with applying the faster method (based on nested transactions), but my test suite ran only 0.6 seconds faster with it, and the approach required a whole different code path in several fixtures, so I decided to drop it.

from sqlalchemy.schema import MetaData, DropConstraint
import pytest

from yourapp import db, create_app


@pytest.fixture(scope='session')
def app(request):
    return create_app()


@pytest.fixture(scope='session', autouse=True)
def setup_db(request, app):
    # Clear out any existing tables
    metadata = MetaData(db.engine)
    metadata.reflect()
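    # Drop foreign key constraints first so that tables involved in circular
    # relationships can be dropped afterwards (step 2 above)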
    for table in metadata.tables.values():
        for fk in table.foreign_keys:
            db.engine.execute(DropConstraint(fk.constraint))
    metadata.drop_all()

    # Create the tables based on the current model
    db.create_all()

    # Add base data here
    # ...
    db.session.flush()
    db.session.expunge_all()
    db.session.commit()


@pytest.fixture(autouse=True)
def dbsession(request, monkeypatch):
    # Roll back at the end of every test; note that this grabs a reference to
    # the real remove() before it gets monkeypatched below
    request.addfinalizer(db.session.remove)

    # Prevent the session from closing (make it a no-op) and
    # committing (redirect to flush() instead)
    monkeypatch.setattr(db.session, 'commit', db.session.flush)
    monkeypatch.setattr(db.session, 'remove', lambda: None)


def test_example(app):
    # Foo stands in for one of your application's model classes; import it
    # from wherever your models are defined
    with app.test_client() as client:
        response = client.get('/some/path')

    assert response.status_code == 200
    assert db.session.query(Foo).count() == 5