From 4232606d270d108e9d1df938e691a9a8dcd1648f Mon Sep 17 00:00:00 2001 From: Friedrich Lindenberg Date: Sun, 3 Sep 2017 23:23:57 +0200 Subject: [PATCH] Move to a model where the table is created lazily, with an initial set of columns --- dataset/persistence/database.py | 86 ++++-------- dataset/persistence/table.py | 225 ++++++++++++++++++++------------ dataset/persistence/util.py | 29 ++-- test/test_persistence.py | 8 +- 4 files changed, 191 insertions(+), 157 deletions(-) diff --git a/dataset/persistence/database.py b/dataset/persistence/database.py index 4a28690..01663a7 100644 --- a/dataset/persistence/database.py +++ b/dataset/persistence/database.py @@ -6,11 +6,9 @@ from six.moves.urllib.parse import parse_qs, urlparse from sqlalchemy import create_engine from sqlalchemy.sql import text -from sqlalchemy.schema import MetaData, Column -from sqlalchemy.schema import Table as SQLATable +from sqlalchemy.schema import MetaData from sqlalchemy.pool import StaticPool from sqlalchemy.util import safe_reraise -from sqlalchemy.exc import NoSuchTableError from sqlalchemy.engine.reflection import Inspector from alembic.migration import MigrationContext @@ -18,6 +16,7 @@ from alembic.operations import Operations from dataset.persistence.table import Table from dataset.persistence.util import ResultIter, row_type, safe_url, QUERY_STEP +from dataset.persistence.util import normalize_table_name from dataset.persistence.types import Types log = logging.getLogger(__name__) @@ -94,8 +93,7 @@ class Database(object): tx.commit() def rollback(self): - """ - Roll back the current transaction. + """Roll back the current transaction. Discard all statements executed since the transaction was begun. """ @@ -128,24 +126,18 @@ class Database(object): def __contains__(self, table_name): """Check if the given table name exists in the database.""" try: - return self._valid_table_name(table_name) in self.tables + return normalize_table_name(table_name) in self.tables except ValueError: return False - def _valid_table_name(self, table_name): - """Check if the table name is obviously invalid.""" - if table_name is None or not len(table_name.strip()): - raise ValueError("Invalid table name: %r" % table_name) - return table_name.strip() - def create_table(self, table_name, primary_id=None, primary_type=None): """Create a new table. Either loads a table or creates it if it doesn't exist yet. You can define the name and type of the primary key field, if a new table is to be created. The default is to create an auto-incrementing integer, - `id`. You can also set the primary key to be a string or big integer. - The caller will be responsible for the uniqueness of `primary_id` if + ``id``. You can also set the primary key to be a string or big integer. + The caller will be responsible for the uniqueness of ``primary_id`` if it is defined as a text type. Returns a :py:class:`Table ` instance. @@ -168,64 +160,37 @@ class Database(object): """ assert not isinstance(primary_type, six.string_types), \ 'Text-based primary_type support is dropped, use db.types.' - table_name = self._valid_table_name(table_name) + table_name = normalize_table_name(table_name) with self.lock: - if table_name in self: - return self.load_table(table_name) - log.debug("Creating table: %s" % (table_name)) - table = SQLATable(table_name, self.metadata, schema=self.schema) - if primary_id is not False: - primary_id = primary_id or Table.PRIMARY_DEFAULT - primary_type = primary_type or self.types.integer - autoincrement = primary_type in [self.types.integer, - self.types.bigint] - col = Column(primary_id, primary_type, - primary_key=True, - autoincrement=autoincrement) - table.append_column(col) - table.create(self.executable, checkfirst=True) - self._tables[table_name] = Table(self, table) - return self._tables[table_name] + if table_name not in self._tables: + self._tables[table_name] = Table(self, table_name, + primary_id=primary_id, + primary_type=primary_type, + auto_create=True) + return self._tables.get(table_name) def load_table(self, table_name): """Load a table. - This will fail if the tables does not already exist in the database. If the - table exists, its columns will be reflected and are available on the - :py:class:`Table ` object. + This will fail if the tables does not already exist in the database. If + the table exists, its columns will be reflected and are available on + the :py:class:`Table ` object. Returns a :py:class:`Table ` instance. :: table = db.load_table('population') """ - table_name = self._valid_table_name(table_name) - if table_name in self._tables: - return self._tables.get(table_name) - log.debug("Loading table: %s", table_name) + table_name = normalize_table_name(table_name) with self.lock: - table = self._reflect_table(table_name) - if table is not None: - self._tables[table_name] = Table(self, table) - return self._tables[table_name] - - def _reflect_table(self, table_name): - """Reload a table schema from the database.""" - table_name = self._valid_table_name(table_name) - try: - table = SQLATable(table_name, - self.metadata, - schema=self.schema, - autoload=True, - autoload_with=self.executable) - return table - except NoSuchTableError: - return None + if table_name not in self._tables: + self._tables[table_name] = Table(self, table_name) + return self._tables.get(table_name) def get_table(self, table_name, primary_id=None, primary_type=None): """Load or create a table. - This is now the same as `create_table`. + This is now the same as ``create_table``. :: table = db.get_table('population') @@ -249,17 +214,16 @@ class Database(object): Further positional and keyword arguments will be used for parameter binding. To include a positional argument in your query, use question - marks in the query (i.e. `SELECT * FROM tbl WHERE a = ?`). For keyword - arguments, use a bind parameter (i.e. `SELECT * FROM tbl WHERE a = - :foo`). - - The returned iterator will yield each result sequentially. + marks in the query (i.e. ``SELECT * FROM tbl WHERE a = ?```). For + keyword arguments, use a bind parameter (i.e. ``SELECT * FROM tbl + WHERE a = :foo``). :: statement = 'SELECT user, COUNT(*) c FROM photos GROUP BY user' for row in db.query(statement): print(row['user'], row['c']) + The returned iterator will yield each result sequentially. """ if isinstance(query, six.string_types): query = text(query) diff --git a/dataset/persistence/table.py b/dataset/persistence/table.py index 581c690..c0b8ed9 100644 --- a/dataset/persistence/table.py +++ b/dataset/persistence/table.py @@ -5,9 +5,13 @@ from sqlalchemy.sql.expression import ClauseElement from sqlalchemy.schema import Column, Index from sqlalchemy import func, select, false from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.schema import Table as SQLATable +from sqlalchemy.exc import NoSuchTableError +from dataset.persistence.types import Types from dataset.persistence.util import normalize_column_name, index_name from dataset.persistence.util import ensure_tuple, ResultIter, QUERY_STEP +from dataset.persistence.util import normalize_table_name from dataset.util import DatasetException @@ -18,21 +22,31 @@ class Table(object): """Represents a table in a database and exposes common operations.""" PRIMARY_DEFAULT = 'id' - def __init__(self, database, table): + def __init__(self, database, table_name, primary_id=None, + primary_type=None, auto_create=False): """Initialise the table from database schema.""" self.database = database - self.name = table.name - self.table = table - self._is_dropped = False + self.name = normalize_table_name(table_name) + self._table = None self._indexes = [] + self._primary_id = primary_id + self._primary_type = primary_type + self._auto_create = auto_create @property def exists(self): """Check to see if the table currently exists in the database.""" - if self.table is not None: + if self._table is not None: return True return self.name in self.database + @property + def table(self): + """Get a reference to the table, which may be reflected or created.""" + if self._table is None: + self._sync_table(()) + return self._table + @property def columns(self): """Get a listing of all columns that exist in the table.""" @@ -40,24 +54,12 @@ class Table(object): return [] return self.table.columns.keys() - def drop(self): - """ - Drop the table from the database. - - Delete both the schema and all the contents within it. - Note: the object will raise an Exception if you use it after - dropping the table. If you want to re-create the table, make - sure to get a fresh instance from the :py:class:`Database `. - """ - with self.database.lock: - if not self.exists: - return - self.table.drop(self.database.executable, checkfirst=True) - self.table = None + def has_column(self, column): + """Check if a column with the given name exists on this table.""" + return normalize_column_name(column) in self.columns def insert(self, row, ensure=None, types=None): - """ - Add a row (type: dict) by inserting it into the table. + """Add a ``row`` dict by inserting it into the table. If ``ensure`` is set, any of the keys of the row are not table columns, they will be created automatically. @@ -81,8 +83,7 @@ class Table(object): return True def insert_ignore(self, row, keys, ensure=None, types=None): - """ - Add a row (type: dict) into the table if the row does not exist. + """Add a ``row`` dict into the table if the row does not exist. If rows with matching ``keys`` exist they will be added to the table. @@ -108,8 +109,7 @@ class Table(object): return False def insert_many(self, rows, chunk_size=1000, ensure=None, types=None): - """ - Add many rows at a time. + """Add many rows at a time. This is significantly faster than adding them one by one. Per default the rows are processed in chunks of 1000 per commit, unless you specify @@ -134,21 +134,20 @@ class Table(object): self.table.insert().execute(chunk) def update(self, row, keys, ensure=None, types=None, return_count=False): - """ - Update a row in the table. + """Update a row in the table. The update is managed via the set of column names stated in ``keys``: - they will be used as filters for the data to be updated, using the values - in ``row``. + they will be used as filters for the data to be updated, using the + values in ``row``. :: # update all entries with id matching 10, setting their title columns data = dict(id=10, title='I am a banana!') table.update(data, ['id']) - If keys in ``row`` update columns not present in the table, - they will be created based on the settings of ``ensure`` and - ``types``, matching the behavior of :py:meth:`insert() `. + If keys in ``row`` update columns not present in the table, they will + be created based on the settings of ``ensure`` and ``types``, matching + the behavior of :py:meth:`insert() `. """ row = self._sync_columns(row, ensure, types=types) args, row = self._keys_to_args(row, keys) @@ -185,8 +184,7 @@ class Table(object): Keyword arguments can be used to add column-based filters. The filter criterion will always be equality: - - .. code-block:: python + :: table.delete(place='Berlin') @@ -199,27 +197,76 @@ class Table(object): rp = self.database.executable.execute(stmt) return rp.rowcount > 0 - def has_column(self, column): - """Check if a column with the given name exists on this table.""" - return normalize_column_name(column) in self.columns + def _reflect_table(self): + """Load the tables definition from the database.""" + with self.database.lock: + try: + self._table = SQLATable(self.name, + self.database.metadata, + schema=self.database.schema, + autoload=True) + except NoSuchTableError: + pass + + def _sync_table(self, columns): + """Lazy load, create or adapt the table structure in the database.""" + if self._table is None: + # Load an existing table from the database. + self._reflect_table() + if self._table is None: + # Create the table with an initial set of columns. + if not self._auto_create: + raise DatasetException("Table does not exist: %s" % self.name) + # Keep the lock scope small because this is run very often. + with self.database.lock: + self._table = SQLATable(self.name, + self.database.metadata, + schema=self.database.schema) + if self._primary_id is not False: + # This can go wrong on DBMS like MySQL and SQLite where + # tables cannot have no columns. + primary_id = self._primary_id or self.PRIMARY_DEFAULT + primary_type = self._primary_type or Types.integer + autoincrement = primary_type in [Types.integer, + Types.bigint] + column = Column(primary_id, primary_type, + primary_key=True, + autoincrement=autoincrement) + self._table.append_column(column) + for column in columns: + self._table.append_column(column) + self._table.create(self.database.executable, checkfirst=True) + elif len(columns): + with self.database.lock: + for column in columns: + self.database.op.add_column(self.name, column, + self.database.schema) + self._reflect_table() def _sync_columns(self, row, ensure, types=None): + """Create missing columns (or the table) prior to writes. + + If automatic schema generation is disabled (``ensure`` is ``False``), + this will remove any keys from the ``row`` for which there is no + matching column. + """ columns = self.columns ensure = self._check_ensure(ensure) types = types or {} types = {normalize_column_name(k): v for (k, v) in types.items()} out = {} + sync_columns = [] for name, value in row.items(): name = normalize_column_name(name) if ensure and name not in columns: _type = types.get(name) if _type is None: _type = self.database.types.guess(value) - log.debug("Create column: %s on %s", name, self.name) - self.create_column(name, _type) + sync_columns.append(Column(name, _type)) columns.append(name) if name in columns: out[name] = value + self._sync_table(sync_columns) return out def _check_ensure(self, ensure): @@ -238,6 +285,20 @@ class Table(object): clauses.append(self.table.c[column] == value) return and_(*clauses) + def _args_to_order_by(self, order_by): + orderings = [] + for ordering in ensure_tuple(order_by): + if ordering is None: + continue + column = ordering.lstrip('-') + if column not in self.table.columns: + continue + if ordering.startswith('-'): + orderings.append(self.table.c[column].desc()) + else: + orderings.append(self.table.c[column].asc()) + return orderings + def _keys_to_args(self, row, keys): keys = ensure_tuple(keys) keys = [normalize_column_name(k) for k in keys] @@ -247,10 +308,7 @@ class Table(object): return args, row def create_column(self, name, type): - """ - Explicitly create a new column ``name`` of a specified type. - - ``type`` must be a `SQLAlchemy column type `_. + """Create a new column ``name`` of a specified type. :: table.create_column('created_at', db.types.datetime) @@ -262,47 +320,61 @@ class Table(object): log.debug("Column exists: %s" % name) return + log.debug("Create column: %s on %s", name, self.name) self.database.op.add_column( - self.table.name, + self.name, Column(name, type), - self.table.schema + self.database.schema ) - self.table = self.database._reflect_table(self.table.name) + self._reflect_table() def create_column_by_example(self, name, value): """ - Explicitly create a new column ``name`` with a type that is appropriate to store - the given example ``value``. The type is guessed in the same way as for the - insert method with ``ensure=True``. If a column of the same name already exists, - no action is taken, even if it is not of the type we would have created. + Explicitly create a new column ``name`` with a type that is appropriate + to store the given example ``value``. The type is guessed in the same + way as for the insert method with ``ensure=True``. + :: table.create_column_by_example('length', 4.2) + + If a column of the same name already exists, no action is taken, even + if it is not of the type we would have created. """ type_ = self.database.types.guess(value) self.create_column(name, type_) def drop_column(self, name): """Drop the column ``name``. - :: table.drop_column('created_at') """ if self.database.engine.dialect.name == 'sqlite': - raise NotImplementedError("SQLite does not support dropping columns.") + raise RuntimeError("SQLite does not support dropping columns.") name = normalize_column_name(name) - if not self.exists or not self.has_column(name): - log.debug("Column does not exist: %s", name) - return with self.database.lock: + if not self.exists or not self.has_column(name): + log.debug("Column does not exist: %s", name) + return + self.database.op.drop_column( self.table.name, name, self.table.schema ) - self.table = self.database._reflect_table(self.table.name) + self._reflect_table() + + def drop(self): + """Drop the table from the database. + + Deletes both the schema and all the contents within it. + """ + with self.database.lock: + if self.exists: + self.table.drop(self.database.executable, checkfirst=True) + self._table = None def has_index(self, columns): - """Check if an index exists to cover the given `columns`.""" + """Check if an index exists to cover the given ``columns``.""" if not self.exists: return False columns = set([normalize_column_name(c) for c in columns]) @@ -320,42 +392,26 @@ class Table(object): return False def create_index(self, columns, name=None, **kw): - """ - Create an index to speed up queries on a table. + """Create an index to speed up queries on a table. If no ``name`` is given a random name is created. :: table.create_index(['name', 'country']) """ - columns = [normalize_column_name(c) for c in columns] + columns = [normalize_column_name(c) for c in ensure_tuple(columns)] with self.database.lock: if not self.exists: - # TODO - pass + raise DatasetException("Table has not been created yet.") + if not self.has_index(columns): name = name or index_name(self.name, columns) columns = [self.table.c[c] for c in columns] idx = Index(name, *columns, **kw) idx.create(self.database.executable) - def _args_to_order_by(self, order_by): - orderings = [] - for ordering in ensure_tuple(order_by): - if ordering is None: - continue - column = ordering.lstrip('-') - if column not in self.table.columns: - continue - if ordering.startswith('-'): - orderings.append(self.table.c[column].desc()) - else: - orderings.append(self.table.c[column].asc()) - return orderings - def find(self, *_clauses, **kwargs): - """ - Perform a simple search on the table. + """Perform a simple search on the table. Simply pass keyword arguments as ``filter``. :: @@ -368,20 +424,20 @@ class Table(object): # just return the first 10 rows results = table.find(country='France', _limit=10) - You can sort the results by single or multiple columns. Append a minus sign - to the column name for descending order:: + You can sort the results by single or multiple columns. Append a minus + sign to the column name for descending order:: # sort results by a column 'year' results = table.find(country='France', order_by='year') - # return all rows sorted by multiple columns (by year in descending order) + # return all rows sorted by multiple columns (descending by year) results = table.find(order_by=['country', '-year']) - For more complex queries, please use :py:meth:`db.query() ` + To perform complex queries with advanced filters or to perform + aggregation, use :py:meth:`db.query() ` instead. """ _limit = kwargs.pop('_limit', None) _offset = kwargs.pop('_offset', 0) - _step = kwargs.pop('_step', QUERY_STEP) order_by = kwargs.pop('order_by', None) if not self.exists: @@ -389,6 +445,7 @@ class Table(object): order_by = self._args_to_order_by(order_by) args = self._args_to_clause(kwargs, clauses=_clauses) + _step = kwargs.pop('_step', QUERY_STEP) if _step is False or _step == 0: _step = None @@ -405,7 +462,7 @@ class Table(object): """Get a single result from the table. Works just like :py:meth:`find() ` but returns one - result, or None. + result, or ``None``. :: row = table.find_one(country='United States') diff --git a/dataset/persistence/util.py b/dataset/persistence/util.py index 94cf79f..5a71d60 100644 --- a/dataset/persistence/util.py +++ b/dataset/persistence/util.py @@ -22,15 +22,6 @@ def convert_row(row_type, row): return row_type(row.items()) -def normalize_column_name(name): - if not isinstance(name, string_types): - raise ValueError('%r is not a valid column name.' % name) - name = name.strip() - if not len(name) or '.' in name or '-' in name: - raise ValueError('%r is not a valid column name.' % name) - return name - - def iter_result_proxy(rp, step=None): """Iterate over the ResultProxy.""" while True: @@ -66,6 +57,26 @@ class ResultIter(object): self.result_proxy.close() +def normalize_column_name(name): + """Check if a string is a reasonable thing to use as a column name.""" + if not isinstance(name, string_types): + raise ValueError('%r is not a valid column name.' % name) + name = name.strip() + if not len(name) or '.' in name or '-' in name: + raise ValueError('%r is not a valid column name.' % name) + return name + + +def normalize_table_name(name): + """Check if the table name is obviously invalid.""" + if not isinstance(name, string_types): + raise ValueError("Invalid table name: %r" % name) + name = name.strip() + if not len(name): + raise ValueError("Invalid table name: %r" % name) + return name + + def safe_url(url): """Remove password from printed connection URLs.""" parsed = urlparse(url) diff --git a/test/test_persistence.py b/test/test_persistence.py index 0559bfa..b2be9da 100644 --- a/test/test_persistence.py +++ b/test/test_persistence.py @@ -341,9 +341,11 @@ class TableTestCase(unittest.TestCase): assert len(self.tbl) == len(data) + 6 def test_drop_operations(self): - assert self.tbl.table is not None, 'table shouldn\'t be dropped yet' + assert self.tbl._table is not None, \ + 'table shouldn\'t be dropped yet' self.tbl.drop() - assert self.tbl.table is None, 'table should be dropped now' + assert self.tbl._table is None, \ + 'table should be dropped now' assert list(self.tbl.all()) == [], self.tbl.all() assert self.tbl.count() == 0, self.tbl.count() @@ -367,7 +369,7 @@ class TableTestCase(unittest.TestCase): try: self.tbl.drop_column('date') assert 'date' not in self.tbl.columns - except NotImplementedError: + except RuntimeError: pass def test_iter(self):