Simplify transactional logging code, fix deadlock situation.
This commit is contained in:
parent
522415a27c
commit
77762266dd
@ -65,39 +65,22 @@ class Database(object):
|
||||
|
||||
@property
|
||||
def executable(self):
|
||||
"""Connection or engine against which statements will be executed."""
|
||||
if hasattr(self.local, 'connection'):
|
||||
return self.local.connection
|
||||
return self.engine
|
||||
"""Connection against which statements will be executed."""
|
||||
if not hasattr(self.local, 'conn'):
|
||||
self.local.conn = self.engine.connect()
|
||||
return self.local.conn
|
||||
|
||||
@property
|
||||
def op(self):
|
||||
"""Get an alembic operations context."""
|
||||
ctx = MigrationContext.configure(self.engine)
|
||||
ctx = MigrationContext.configure(self.executable)
|
||||
return Operations(ctx)
|
||||
|
||||
def _acquire(self):
|
||||
self.lock.acquire()
|
||||
|
||||
def _release(self):
|
||||
if not hasattr(self.local, 'tx'):
|
||||
self.lock.release()
|
||||
else:
|
||||
self.local.lock_count[-1] += 1
|
||||
|
||||
def _release_internal(self):
|
||||
for index in range(self.local.lock_count[-1]):
|
||||
self.lock.release()
|
||||
del self.local.lock_count[-1]
|
||||
|
||||
def _dispose_transaction(self):
|
||||
self._release_internal()
|
||||
self.local.tx.remove(self.local.tx[-1])
|
||||
if not self.local.tx:
|
||||
del self.local.tx
|
||||
del self.local.lock_count
|
||||
self.local.connection.close()
|
||||
del self.local.connection
|
||||
self.lock.release()
|
||||
|
||||
def begin(self):
|
||||
"""
|
||||
@ -107,13 +90,9 @@ class Database(object):
|
||||
**NOTICE:** Schema modification operations, such as the creation
|
||||
of tables or columns will not be part of the transactional context.
|
||||
"""
|
||||
if not hasattr(self.local, 'connection'):
|
||||
self.local.connection = self.engine.connect()
|
||||
if not hasattr(self.local, 'tx'):
|
||||
self.local.tx = []
|
||||
self.local.lock_count = []
|
||||
self.local.tx.append(self.local.connection.begin())
|
||||
self.local.lock_count.append(0)
|
||||
self.local.tx.append(self.executable.begin())
|
||||
|
||||
def commit(self):
|
||||
"""
|
||||
@ -122,8 +101,8 @@ class Database(object):
|
||||
Make all statements executed since the transaction was begun permanent.
|
||||
"""
|
||||
if hasattr(self.local, 'tx') and self.local.tx:
|
||||
self.local.tx[-1].commit()
|
||||
self._dispose_transaction()
|
||||
tx = self.local.tx.pop()
|
||||
tx.commit()
|
||||
|
||||
def rollback(self):
|
||||
"""
|
||||
@ -132,8 +111,8 @@ class Database(object):
|
||||
Discard all statements executed since the transaction was begun.
|
||||
"""
|
||||
if hasattr(self.local, 'tx') and self.local.tx:
|
||||
self.local.tx[-1].rollback()
|
||||
self._dispose_transaction()
|
||||
tx = self.local.tx.pop()
|
||||
tx.rollback()
|
||||
|
||||
def __enter__(self):
|
||||
"""Start a transaction."""
|
||||
|
||||
@ -298,15 +298,17 @@ class Table(object):
|
||||
"""
|
||||
self._check_dropped()
|
||||
self.database._acquire()
|
||||
|
||||
try:
|
||||
if normalize_column_name(name) not in self._normalized_columns:
|
||||
self.database.op.add_column(
|
||||
self.table.name,
|
||||
Column(name, type),
|
||||
self.table.schema
|
||||
)
|
||||
self.table = self.database.update_table(self.table.name)
|
||||
if normalize_column_name(name) in self._normalized_columns:
|
||||
log.warn("Column with similar name exists: %s" % name)
|
||||
return
|
||||
|
||||
self.database.op.add_column(
|
||||
self.table.name,
|
||||
Column(name, type),
|
||||
self.table.schema
|
||||
)
|
||||
self.table = self.database.update_table(self.table.name)
|
||||
finally:
|
||||
self.database._release()
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user