Massoud Mazar

Sharing The Knowledge

Setting Postgres transaction isolation level through Flask-SQLAlchemy

Recently I had to deal with a concurrency issue in a legacy Flask application that did not enforce uniqueness: when multiple API clients tried to create a record with the same value in the 'name' column, they ended up creating duplicate entries. Adding a unique constraint to the name column would have required cleaning up the duplicates first, and that cleanup was not easy, so I decided to first implement a workaround to stop new duplicates from appearing, and then deal with the existing data.
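To make the cleanup side concrete, here is a minimal sketch that uses Python's built-in sqlite3 as a stand-in for Postgres. The employee table, its columns and the helper names are hypothetical, and the two 'alice' rows simulate what two racing clients leave behind. It shows that the unique index cannot be created while duplicates exist, and one way to find and drop them first.

```python
import sqlite3


def make_demo_db():
    """In-memory stand-in for the legacy table: no unique constraint on name."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE employee (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
    # Two clients that both created 'alice' at the same time leave two rows behind
    conn.executemany("INSERT INTO employee (name) VALUES (?)",
                     [("alice",), ("alice",), ("bob",)])
    conn.commit()
    return conn


def find_duplicates(conn):
    """List (name, copies) for every name that appears more than once."""
    return conn.execute(
        "SELECT name, COUNT(*) FROM employee GROUP BY name HAVING COUNT(*) > 1"
    ).fetchall()


def remove_duplicates(conn):
    """Keep the lowest id per name and delete the other copies.

    A real cleanup would also have to repoint rows that reference the
    deleted ids; this sketch ignores foreign keys entirely.
    """
    conn.execute(
        "DELETE FROM employee WHERE id NOT IN "
        "(SELECT MIN(id) FROM employee GROUP BY name)"
    )
    conn.commit()


def add_unique_constraint(conn):
    """Raises sqlite3.IntegrityError while duplicate names remain."""
    conn.execute("CREATE UNIQUE INDEX uq_employee_name ON employee (name)")
    conn.commit()
```

On the demo database, find_duplicates returns [('alice', 2)]; add_unique_constraint only succeeds after remove_duplicates has run.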

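The solution was to run the existence check and the insert inside a single transaction at the SERIALIZABLE isolation level (SET TRANSACTION ISOLATION LEVEL SERIALIZABLE in Postgres). Postgres does not lock the table for this; instead, if two concurrent transactions both find no matching record and both insert one, Postgres aborts one of them with a serialization failure, which the application can catch and retry. On the retry, the check finds the record the other transaction committed.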

As you may have encountered, in a Flask-SQLAlchemy application the database session exposed by SQLAlchemy (db.session) is shared, and since Postgres only accepts SET TRANSACTION before the first query of a transaction, you cannot reliably set the isolation level on it. I ended up creating a get_or_create function (in a mixin class) that does its work in a new database session, created and closed inside the function. When multiple clients try to create the same record at the same time, the serialization failure surfaces as sqlalchemy.exc.OperationalError (with the psycopg2 driver), and it needs to be caught so the operation can be retried.
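The mixin below retries on any OperationalError. If you would rather retry only the errors Postgres documents as safe to retry, a sketch could inspect the SQLSTATE instead: 40001 (serialization_failure) and 40P01 (deadlock_detected). It assumes SQLAlchemy's convention of keeping the driver exception in .orig, and reads psycopg2's pgcode or psycopg 3's sqlstate attribute; run_with_retry and sqlstate_of are hypothetical helper names.

```python
import time

# SQLSTATE codes that PostgreSQL documents as worth retrying
SERIALIZATION_FAILURE = "40001"
DEADLOCK_DETECTED = "40P01"
RETRYABLE_SQLSTATES = {SERIALIZATION_FAILURE, DEADLOCK_DETECTED}


def sqlstate_of(exc):
    """Return the SQLSTATE carried by a database error, or None.

    SQLAlchemy wraps the driver exception in `.orig`; psycopg2 exposes the
    code as `pgcode`, psycopg 3 as `sqlstate`.
    """
    orig = getattr(exc, "orig", exc)
    return getattr(orig, "pgcode", None) or getattr(orig, "sqlstate", None)


def run_with_retry(operation, max_retry=5, delay=0.1, sleep=time.sleep):
    """Call operation() and return its result, retrying serialization-type
    failures up to max_retry times; any other error is re-raised at once."""
    attempt = 0
    while True:
        try:
            return operation()
        except Exception as exc:
            if sqlstate_of(exc) not in RETRYABLE_SQLSTATES or attempt >= max_retry:
                raise
            attempt += 1
            sleep(delay * attempt)  # linear back-off between attempts
```

Here operation would be a function that runs one complete transaction (set the isolation level, query, add, commit) and rolls the session back before re-raising, so each retry starts from a clean transaction.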

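import time

from sqlalchemy import text
from sqlalchemy.exc import OperationalError

from app import db  # hypothetical module: your flask_sqlalchemy.SQLAlchemy() instance

MAX_RETRY = 5  # assumed value, not given in the original code


class GetOrCreateMixin(object):
    """ provides get_or_create functionality """
    # TODO: this is a temporary workaround until we cleanup duplicates
    def get_or_create(self, **kwargs):
        # create a new session so we can set transaction isolation level
        session = db.create_session({})()
        model = type(self)
        retry_count = 0
        try:
            while True:
                try:
                    # SET TRANSACTION only affects the current transaction, so it
                    # must be the first statement of every attempt, including
                    # retries (skipped for unit tests, which don't use postgres)
                    if db.engine.name == 'postgresql':
                        session.execute(
                            text('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE'))
                    item = session.query(model).filter_by(**kwargs).first()
                    if not item:
                        session.add(self)
                    session.commit()
                    break
                except OperationalError:
                    # a serialization failure means another client won the
                    # race: roll back, wait briefly and try again
                    session.rollback()
                    retry_count += 1
                    if retry_count > MAX_RETRY:
                        raise
                    time.sleep(0.1)
        finally:
            # always release the private session, even on unexpected errors
            session.close()
        # use the shared session to load the object
        return db.session.query(model).filter_by(**kwargs).first()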
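For comparison, the same "check and insert in one transaction" idea can be sketched outside Flask with Python's built-in sqlite3. This is a swapped-in technique, not the one used above: SQLite has no SET TRANSACTION, and BEGIN IMMEDIATE takes the database write lock before the existence check, so concurrent callers wait their turn instead of failing and retrying as they do under Postgres SERIALIZABLE. The table, the function names and the 10-second timeout are assumptions for illustration.

```python
import os
import sqlite3
import tempfile
import threading


def init_db(db_path):
    """Create the hypothetical employee table, deliberately without a unique constraint."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS employee "
                 "(id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
    conn.commit()
    conn.close()


def get_or_create(db_path, name):
    """Return the id of the employee called `name`, inserting it if missing."""
    # isolation_level=None: we issue BEGIN/COMMIT ourselves
    conn = sqlite3.connect(db_path, timeout=10, isolation_level=None)
    try:
        # BEGIN IMMEDIATE takes the write lock before the existence check, so a
        # concurrent caller's BEGIN IMMEDIATE waits (up to `timeout`) until we commit
        conn.execute("BEGIN IMMEDIATE")
        row = conn.execute("SELECT id FROM employee WHERE name = ?", (name,)).fetchone()
        if row is None:
            row_id = conn.execute("INSERT INTO employee (name) VALUES (?)", (name,)).lastrowid
        else:
            row_id = row[0]
        conn.execute("COMMIT")
        return row_id
    except Exception:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
    finally:
        conn.close()


def demo(n_clients=8):
    """Let n_clients threads race to create 'alice'; return (ids, row_count)."""
    db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
    init_db(db_path)
    ids = []
    threads = [threading.Thread(target=lambda: ids.append(get_or_create(db_path, "alice")))
               for _ in range(n_clients)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    conn = sqlite3.connect(db_path)
    (row_count,) = conn.execute("SELECT COUNT(*) FROM employee").fetchone()
    conn.close()
    return ids, row_count
```

Running demo() should report the same id for every client and a single row in the table. The trade-off against the Postgres approach is that SQLite serializes all writers to the database, which is acceptable for a demo but not for a busy multi-client API.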

Now you can add the mixin to your data models:

class Employee(db.Model, NameMixin, GetOrCreateMixin):
    """ An Employee """
    # NameMixin is not shown here; it presumably provides the name column
    department_id = db.Column(db.Integer, db.ForeignKey("department.id"),
                              nullable=False)
    department = db.relationship("Department",
                                 backref=db.backref("employees", cascade="delete"))

The API endpoint that creates employees can then use the function to make sure no duplicate employee is created:

@employee_api.route("/", endpoint="employees")
class EmployeesResource(Resource):

    @employee_api.expect(parsers.employee)
    def post(self):
        """ Create a new Employee or return existing based 
            on name and department id"""
        args = parsers.employee.parse_args()
        employee = models.Employee(**args)
        employee = employee.get_or_create(
                name=employee.name, 
                department_id=employee.department_id)
        return schemas.Employee().dump(employee)

 
