Warn about mixing transactions, schema changes and threading.

This commit is contained in:
Friedrich Lindenberg 2017-09-05 07:49:05 +02:00
parent e5b3cd5f49
commit ffea0f7a69
2 changed files with 20 additions and 1 deletions

View File

@ -79,6 +79,11 @@ class Database(object):
"""Return a SQLAlchemy schema cache object.""" """Return a SQLAlchemy schema cache object."""
return MetaData(schema=self.schema, bind=self.executable) return MetaData(schema=self.schema, bind=self.executable)
@property
def in_transaction(self):
"""Check if this database is in a transactional context."""
return len(self.local.tx) > 0
def _flush_tables(self): def _flush_tables(self):
"""Clear the table metadata after transaction rollbacks.""" """Clear the table metadata after transaction rollbacks."""
for table in self._tables.values(): for table in self._tables.values():

View File

@ -1,4 +1,6 @@
import logging import logging
import warnings
import threading
from sqlalchemy.sql import and_, expression from sqlalchemy.sql import and_, expression
from sqlalchemy.sql.expression import ClauseElement from sqlalchemy.sql.expression import ClauseElement
@ -207,6 +209,13 @@ class Table(object):
except NoSuchTableError: except NoSuchTableError:
pass pass
def _threading_warn(self):
if self.db.in_transaction and threading.active_count() > 1:
warnings.warn("Changing the database schema inside a transaction "
"in a multi-threaded environment is likely to lead "
"to race conditions and synchronization issues.",
RuntimeWarning)
def _sync_table(self, columns): def _sync_table(self, columns):
"""Lazy load, create or adapt the table structure in the database.""" """Lazy load, create or adapt the table structure in the database."""
if self._table is None: if self._table is None:
@ -218,6 +227,7 @@ class Table(object):
raise DatasetException("Table does not exist: %s" % self.name) raise DatasetException("Table does not exist: %s" % self.name)
# Keep the lock scope small because this is run very often. # Keep the lock scope small because this is run very often.
with self.db.lock: with self.db.lock:
self._threading_warn()
self._table = SQLATable(self.name, self._table = SQLATable(self.name,
self.db.metadata, self.db.metadata,
schema=self.db.schema) schema=self.db.schema)
@ -236,6 +246,7 @@ class Table(object):
self._table.create(self.db.executable, checkfirst=True) self._table.create(self.db.executable, checkfirst=True)
elif len(columns): elif len(columns):
with self.db.lock: with self.db.lock:
self._threading_warn()
for column in columns: for column in columns:
self.db.op.add_column(self.name, column, self.db.schema) self.db.op.add_column(self.name, column, self.db.schema)
self._reflect_table() self._reflect_table()
@ -317,7 +328,7 @@ class Table(object):
log.debug("Column exists: %s" % name) log.debug("Column exists: %s" % name)
return return
log.debug("Create column: %s on %s", name, self.name) self._threading_warn()
self.db.op.add_column( self.db.op.add_column(
self.name, self.name,
Column(name, type), Column(name, type),
@ -353,6 +364,7 @@ class Table(object):
log.debug("Column does not exist: %s", name) log.debug("Column does not exist: %s", name)
return return
self._threading_warn()
self.db.op.drop_column( self.db.op.drop_column(
self.table.name, self.table.name,
name, name,
@ -367,6 +379,7 @@ class Table(object):
""" """
with self.db.lock: with self.db.lock:
if self.exists: if self.exists:
self._threading_warn()
self.table.drop(self.db.executable, checkfirst=True) self.table.drop(self.db.executable, checkfirst=True)
self._table = None self._table = None
@ -401,6 +414,7 @@ class Table(object):
raise DatasetException("Table has not been created yet.") raise DatasetException("Table has not been created yet.")
if not self.has_index(columns): if not self.has_index(columns):
self._threading_warn()
name = name or index_name(self.name, columns) name = name or index_name(self.name, columns)
columns = [self.table.c[c] for c in columns] columns = [self.table.c[c] for c in columns]
idx = Index(name, *columns, **kw) idx = Index(name, *columns, **kw)