Move to a model where the table is created lazily, with an initial set of columns

This commit is contained in:
Friedrich Lindenberg 2017-09-03 23:23:57 +02:00
parent e30cf24195
commit 4232606d27
4 changed files with 191 additions and 157 deletions

View File

@ -6,11 +6,9 @@ from six.moves.urllib.parse import parse_qs, urlparse
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.schema import MetaData, Column
from sqlalchemy.schema import Table as SQLATable
from sqlalchemy.schema import MetaData
from sqlalchemy.pool import StaticPool
from sqlalchemy.util import safe_reraise
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.engine.reflection import Inspector
from alembic.migration import MigrationContext
@ -18,6 +16,7 @@ from alembic.operations import Operations
from dataset.persistence.table import Table
from dataset.persistence.util import ResultIter, row_type, safe_url, QUERY_STEP
from dataset.persistence.util import normalize_table_name
from dataset.persistence.types import Types
log = logging.getLogger(__name__)
@ -94,8 +93,7 @@ class Database(object):
tx.commit()
def rollback(self):
"""
Roll back the current transaction.
"""Roll back the current transaction.
Discard all statements executed since the transaction was begun.
"""
@ -128,24 +126,18 @@ class Database(object):
def __contains__(self, table_name):
"""Check if the given table name exists in the database."""
try:
return self._valid_table_name(table_name) in self.tables
return normalize_table_name(table_name) in self.tables
except ValueError:
return False
def _valid_table_name(self, table_name):
"""Check if the table name is obviously invalid."""
if table_name is None or not len(table_name.strip()):
raise ValueError("Invalid table name: %r" % table_name)
return table_name.strip()
def create_table(self, table_name, primary_id=None, primary_type=None):
"""Create a new table.
Either loads a table or creates it if it doesn't exist yet. You can
define the name and type of the primary key field, if a new table is to
be created. The default is to create an auto-incrementing integer,
`id`. You can also set the primary key to be a string or big integer.
The caller will be responsible for the uniqueness of `primary_id` if
``id``. You can also set the primary key to be a string or big integer.
The caller will be responsible for the uniqueness of ``primary_id`` if
it is defined as a text type.
Returns a :py:class:`Table <dataset.Table>` instance.
@ -168,64 +160,37 @@ class Database(object):
"""
assert not isinstance(primary_type, six.string_types), \
'Text-based primary_type support is dropped, use db.types.'
table_name = self._valid_table_name(table_name)
table_name = normalize_table_name(table_name)
with self.lock:
if table_name in self:
return self.load_table(table_name)
log.debug("Creating table: %s" % (table_name))
table = SQLATable(table_name, self.metadata, schema=self.schema)
if primary_id is not False:
primary_id = primary_id or Table.PRIMARY_DEFAULT
primary_type = primary_type or self.types.integer
autoincrement = primary_type in [self.types.integer,
self.types.bigint]
col = Column(primary_id, primary_type,
primary_key=True,
autoincrement=autoincrement)
table.append_column(col)
table.create(self.executable, checkfirst=True)
self._tables[table_name] = Table(self, table)
return self._tables[table_name]
if table_name not in self._tables:
self._tables[table_name] = Table(self, table_name,
primary_id=primary_id,
primary_type=primary_type,
auto_create=True)
return self._tables.get(table_name)
def load_table(self, table_name):
"""Load a table.
This will fail if the tables does not already exist in the database. If the
table exists, its columns will be reflected and are available on the
:py:class:`Table <dataset.Table>` object.
This will fail if the table does not already exist in the database. If
the table exists, its columns will be reflected and are available on
the :py:class:`Table <dataset.Table>` object.
Returns a :py:class:`Table <dataset.Table>` instance.
::
table = db.load_table('population')
"""
table_name = self._valid_table_name(table_name)
if table_name in self._tables:
return self._tables.get(table_name)
log.debug("Loading table: %s", table_name)
table_name = normalize_table_name(table_name)
with self.lock:
table = self._reflect_table(table_name)
if table is not None:
self._tables[table_name] = Table(self, table)
return self._tables[table_name]
def _reflect_table(self, table_name):
"""Reload a table schema from the database."""
table_name = self._valid_table_name(table_name)
try:
table = SQLATable(table_name,
self.metadata,
schema=self.schema,
autoload=True,
autoload_with=self.executable)
return table
except NoSuchTableError:
return None
if table_name not in self._tables:
self._tables[table_name] = Table(self, table_name)
return self._tables.get(table_name)
def get_table(self, table_name, primary_id=None, primary_type=None):
"""Load or create a table.
This is now the same as `create_table`.
This is now the same as ``create_table``.
::
table = db.get_table('population')
@ -249,17 +214,16 @@ class Database(object):
Further positional and keyword arguments will be used for parameter
binding. To include a positional argument in your query, use question
marks in the query (i.e. `SELECT * FROM tbl WHERE a = ?`). For keyword
arguments, use a bind parameter (i.e. `SELECT * FROM tbl WHERE a =
:foo`).
The returned iterator will yield each result sequentially.
marks in the query (e.g. ``SELECT * FROM tbl WHERE a = ?``). For
keyword arguments, use a bind parameter (e.g. ``SELECT * FROM tbl
WHERE a = :foo``).
::
statement = 'SELECT user, COUNT(*) c FROM photos GROUP BY user'
for row in db.query(statement):
print(row['user'], row['c'])
The returned iterator will yield each result sequentially.
"""
if isinstance(query, six.string_types):
query = text(query)

View File

@ -5,9 +5,13 @@ from sqlalchemy.sql.expression import ClauseElement
from sqlalchemy.schema import Column, Index
from sqlalchemy import func, select, false
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.schema import Table as SQLATable
from sqlalchemy.exc import NoSuchTableError
from dataset.persistence.types import Types
from dataset.persistence.util import normalize_column_name, index_name
from dataset.persistence.util import ensure_tuple, ResultIter, QUERY_STEP
from dataset.persistence.util import normalize_table_name
from dataset.util import DatasetException
@ -18,21 +22,31 @@ class Table(object):
"""Represents a table in a database and exposes common operations."""
PRIMARY_DEFAULT = 'id'
def __init__(self, database, table):
def __init__(self, database, table_name, primary_id=None,
primary_type=None, auto_create=False):
"""Initialise the table from database schema."""
self.database = database
self.name = table.name
self.table = table
self._is_dropped = False
self.name = normalize_table_name(table_name)
self._table = None
self._indexes = []
self._primary_id = primary_id
self._primary_type = primary_type
self._auto_create = auto_create
@property
def exists(self):
"""Check to see if the table currently exists in the database."""
if self.table is not None:
if self._table is not None:
return True
return self.name in self.database
@property
def table(self):
    """Return the underlying table object, resolving it on first access.

    While nothing is cached in ``_table``, the structure is synced through
    ``_sync_table`` with an empty column list; whatever ``_table`` holds
    afterwards is returned.
    """
    if self._table is not None:
        return self._table
    # Nothing cached yet: let ``_sync_table`` populate ``_table``. No extra
    # columns are requested here.
    self._sync_table(())
    return self._table
@property
def columns(self):
"""Get a listing of all columns that exist in the table."""
@ -40,24 +54,12 @@ class Table(object):
return []
return self.table.columns.keys()
def drop(self):
"""
Drop the table from the database.
Delete both the schema and all the contents within it.
Note: the object will raise an Exception if you use it after
dropping the table. If you want to re-create the table, make
sure to get a fresh instance from the :py:class:`Database <dataset.Database>`.
"""
with self.database.lock:
if not self.exists:
return
self.table.drop(self.database.executable, checkfirst=True)
self.table = None
def has_column(self, column):
    """Tell whether this table has a column called ``column``.

    The name is passed through ``normalize_column_name`` before it is
    looked up in ``self.columns``.
    """
    wanted = normalize_column_name(column)
    return wanted in self.columns
def insert(self, row, ensure=None, types=None):
"""
Add a row (type: dict) by inserting it into the table.
"""Add a ``row`` dict by inserting it into the table.
If ``ensure`` is set and any of the keys of the row are not
table columns, they will be created automatically.
@ -81,8 +83,7 @@ class Table(object):
return True
def insert_ignore(self, row, keys, ensure=None, types=None):
"""
Add a row (type: dict) into the table if the row does not exist.
"""Add a ``row`` dict into the table if the row does not exist.
If rows with matching ``keys`` exist, no new row is added to the table.
@ -108,8 +109,7 @@ class Table(object):
return False
def insert_many(self, rows, chunk_size=1000, ensure=None, types=None):
"""
Add many rows at a time.
"""Add many rows at a time.
This is significantly faster than adding them one by one. Per default
the rows are processed in chunks of 1000 per commit, unless you specify
@ -134,21 +134,20 @@ class Table(object):
self.table.insert().execute(chunk)
def update(self, row, keys, ensure=None, types=None, return_count=False):
"""
Update a row in the table.
"""Update a row in the table.
The update is managed via the set of column names stated in ``keys``:
they will be used as filters for the data to be updated, using the values
in ``row``.
they will be used as filters for the data to be updated, using the
values in ``row``.
::
# update all entries with id matching 10, setting their title columns
data = dict(id=10, title='I am a banana!')
table.update(data, ['id'])
If keys in ``row`` update columns not present in the table,
they will be created based on the settings of ``ensure`` and
``types``, matching the behavior of :py:meth:`insert() <dataset.Table.insert>`.
If keys in ``row`` update columns not present in the table, they will
be created based on the settings of ``ensure`` and ``types``, matching
the behavior of :py:meth:`insert() <dataset.Table.insert>`.
"""
row = self._sync_columns(row, ensure, types=types)
args, row = self._keys_to_args(row, keys)
@ -185,8 +184,7 @@ class Table(object):
Keyword arguments can be used to add column-based filters. The filter
criterion will always be equality:
.. code-block:: python
::
table.delete(place='Berlin')
@ -199,27 +197,76 @@ class Table(object):
rp = self.database.executable.execute(stmt)
return rp.rowcount > 0
def has_column(self, column):
"""Check if a column with the given name exists on this table."""
return normalize_column_name(column) in self.columns
def _reflect_table(self):
    """Load the table's definition from the database.

    On success ``self._table`` is replaced by the reflected SQLAlchemy
    table. When the database reports that the table does not exist,
    ``self._table`` is reset to ``None`` so that a previously reflected
    definition is not kept around for a table that is gone.
    """
    # NOTE(review): no ``autoload_with`` is passed, so reflection presumably
    # relies on an engine bound to ``database.metadata`` - confirm.
    with self.database.lock:
        try:
            self._table = SQLATable(self.name,
                                    self.database.metadata,
                                    schema=self.database.schema,
                                    autoload=True)
        except NoSuchTableError:
            # Forget any stale definition; otherwise ``exists`` would keep
            # answering True purely because ``_table`` is still set.
            self._table = None
def _sync_table(self, columns):
    """Lazily load, create or extend the table structure in the database.

    ``columns`` is a sequence of SQLAlchemy ``Column`` objects the table
    should contain afterwards (it may be empty).

    * If no table object is cached yet, it is first reflected from the
      database.
    * If the table still cannot be found, it is created with the primary
      key column (unless ``primary_id`` is ``False``) plus ``columns``.
      This requires ``auto_create``; otherwise ``DatasetException`` is
      raised.
    * If the table is known and ``columns`` is non-empty, the columns are
      added to the existing table and the definition is reflected again.
    """
    if self._table is None:
        # Load an existing table from the database.
        self._reflect_table()
    if self._table is None:
        # Create the table with an initial set of columns.
        if not self._auto_create:
            raise DatasetException("Table does not exist: %s" % self.name)
        # Keep the lock scope small because this is run very often.
        with self.database.lock:
            self._table = SQLATable(self.name,
                                    self.database.metadata,
                                    schema=self.database.schema)
            primary_id = None
            if self._primary_id is not False:
                # This can go wrong on DBMS like MySQL and SQLite where
                # tables cannot have no columns.
                primary_id = self._primary_id or self.PRIMARY_DEFAULT
                primary_type = self._primary_type or Types.integer
                autoincrement = primary_type in [Types.integer,
                                                 Types.bigint]
                column = Column(primary_id, primary_type,
                                primary_key=True,
                                autoincrement=autoincrement)
                self._table.append_column(column)
            for column in columns:
                # The first row may itself carry the primary key (e.g. an
                # explicit ``id``). Appending that column again would try
                # to redefine the primary key column as a plain column, so
                # the definition created above is kept instead.
                if column.name == primary_id:
                    continue
                self._table.append_column(column)
            self._table.create(self.database.executable, checkfirst=True)
    elif len(columns):
        with self.database.lock:
            for column in columns:
                self.database.op.add_column(self.name, column,
                                            self.database.schema)
            # Pick up the columns that were just added.
            self._reflect_table()
def _sync_columns(self, row, ensure, types=None):
"""Create missing columns (or the table) prior to writes.
If automatic schema generation is disabled (``ensure`` is ``False``),
this will remove any keys from the ``row`` for which there is no
matching column.
"""
columns = self.columns
ensure = self._check_ensure(ensure)
types = types or {}
types = {normalize_column_name(k): v for (k, v) in types.items()}
out = {}
sync_columns = []
for name, value in row.items():
name = normalize_column_name(name)
if ensure and name not in columns:
_type = types.get(name)
if _type is None:
_type = self.database.types.guess(value)
log.debug("Create column: %s on %s", name, self.name)
self.create_column(name, _type)
sync_columns.append(Column(name, _type))
columns.append(name)
if name in columns:
out[name] = value
self._sync_table(sync_columns)
return out
def _check_ensure(self, ensure):
@ -238,6 +285,20 @@ class Table(object):
clauses.append(self.table.c[column] == value)
return and_(*clauses)
def _args_to_order_by(self, order_by):
    """Translate ``order_by`` names into column ordering expressions.

    Each entry names a column; a leading ``-`` requests descending order.
    ``None`` entries and names that are not columns of the table are
    skipped.
    """
    clauses = []
    for spec in ensure_tuple(order_by):
        if spec is None:
            continue
        key = spec.lstrip('-')
        descending = spec.startswith('-')
        table = self.table
        if key in table.columns:
            target = table.c[key]
            clauses.append(target.desc() if descending else target.asc())
    return clauses
def _keys_to_args(self, row, keys):
keys = ensure_tuple(keys)
keys = [normalize_column_name(k) for k in keys]
@ -247,10 +308,7 @@ class Table(object):
return args, row
def create_column(self, name, type):
"""
Explicitly create a new column ``name`` of a specified type.
``type`` must be a `SQLAlchemy column type <http://docs.sqlalchemy.org/en/rel_0_8/core/types.html>`_.
"""Create a new column ``name`` of a specified type.
::
table.create_column('created_at', db.types.datetime)
@ -262,47 +320,61 @@ class Table(object):
log.debug("Column exists: %s" % name)
return
log.debug("Create column: %s on %s", name, self.name)
self.database.op.add_column(
self.table.name,
self.name,
Column(name, type),
self.table.schema
self.database.schema
)
self.table = self.database._reflect_table(self.table.name)
self._reflect_table()
def create_column_by_example(self, name, value):
"""
Explicitly create a new column ``name`` with a type that is appropriate to store
the given example ``value``. The type is guessed in the same way as for the
insert method with ``ensure=True``. If a column of the same name already exists,
no action is taken, even if it is not of the type we would have created.
Explicitly create a new column ``name`` with a type that is appropriate
to store the given example ``value``. The type is guessed in the same
way as for the insert method with ``ensure=True``.
::
table.create_column_by_example('length', 4.2)
If a column of the same name already exists, no action is taken, even
if it is not of the type we would have created.
"""
type_ = self.database.types.guess(value)
self.create_column(name, type_)
def drop_column(self, name):
"""Drop the column ``name``.
::
table.drop_column('created_at')
"""
if self.database.engine.dialect.name == 'sqlite':
raise NotImplementedError("SQLite does not support dropping columns.")
raise RuntimeError("SQLite does not support dropping columns.")
name = normalize_column_name(name)
if not self.exists or not self.has_column(name):
log.debug("Column does not exist: %s", name)
return
with self.database.lock:
if not self.exists or not self.has_column(name):
log.debug("Column does not exist: %s", name)
return
self.database.op.drop_column(
self.table.name,
name,
self.table.schema
)
self.table = self.database._reflect_table(self.table.name)
self._reflect_table()
def drop(self):
    """Drop the table from the database.

    Removes the schema together with every row stored in it. Nothing is
    done when the table does not exist.
    """
    database = self.database
    with database.lock:
        if not self.exists:
            return
        self.table.drop(database.executable, checkfirst=True)
        # Forget the cached definition of the table that was just dropped.
        self._table = None
def has_index(self, columns):
"""Check if an index exists to cover the given `columns`."""
"""Check if an index exists to cover the given ``columns``."""
if not self.exists:
return False
columns = set([normalize_column_name(c) for c in columns])
@ -320,42 +392,26 @@ class Table(object):
return False
def create_index(self, columns, name=None, **kw):
"""
Create an index to speed up queries on a table.
"""Create an index to speed up queries on a table.
If no ``name`` is given a random name is created.
::
table.create_index(['name', 'country'])
"""
columns = [normalize_column_name(c) for c in columns]
columns = [normalize_column_name(c) for c in ensure_tuple(columns)]
with self.database.lock:
if not self.exists:
# TODO
pass
raise DatasetException("Table has not been created yet.")
if not self.has_index(columns):
name = name or index_name(self.name, columns)
columns = [self.table.c[c] for c in columns]
idx = Index(name, *columns, **kw)
idx.create(self.database.executable)
def _args_to_order_by(self, order_by):
orderings = []
for ordering in ensure_tuple(order_by):
if ordering is None:
continue
column = ordering.lstrip('-')
if column not in self.table.columns:
continue
if ordering.startswith('-'):
orderings.append(self.table.c[column].desc())
else:
orderings.append(self.table.c[column].asc())
return orderings
def find(self, *_clauses, **kwargs):
"""
Perform a simple search on the table.
"""Perform a simple search on the table.
Simply pass keyword arguments as ``filter``.
::
@ -368,20 +424,20 @@ class Table(object):
# just return the first 10 rows
results = table.find(country='France', _limit=10)
You can sort the results by single or multiple columns. Append a minus sign
to the column name for descending order::
You can sort the results by single or multiple columns. Append a minus
sign to the column name for descending order::
# sort results by a column 'year'
results = table.find(country='France', order_by='year')
# return all rows sorted by multiple columns (by year in descending order)
# return all rows sorted by multiple columns (descending by year)
results = table.find(order_by=['country', '-year'])
For more complex queries, please use :py:meth:`db.query() <dataset.Database.query>`
To perform complex queries with advanced filters or to perform
aggregation, use :py:meth:`db.query() <dataset.Database.query>`
instead.
"""
_limit = kwargs.pop('_limit', None)
_offset = kwargs.pop('_offset', 0)
_step = kwargs.pop('_step', QUERY_STEP)
order_by = kwargs.pop('order_by', None)
if not self.exists:
@ -389,6 +445,7 @@ class Table(object):
order_by = self._args_to_order_by(order_by)
args = self._args_to_clause(kwargs, clauses=_clauses)
_step = kwargs.pop('_step', QUERY_STEP)
if _step is False or _step == 0:
_step = None
@ -405,7 +462,7 @@ class Table(object):
"""Get a single result from the table.
Works just like :py:meth:`find() <dataset.Table.find>` but returns one
result, or None.
result, or ``None``.
::
row = table.find_one(country='United States')

View File

@ -22,15 +22,6 @@ def convert_row(row_type, row):
return row_type(row.items())
def normalize_column_name(name):
if not isinstance(name, string_types):
raise ValueError('%r is not a valid column name.' % name)
name = name.strip()
if not len(name) or '.' in name or '-' in name:
raise ValueError('%r is not a valid column name.' % name)
return name
def iter_result_proxy(rp, step=None):
"""Iterate over the ResultProxy."""
while True:
@ -66,6 +57,26 @@ class ResultIter(object):
self.result_proxy.close()
def normalize_column_name(name):
    """Return ``name`` with surrounding whitespace removed.

    Raises ``ValueError`` when ``name`` is not a string, is blank after
    stripping, or contains a ``.`` or ``-`` character.
    """
    message = '%r is not a valid column name.'
    if isinstance(name, string_types):
        name = name.strip()
        has_forbidden = any(char in name for char in '.-')
        if name and not has_forbidden:
            return name
    # Non-strings are reported as given, strings in their stripped form.
    raise ValueError(message % name)
def normalize_table_name(name):
    """Return ``name`` with surrounding whitespace removed.

    Raises ``ValueError`` when ``name`` is not a string or is blank after
    stripping.
    """
    if isinstance(name, string_types):
        name = name.strip()
        if name:
            return name
    # Non-strings are reported as given, blank strings in stripped form.
    raise ValueError("Invalid table name: %r" % name)
def safe_url(url):
"""Remove password from printed connection URLs."""
parsed = urlparse(url)

View File

@ -341,9 +341,11 @@ class TableTestCase(unittest.TestCase):
assert len(self.tbl) == len(data) + 6
def test_drop_operations(self):
assert self.tbl.table is not None, 'table shouldn\'t be dropped yet'
assert self.tbl._table is not None, \
'table shouldn\'t be dropped yet'
self.tbl.drop()
assert self.tbl.table is None, 'table should be dropped now'
assert self.tbl._table is None, \
'table should be dropped now'
assert list(self.tbl.all()) == [], self.tbl.all()
assert self.tbl.count() == 0, self.tbl.count()
@ -367,7 +369,7 @@ class TableTestCase(unittest.TestCase):
try:
self.tbl.drop_column('date')
assert 'date' not in self.tbl.columns
except NotImplementedError:
except RuntimeError:
pass
def test_iter(self):