Tuesday, October 17, 2017

Automating packaging and uploading of binary wheels

There has been plenty of frustration surrounding Python libraries which contain C extensions. Users are frustrated because pre-built wheels are not available for their platform, requiring them to install a compatible compiler. This has been particularly painful for Windows users. On the developer side, projects that have gone through the trouble of providing binary wheels often have a rather complicated process for building and uploading them, requiring several manual steps to get it done. On the other hand, projects that choose not to build binary wheels sometimes have droves of users nagging them to build those wheels (yes, I've been guilty of that). All in all, nobody's happy.

What makes this situation particularly challenging is that there is no single CI service available for F/OSS projects which could build wheels for all three major platforms (Linux, macOS and Windows). Travis supports Linux and macOS but not Windows. Then there's AppVeyor which only supports Windows. So in order to make everyone happy, you would need to combine the powers of both continuous integration services.

An additional challenge has been the lack of coordination tools for ensuring that a new release is uploaded to PyPI only if all the build jobs succeed. Naive configurations (yes, I've been guilty of that too) build the wheels and upload them to PyPI independently. This can lead to situations where a build job legitimately fails in a way that should've been a release blocker, but the jobs that succeeded have already uploaded their artifacts to PyPI. Now you have a botched release in your hands.

What if I told you that there is a way to set up your project so that all you have to do is add a git tag and push to Github, and the wheels and the source distribution would automatically get built and uploaded to PyPI if (and only if) all goes well?

Yes, folks. It can be done. You just need an adventurous mind. Take the red pill and find out how deep the rabbit hole goes.

How it works

As I hinted earlier, the recipe I'm about to present combines three important factors:
  1. Use of Travis's "Build Stages" feature
  2. Use of AppVeyor via its ReST API
  3. Use of an external storage service (Amazon S3 is used here, but it could be something else)
The gist is this: The Travis build first runs the tests against all supported Python versions. After the tests have finished successfully, Travis starts building wheels for Linux and macOS. Meanwhile, an additional job is started which sends a request to AppVeyor's ReST API which tells it to start a build against the current git changeset. It will then poll the status of the build on regular intervals until it finishes one way or another. If it fails, the Travis build is failed. If it succeeds, the build artifacts are downloaded to the container running the Travis build.

When all the wheels have been built, their respective build jobs will upload them to the shared storage. Then the final job is started which pulls all the artifacts from this storage and uploads them to PyPI.

Setting it up

Tl;dr: Go see the example project and adapt its configuration to your needs.

You will need to have the following set up before proceeding:
  • A project on Github
  • A PyPI account
  • An AppVeyor account
  • An AWS account (if S3 is used)
The use of Amazon's S3 could be replaced with any other storage service. However, the free tier on AWS should get you enough disk space to satisfy the needs of most projects. You do need to have a valid credit card, however.

You will need at least these two configuration files present in the project's root directory:
  • .travis.yml: build configuration for Travis (this is the most important part)
  • appveyor.yml: build configuration for AppVeyor
You can copy the linked files to your own project as a base. Just remember to replace the environment variables in .travis.yml (or remove them altogether, as explained below).

Travis setup

If your project does not yet have Travis integration enabled, you need to do the following:
  1. Go to your project's settings on Github
  2. Click on "Integrations and services"
  3. Click on "Add service"
  4. Choose "Travis CI" and enter your Github password when prompted to do so
  5. Go to your Travis profile settings on their site
  6. Click on "Sync account" (at the top right) to refresh the list of projects
  7. Find your project on the list after the sync is complete and turn the switch on
  8. Click on the cogwheel next to your project's name to enter the settings page
The following Travis project settings are recommended:

Next, you will need to define the following environment variables:
  • APPVEYOR_SLUG (your project name on AppVeyor)
  • APPVEYOR_ACCOUNT (your account name on AppVeyor)
  • TWINE_USERNAME (your PyPI user name)
  • TWINE_PASSWORD (your PyPI password)
  • AWS_ACCESS_KEY_ID (the access key ID from AWS, for shared storage)
  • AWS_SECRET_ACCESS_KEY (the secret key from AWS, for shared storage)

There are two ways you can provide your build jobs environment variables:
  1. Add them to your .travis.yml file, encrypting any confidential ones like passwords
  2. Add them on your Travis project settings page
To encrypt an environment variable, you will need to have Travis's command line client installed. Then, you can do something like this:
echo -n TWINE_PASSWORD=foobarbaz | travis encrypt
This will output an encrypted secret which you can paste into .travis.yml. Note the importance of the -n switch, as without that a newline character would be added to the end which would cause the wrong text to be encrypted.

AppVeyor setup

Assuming you have your AppVeyor account set up, you need to add your project to it. First, go to the Projects section in the top level menu and click "New Project". Then select Github and pick the project from that list.

Next, you need to disable the web hook AppVeyor just added to your Github project. This is necessary because the AppVeyor build should only be triggered by the wheel build stage on Travis. On Github, go to Settings -> Webhooks and edit the AppVeyor hook. Uncheck the "Active" check box and press "Update webhook", as shown below:
That's it for the AppVeyor configuration, unless your project has some special requirements.


I hope this will lower the barrier for projects to start producing binary wheels.

It should be noted that some projects prefer running their test suites against the wheels separately on each platform, but this is left as an exercise for the reader to implement.

Hopefully Travis will some day sort out their lack of Windows support.

The example configuration could be simplified somewhat once pip starts supporting pyproject.toml (included in the sample project). That should enable the removal of the "pip install wheel Cython" lines all over the configuration.

Monday, May 4, 2015

concurrent.futures backport updated

It's been a while since I last updated the backport of the concurrent.futures package for Python versions older than 3.2. I was looking at my github repositories and noticed that I wasn't marked as watching the pythonfutures repository. As I viewed the issue tracker, I noticed a huge slew of issues in it, some of them dating back to 2013! "Oops", I said to myself. I immediately started working on the bugs, mostly applying patches from upstream CPython code. I also decided to drop support for Python 2.5 and 3.1, since nobody seems to be using them anymore. Likewise, the "futures" top level package (which was an alias to concurrent.futures) is now gone.

Several issues were fixed by the 3.0.1 release:

  • CPython issue 16284 (bug #32; memory leak in thread and process pools)
  • CPython issue 11777 (bug #28; map() doesn't start tasks until generator is used)
  • CPython issue 15015 (accessing a nonexistent attribute)
  • CPython issue 20367 (behavior of concurrent.futures.as_completed() for duplicate arguments)
  • backport specific bug #20 (map() is greedy; fixed in 3.0.1)
Many apologies for the unreasonable delays in getting these fixed!

There are still some major issues with ProcessPoolExecutor, but the upstream code in Python 3.3+ now uses Python 3 specific features and I'm not brave enough to try and backport that. I'm not sure it's even possible. So if you need reliable process pools, you're going to have to switch to Python 3 :)

Friday, August 1, 2014

APScheduler 3.0 released

The first final version of APScheduler's 3.0 branch has been released.

For the uninitiated, APScheduler is a task scheduling and management system written in Python. Thinking of it as a cron/at daemon running inside your application is not far off, but APScheduler also provides management and monitoring of jobs, and much more. And of course it runs Python code instead of shell commands.

If one were to compare APScheduler with Celery, the difference could be summarized like this: Celery is a distributed task queue with basic scheduling capabilities, while APScheduler is a full featured scheduler with basic task queuing capabilities. Users tell me that APScheduler is easier to set up. I haven't personally used Celery, so I can't comment on that.

The 3.0 update brings many new features and enhancements, albeit at the cost of a backward-incompatible API. Virtually all of the feature requests from 2.x have been fulfilled. A guide is also provided for 2.x users for smoother migration to 3.0.

Performance improvements

Probably the most important change in 3.0 is about the job stores. In previous versions, all the job stores cached all their jobs in memory. This was to eliminate the overhead of fetching them from the backend (file or database). And that would've been fine with a small number of jobs, but when use cases started popping up that required thousands upon thousands of jobs, it became a severe problem. So starting with 3.0, persistent job stores no longer keep the jobs in memory, but instead rely on backend specific mechanisms (such as indexes) to efficiently fetch due jobs. This will greatly help reduce the memory footprints of applications that need to handle large numbers of jobs.

Time zone support

One of the most frequent complaints about APScheduler was that it always operated in the host's local time. Many users would've preferred it to always use the UTC timezone instead. Now, in 3.0, all datetimes are timezone aware. The scheduler has a set timezone which defaults to the local timezone, but can easily set to, say, UTC instead. Individual jobs can also be scheduled with different timezones if necessary.

Integration with asynchronous event loops

APScheduler now integrates with several widely used asynchronous application frameworks. The integration involves, at a minimum, the use of the event loop's built-in delayed execution mechanism. This avoids the use of a dedicated thread for the scheduler. With some frameworks, the integration can even provide a custom default executor (more on those in the next section) that runs the jobs in a built-in thread pool or similar.

Pluggable executor system

The built-in thread pool from the previous versions has been replaced with a pluggable executor system. Each scheduler subclass can specify its own default executor. For example, GeventScheduler uses a gevent specific executor that spawns jobs as greenlets. An executor based on the PEP 3148 (concurrent.futures) thread pool is used as the default executor on most scheduler subclasses.

While the thread pool in APScheduler 2.x was supposedly replaceable, using a process pool as a replacement didn't work in practice. This has been rectified by providing an officially supported ProcessPoolExecutor.

Although no such executors are yet provided, this API allows for remote execution, much like Celery does.

Scheduler API improvements

With 3.0, all the parameters of the job (except for its ID) can now be modified. In the previous versions, you had to remove and recreate the job from scratch to change anything about it. You can also pause, resume or completely reschedule jobs. This avoids having to keep the job parameters around in order to recreate the job.

The scheduler API now operates on jobs based on their IDs. This removes a lot of pain when implementing a remote scheduler service based on APScheduler. All the job related methods are also proxied on the job instances returned by add_job(). You can also now retrieve a particular job instance from the scheduler based on its ID – something that was painful to do with older versions of APScheduler.

The scheduler now allows you to schedule callables based on a text reference consisting of the fully qualified module name and a variable lookup path (for example, x.y.z:func_name). This is handy for when you need to schedule a function for which a reference can't be automatically determined, like static methods.

Finally, add_job() can now optionally replace an existing job (by its ID). This fixes a long-standing design flaw in APScheduler 2.x, in which adding a job in a persistent job store at application startup (usually using the scheduling decorators) would always add a new instance of the job without removing the old one. By supplying a static ID for the job, the user can ensure that there will be no duplicates of the job.

What's next?

A couple new features, contributed by other people, didn't make it into the 3.0 release. For one, there is a job store class for RethinkDB. Then there is support for getting the current number of running instances for each job. These will likely debut in the 3.1 release.

The rest will depend on user requirements and feedback. Happy scheduling :)

Wednesday, January 1, 2014

Unit testing SQLAlchemy apps, part 2: the universal method

The problem

In an earlier post I presented a method for unit testing SQLAlchemy applications cleanly, without leaving any trace of the tables created during the run of the test suite. However, the problem with this method was that it required the target RDBMS to support both transactional DDL and nested transactions. This requirement left two prominent vendors in the cold: MySQL and Oracle.

What didn't work

In search for a solution, I was reminded of the existence of a feature called temporary tables. This seemed like a perfect fit for the problem! Most RDBMS's can create both "local" and "global" temporary tables. Local temporary tables are only visible to the connection that created them, while global temporary tables are structurally visible to all connections, but the data still remains specific to each connection. Unfortunately, Oracle only supports global temporary tables which are unusable for unit testing since they stick around after the test suite has finished. MySQL, however, does support local temporary tables, so I modified the original unit testing code to create all the tables as temporary. But that didn't work either because apparently foreign keys between temporary tables aren't allowed.

Since temporary tables didn't provide a working solution, I had to look into other alternatives. The most obvious one would of course be to reflect the metadata at the beginning and use metadata.drop_all(). This approach, however, has one subtle issue circular relationships. If the tables are linked in a cyclic relationship, like A → B → C → A, then these tables can't be dropped without first dropping the foreign key constraints. But then I heard of the DROP TABLE ... CASCADE command which supposedly drops the foreign keys too along the way. This got my hopes up, even though SQLAlchemy didn't support this directly. Those hopes, however, died quickly when I looked at the MySQL documentation and saw this:
RESTRICT and CASCADE are permitted to make porting easier. In MySQL 5.6, they do nothing.

The solution

With this, I had to admit defeat and settle for the lowest common denominator. As such, the revised testing process goes as follows:
  1. Reflect the metadata from the database reserved for unit testing
  2. Drop all foreign key constraints using the old metadata
  3. Drop all tables using the reflected metadata
  4. Create all tables from the current model
  5. Add any base data (fixtures) and commit the session
  6. Prevent your application and framework from committing or closing the session
  7. Run the tests, rolling back the transaction at the end of each test
Step 1 is necessary because there's no telling how much your model has changed between test runs, so simply running drop_all() with your current model's metadata is not guaranteed to do the right thing.
Step 2 is necessary because of potential circular foreign key dependencies preventing some tables from being dropped (see the previous section for a deeper explanation).
Step 6 is necessary for two reasons:
  1. Allowing commit() would break test isolation by leaking database changes to other tests
  2. Allowing the session to close would mean that any changes made between requests within a single test would be rolled back
Finally, a couple reminders:
  • Remember to point the connection URI to a dedicated testing database so you don't lose development (not to mention production) data
  • When testing on MySQL, use InnoDB tables since MyISAM doesn't support transactions

Putting it to practice with Flask and py.test

I developed this testing method to run unit tests on a new, mid-sized Flask based web app of mine that had to use MySQL as its data store for compatibility with an older version. I've also recently migrated from nose to the wonderful py.test testing framework.

The following code is a generic testing example, adapted from my application's test suite. It works with a single-db configuration, but could be adapted to a multi-db configuration. I've tested it against MySQL 5.5, PostgreSQL 9.1 and SQLite 2.6.0.

It should be mentioned that I experimented with applying the faster method (based on nested transactions) but my test suite ran only 0.6 seconds faster with it and the approach required a whole different code path on several fixtures so I decided to drop it.

from sqlalchemy.schema import MetaData, DropConstraint
import pytest

from yourapp import db, create_app

def app(request):
    return create_app()

@pytest.fixture(scope='session', autouse=True)
def setup_db(request, app):
    # Clear out any existing tables
    metadata = MetaData(db.engine)
    for table in metadata.tables.values():
        for fk in table.foreign_keys:

    # Create the tables based on the current model

    # Add base data here
    # ...

def dbsession(request, monkeypatch):
    # Roll back at the end of every test

    # Prevent the session from closing (make it a no-op) and
    # committing (redirect to flush() instead)
    monkeypatch.setattr(db.session, 'commit', db.session.flush)
    monkeypatch.setattr(db.session, 'remove', lambda: None)

def test_example(app):
    with app.test_client() as client:
        response = client.get('/some/path')
    assert response.status_code == 200
    assert db.session.query(Foo).count() == 5

Saturday, November 9, 2013

10 common stumbling blocks for SQLAlchemy newbies

I've been hanging around on #sqlalchemy for years now, helping other users where I could. People come there for help with many different kinds of problems, but after a while, you start seeing common patterns. Certain difficulties seem to persist amongst SQLAlchemy newbies. This is my attempt at documenting these issues and providing answers and solutions where applicable. These 10 issues are not in any real order of appearance frequency – I just wrote them down in the order in which they popped into my mind.

Updated 2013-09-09: Corrected facts in section 5.

1. Not finding the (right) tutorial

Occasionally, someone comes on the channel asking questions while referring to a tutorial outside of sqlalchemy.org. These tutorials are often out of date or just plain incorrect. Personally, when I look for a tutorial on something, the first place to look would be the official website of the project you're trying to learn to use. I would say that is common sense.

There's also another group of people, who are able to find the official documentation but miss the tutorials, despite them being the first links on both the ORM and Core sections.

While I don't think there's any important information really missing from the SQLAlchemy documentation, the top page could probably be laid out differently to emphasize the suggested starting points for new users.

2. Boilerplate constructors

The declarative metaclass provides a default constructor for model classes, making explicit constructors largely unnecessary. The issue is that many people just don't seem to know about this, and happily add explicit constructors to all their classes. I actually blame the SQLAlchemy documentation for not clearly pointing this out. I've only ever found a single mention of this feature in the documentation. In my opinion, its existence should be prominently advertised in the ORM tutorial.

So instead of creating a constructor that accepts all your fields as arguments, just skip the constructor and instantiate classes like Company(name='Foo', address='blah').

3. Believing that defining or modifying the model classes creates or modifies the tables themselves

A surprising amount of new users seem to believe that SQLAlchemy is capable of automatically modifying their tables to match their classes. I am not aware of any ORM that does such a thing automatically, least of all reliably.

If you change your declared classes, you need to explicitly alter the schema in the actual database too, by either:
  1. using the tool of your choice (usually psql / mysql / other command line tool or a graphical tool like phpMyAdmin etc.) to manually create/alter/drop the tables
  2. using a migration library like Alembic

4. Forgetting to import model modules before attempting to call metadata.create_all()

I must admit that I was bitten by this one when I was starting with SQLAlchemy. You already know that metadata.create_all() creates all your tables, right? Sure, but before that can happen, all the modules that declare tables must have been imported. I mean, how else would SQLAlchemy know what the tables are?

Personally I hadn't even thought about how that works, but then there are some people who seem to expect SQLAlchemy to scan the project structure for model classes! Admittedly, the metaclass machinery that puts declarative classes in the metadata may in the beginning seem so magical that you expect everything to "just work" by itself.

So remember kids, you need to import all your model modules so that the metadata can be filled in. Only then will the call to metadata.create_all() do anything useful.

5. Confusion between .one(), .first() and .scalar()

Every week I see someone pasting code that uses Query.one() and catching the NoResultException. Then others complain that Query.first() should return a scalar value instead of a KeyedTuple when a single column or expression has been selected. Query.scalar() is virtually unheard of.

So let me clear up the confusion once and for all:
  • Query.one() executes the query and raises MultipleResultsFound if the number of rows was more than 1, and NoResultFound no rows were found. Otherwise it returns the sole result row (as tuple)
  • Query.first() executes the query with LIMIT 1 and returns the result row as tuple, or None if no rows were found
  • Query.scalar() executes the query and raises MultipleResultsFound if the number of rows was more than 1. If no rows were found, it returns None. Otherwise it returns the first column of the sole result row.
Query.one() should be used if you want to select a single row and you assume there is exactly one row to be found. Query.first() should be used when you want just one row but you're not sure if one will be found. Query.scalar() should be used when you want, for example, just the maximum of a single column.

6. Not understanding scoped_session and sessionmaker

This is quite possibly the greatest source of confusion among SQLAlchemy users. Especially so for developers who have very little experience in threaded programming. First, a quick recap:
  • Session: a container of instances of mapped classes; among other things lets you construct queries dynamically and add/delete/update entities
  • sessionmaker: a session factory generator (in other words, a function that returns a function that returns a new Session for each call)
  • scoped_session: a threadlocal manager and proxy of sessions
If you're still confused, you're not alone! So let me try and clear this up a bit. The sqlalchemy.orm.session.Session class is where the real meat is. The other two are merely helpers. In trivial scripts, this one is all you need.

The sessionmaker function is a convenience function for instantiating Session objects with customized parameters. Most notably session makers can't be used as proxies – you have to call one to get a session first.

The scoped_session function creates a thread local proxy/manager of Session instances. What this means is that you can access the scoped session object as if it was like a Session instance. The scoped session "knows" (by virtue of threadlocal objects) which thread is asking for a session and so it always returns the same session for the same thread. This is very convenient for multithreaded applications because you don't have to worry about sessions accidentally crossing threads.

Some want to call the scoped session to get a Session instance. While that certainly works, it's almost always unnecessary. Instead:

DBSession = scoped_session(sessionmaker(engine))
companies = DBSession.query(Company).all()

7. Query properties

Certain web frameworks (I'm looking at you, Flask-SQLAlchemy!) promote the use of query properties that supposedly make querying easier. Trouble is, it is apparently not obvious from the example code how one would query for specific columns, aggregates or other expressions using said query property.

The solution is to use the session directly:


8. Attempting to access related classes through relationships in queries:

Someone asks something like this every week on #sqlalchemy:
<anon> anyone see anything wrong with this query: c = Category.query.join(Category.parent).filter(Category.name=='Social Issues', Category.parent.name=='Teens').first()
What is wrong with this query is of course that Category.parent is a relationship and thus it doesn't have a "name" attribute. I don't blame anyone who falls for this though. It would make sense, from the syntactic perspective, for this to work. It might automatically add a join to Category.parent in the query. The reason why this can't be done is beyond my knowledge of SQLAlchemy. Anyway, to fix the query, you add an alias and join that:

parent = aliased(Category)
c = Category.query.join(parent, Category.parent).\
    filter(Category.name=='Social Issues',

9. Wanting to commit the session to get the ID of an object so it can be used to insert related objects

Most nontrivial SQLAlchemy applications need to work with relationships. Imagine a situation like this: You need to create a Company and add an Employee to it. You already have a relationship called employees in the Company class. So how would you accomplish this task? The typical answer goes along the lines of:
  1. Create the Company instance
  2. Add the Company instance to the session
  3. Flush the session
  4. Create the Employee instance with company_id from the previously flushed Company instance
  5. Add the Employee instance to the session
  6. Commit the session
This is just about how you'd do it without SQLAlchemy. But with SQLAlchemy this can be done much easier:
  1. Create the Company instance
  2. Create the Employee instance
  3. Add the Company instance to the session
  4. Commit the session
When the session is flushed, the Company row is first inserted to the database. Then, by virtue of relationship cascade, the Employee instance is discovered and also inserted into the database. The RDBMS can't tell the difference, but for the developer, the latter approach is much nicer.

10. Running tests using SQLite instead of the production RDBMS

I've written a separate blog post about this, but tl;dr: always test against the same RDBMS that you deploy with.

Sunday, August 25, 2013

Unit testing SQLAlchemy apps

I help out a lot on the #sqlalchemy channel on Freenode IRC. When people ask questions there, one of the issues that comes up often is unit testing applications that use SQLAlchemy. Almost all developers instinctively use SQLite for testing due to its simplicity and the possibility to use an in-memory database that leaves no garbage behind to be cleaned up. It seems like a clean and easy solution, yes? Well, my answer is that if you don't plan on deploying with SQLite support, don't test on SQLite! Always use the production RDBMS for testing. Going to deploy with PostgreSQL? Then test with PostgreSQL! Why?

There are at least two good reasons why testing and deploying with the same RDBMS is a good idea. The first and foremost is that SQLite is vastly different from other RDBMS's. For one, it does not really enforce column types, so code that erroneously inputs data of the wrong type won't cause errors when it should. There are also many semantic differences on how embedded and your typical client-server RDBMS's work, so you may run into bugs that only occur in production while all the tests pass just fine. The second reason is that SQLite's rather modest design, which lets it fit into small memory spaces, is also a big hindrance since it can't support some more advanced database features like window functions or recursive queries. This shortcoming prevents you from taking full advantage of the features of your chosen RDBMS.

If I managed to convince you, then you'll probably be asking how testing should be done on RDBMS's other than SQLite. The answer boils down to whether your RDBMS supports two crucial features: nested transactions and transactional DDL. Nested transactions are savepoints within a transaction, to which you can roll back without losing any changes done before the savepoint. Transactional DDL means that in addition to normal data modification (INSERT, UPDATE, DELETE), schema changes are also transactional. That means they can be rolled back, which is a very nice thing to have when unit testing. According to the article linked to above, the following RDBMS's support transactional DDL: PostgreSQL, SyBase, Microsoft SQL Server, DB2, Informix and Firebird. Most notably, MySQL and Oracle do not support it.

If your RDBMS does support the aforementioned two features, then you can conveniently test your SQLAlchemy apps in the following manner:
  1. Make sure you have an empty database for unit testing
  2. Create the engine, create one connection and start a transaction in it
  3. Create the tables
  4. Optionally, insert test fixtures
  5. For every test, repeat:
    1. Create a savepoint
    2. Run the test
    3. Roll back to the savepoint
  6. Roll back the transaction
This way all the tests are isolated from each other. Every tests gets the same "blank slate" as far as the database state is concerned. After the tests are done, the database will look just as empty as before the test suite was run, even if the tests were interrupted.

So how to actually accomplish this in practice? I'll give you an example using PostgreSQL and nose. This example should be adaptable for other equally capable test runners. The following code should be placed in __init__.py of the root package of your tests.

from sqlalchemy.engine import create_engine
from sqlalchemy.orm.session import Session

from your.package import Base  # This is your declarative base class

def setup_module():
    global transaction, connection, engine

    # Connect to the database and create the schema within a transaction
    engine = create_engine('postgresql:///yourdb')
    connection = engine.connect()
    transaction = connection.begin()

    # If you want to insert fixtures to the DB, do it here

def teardown_module():
    # Roll back the top level transaction and disconnect from the database

class DatabaseTest(object):
    def setup(self):
        self.__transaction = connection.begin_nested()
        self.session = Session(connection)

    def teardown(self):

To take advantage of this setup, your test class should inherit from DatabaseTest. If you override either setup() or teardown(), make sure you remember to call the superclass implementations too.

If you want to use this testing scheme with your web framework or other application framework, you should be aware of the requirement that the framework's SQLAlchemy extension/middleware/whatever must have a way to receive the connectable (Engine or Connection) as a Python object and not just as a connection URL.