add options to create custom primary id and type with shorthand format

This commit is contained in:
Yi Xie 2013-09-15 14:12:30 -04:00
parent 95cc5dd411
commit 257d767b6d
3 changed files with 54 additions and 17 deletions

View File

@ -7,9 +7,9 @@ warnings.filterwarnings(
from dataset.persistence.database import Database from dataset.persistence.database import Database
from dataset.persistence.table import Table from dataset.persistence.table import Table
from dataset.freeze.app import freeze from dataset.freeze.app import freeze
from sqlalchemy import Integer, String from sqlalchemy import Integer, Text
__all__ = ['Database', 'Table', 'freeze', 'connect', 'Integer', 'String'] __all__ = ['Database', 'Table', 'freeze', 'connect']
def connect(url=None, schema=None, reflectMetadata=True): def connect(url=None, schema=None, reflectMetadata=True):

View File

@ -7,7 +7,7 @@ from migrate.versioning.util import construct_engine
from sqlalchemy.pool import NullPool from sqlalchemy.pool import NullPool
from sqlalchemy.schema import MetaData, Column, Index from sqlalchemy.schema import MetaData, Column, Index
from sqlalchemy.schema import Table as SQLATable from sqlalchemy.schema import Table as SQLATable
from sqlalchemy import Integer, String from sqlalchemy import Integer, Text
from dataset.persistence.table import Table from dataset.persistence.table import Table
from dataset.persistence.util import ResultIter from dataset.persistence.util import ResultIter
@ -97,13 +97,13 @@ class Database(object):
return list(set(self.metadata.tables.keys() + return list(set(self.metadata.tables.keys() +
self._tables.keys())) self._tables.keys()))
def create_table(self, table_name, primary_id='id', primary_type=Integer): def create_table(self, table_name, primary_id='id', primary_type='Integer'):
""" """
Creates a new table. The new table will automatically have an `id` column Creates a new table. The new table will automatically have an `id` column
unless specified via optional parameter primary_id, which will be used unless specified via optional parameter primary_id, which will be used
as the primary key of the table. Automatic id is set to be an as the primary key of the table. Automatic id is set to be an
auto-incrementing integer, while the type of custom primary_id can be a auto-incrementing integer, while the type of custom primary_id can be a
String (i.e. VARCHAR(255)) or an Integer as specified with primary_type flag. Text or an Integer as specified with primary_type flag.
The caller will be responsible for the uniqueness of manual primary_id. The caller will be responsible for the uniqueness of manual primary_id.
This custom id feature is only available via direct create_table call. This custom id feature is only available via direct create_table call.
@ -112,22 +112,25 @@ class Database(object):
:: ::
table = db.create_table('population') table = db.create_table('population')
table2 = db.create_table('population2', primary_id='age', primary_type=Integer)
table3 = db.create_table('population3', primary_id='race', primary_type=String) # custom id and type
table2 = db.create_table('population2', 'age')
table3 = db.create_table('population3', primary_id='race', primary_type='Text')
""" """
self._acquire() self._acquire()
try: try:
log.debug("Creating table: %s on %r" % (table_name, self.engine)) log.debug("Creating table: %s on %r" % (table_name, self.engine))
table = SQLATable(table_name, self.metadata) table = SQLATable(table_name, self.metadata)
if primary_type is Integer: if primary_type is 'Integer':
auto_flag = False auto_flag = False
if primary_id is 'id': if primary_id is 'id':
auto_flag = True auto_flag = True
col = Column(primary_id, Integer, primary_key=True, autoincrement=auto_flag) col = Column(primary_id, Integer, primary_key=True, autoincrement=auto_flag)
elif primary_type is String: elif primary_type is 'Text':
col = Column(primary_id, String(255), primary_key=True) col = Column(primary_id, Text, primary_key=True)
else: else:
raise DatasetException("The primary_type has to be either int or str.") raise DatasetException(
"The primary_type has to be either 'Integer' or 'Text'.")
table.append_column(col) table.append_column(col)
table.create(self.engine) table.create(self.engine)
@ -157,10 +160,12 @@ class Database(object):
finally: finally:
self._release() self._release()
def get_table(self, table_name): def get_table(self, table_name, primary_id='id', primary_type='Integer'):
""" """
Smart wrapper around *load_table* and *create_table*. Either loads a table Smart wrapper around *load_table* and *create_table*. Either loads a table
or creates it if it doesn't exist yet. or creates it if it doesn't exist yet.
For short-hand to create a table with custom id and type using [], where
table_name, primary_id, and primary_type are specified as a tuple
Returns a :py:class:`Table <dataset.Table>` instance. Returns a :py:class:`Table <dataset.Table>` instance.
:: ::
@ -168,6 +173,10 @@ class Database(object):
table = db.get_table('population') table = db.get_table('population')
# you can also use the short-hand syntax: # you can also use the short-hand syntax:
table = db['population'] table = db['population']
# custom id and type
table2 = db['population2', 'age'] # default type is 'Integer'
table3 = db['population3', 'race', 'Text']
""" """
if table_name in self._tables: if table_name in self._tables:
return Table(self, self._tables[table_name]) return Table(self, self._tables[table_name])
@ -176,12 +185,15 @@ class Database(object):
if self.engine.has_table(table_name, schema=self.schema): if self.engine.has_table(table_name, schema=self.schema):
return self.load_table(table_name) return self.load_table(table_name)
else: else:
return self.create_table(table_name) return self.create_table(table_name, primary_id, primary_type)
finally: finally:
self._release() self._release()
def __getitem__(self, table_name): def __getitem__(self, table_name):
return self.get_table(table_name) if type(table_name) is tuple:
return self.get_table(*table_name[:3])
else:
return self.get_table(table_name)
def query(self, query, **kw): def query(self, query, **kw):
""" """

View File

@ -2,7 +2,7 @@ import os
import unittest import unittest
from datetime import datetime from datetime import datetime
from dataset import connect, Integer, String from dataset import connect
from dataset.util import DatasetException from dataset.util import DatasetException
from sample_data import TEST_DATA from sample_data import TEST_DATA
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@ -35,7 +35,7 @@ class DatabaseTestCase(unittest.TestCase):
def test_create_table_custom_id1(self): def test_create_table_custom_id1(self):
pid = "string_id" pid = "string_id"
table = self.db.create_table("foo2", primary_id = pid, primary_type=String) table = self.db.create_table("foo2", pid, 'Text')
assert table.table.exists() assert table.table.exists()
assert len(table.table.columns) == 1, table.table.columns assert len(table.table.columns) == 1, table.table.columns
assert pid in table.table.c, table.table.c assert pid in table.table.c, table.table.c
@ -46,7 +46,7 @@ class DatabaseTestCase(unittest.TestCase):
def test_create_table_custom_id2(self): def test_create_table_custom_id2(self):
pid = "int_id" pid = "int_id"
table = self.db.create_table("foo3", primary_id = pid, primary_type=Integer) table = self.db.create_table("foo3", primary_id = pid)
assert table.table.exists() assert table.table.exists()
assert len(table.table.columns) == 1, table.table.columns assert len(table.table.columns) == 1, table.table.columns
assert pid in table.table.c, table.table.c assert pid in table.table.c, table.table.c
@ -58,6 +58,31 @@ class DatabaseTestCase(unittest.TestCase):
with self.assertRaises(IntegrityError): with self.assertRaises(IntegrityError):
table.insert({'int_id': 123}) table.insert({'int_id': 123})
def test_create_table_shorthand1(self):
pid = "int_id"
table = self.db['foo4', pid]
assert table.table.exists
assert len(table.table.columns) == 1, table.table.columns
assert pid in table.table.c, table.table.c
table.insert({'int_id': 123})
table.insert({'int_id': 124})
assert table.find_one(int_id = 123)[0] == 123
assert table.find_one(int_id = 124)[0] == 124
with self.assertRaises(IntegrityError):
table.insert({'int_id': 123})
def test_create_table_shorthand2(self):
pid = "string_id"
table = self.db['foo5', pid, 'Text']
assert table.table.exists
assert len(table.table.columns) == 1, table.table.columns
assert pid in table.table.c, table.table.c
table.insert({
'string_id': 'foobar'})
assert table.find_one(string_id = 'foobar')[0] == 'foobar'
def test_load_table(self): def test_load_table(self):
tbl = self.db.load_table('weather') tbl = self.db.load_table('weather')
assert tbl.table == self.tbl.table assert tbl.table == self.tbl.table