Merge pull request #27 from dnatag/master

create table with custom primary_id
This commit is contained in:
Friedrich Lindenberg 2013-09-18 03:49:44 -07:00
commit 4ca0b4f167
3 changed files with 91 additions and 10 deletions

View File

@ -7,6 +7,7 @@ warnings.filterwarnings(
from dataset.persistence.database import Database
from dataset.persistence.table import Table
from dataset.freeze.app import freeze
from sqlalchemy import Integer, Text
__all__ = ['Database', 'Table', 'freeze', 'connect']

View File

@ -7,11 +7,11 @@ from migrate.versioning.util import construct_engine
from sqlalchemy.pool import NullPool
from sqlalchemy.schema import MetaData, Column, Index
from sqlalchemy.schema import Table as SQLATable
from sqlalchemy import Integer
from sqlalchemy import Integer, Text
from dataset.persistence.table import Table
from dataset.persistence.util import ResultIter
from dataset.util import DatasetException
log = logging.getLogger(__name__)
@ -97,21 +97,41 @@ class Database(object):
return list(set(self.metadata.tables.keys() +
self._tables.keys()))
def create_table(self, table_name):
def create_table(self, table_name, primary_id='id', primary_type='Integer'):
"""
Creates a new table. The new table will automatically have an `id` column, which is
set to be an auto-incrementing integer as the primary key of the table.
Creates a new table. The new table will automatically have an `id` column
unless specified via optional parameter primary_id, which will be used
as the primary key of the table. Automatic id is set to be an
auto-incrementing integer, while the type of custom primary_id can be a
Text or an Integer as specified with primary_type flag.
The caller will be responsible for the uniqueness of manual primary_id.
This custom id feature is only available via direct create_table call.
Returns a :py:class:`Table <dataset.Table>` instance.
::
table = db.create_table('population')
# custom id and type
table2 = db.create_table('population2', 'age')
table3 = db.create_table('population3', primary_id='race', primary_type='Text')
"""
self._acquire()
try:
log.debug("Creating table: %s on %r" % (table_name, self.engine))
table = SQLATable(table_name, self.metadata)
col = Column('id', Integer, primary_key=True)
if primary_type is 'Integer':
auto_flag = False
if primary_id is 'id':
auto_flag = True
col = Column(primary_id, Integer, primary_key=True, autoincrement=auto_flag)
elif primary_type is 'Text':
col = Column(primary_id, Text, primary_key=True)
else:
raise DatasetException(
"The primary_type has to be either 'Integer' or 'Text'.")
table.append_column(col)
table.create(self.engine)
self._tables[table_name] = table
@ -140,10 +160,12 @@ class Database(object):
finally:
self._release()
def get_table(self, table_name):
def get_table(self, table_name, primary_id='id', primary_type='Integer'):
"""
Smart wrapper around *load_table* and *create_table*. Either loads a table
or creates it if it doesn't exist yet.
For short-hand to create a table with custom id and type using [], where
table_name, primary_id, and primary_type are specified as a tuple
Returns a :py:class:`Table <dataset.Table>` instance.
::
@ -151,6 +173,10 @@ class Database(object):
table = db.get_table('population')
# you can also use the short-hand syntax:
table = db['population']
# custom id and type
table2 = db['population2', 'age'] # default type is 'Integer'
table3 = db['population3', 'race', 'Text']
"""
if table_name in self._tables:
return Table(self, self._tables[table_name])
@ -159,12 +185,15 @@ class Database(object):
if self.engine.has_table(table_name, schema=self.schema):
return self.load_table(table_name)
else:
return self.create_table(table_name)
return self.create_table(table_name, primary_id, primary_type)
finally:
self._release()
def __getitem__(self, table_name):
return self.get_table(table_name)
if type(table_name) is tuple:
return self.get_table(*table_name[:3])
else:
return self.get_table(table_name)
def query(self, query, **kw):
"""

View File

@ -5,13 +5,14 @@ from datetime import datetime
from dataset import connect
from dataset.util import DatasetException
from sample_data import TEST_DATA
from sqlalchemy.exc import IntegrityError
class DatabaseTestCase(unittest.TestCase):
def setUp(self):
os.environ['DATABASE_URL'] = 'sqlite:///:memory:'
self.db = connect()
self.db = connect('sqlite:///:memory:')
self.tbl = self.db['weather']
for row in TEST_DATA:
self.tbl.insert(row)
@ -32,6 +33,56 @@ class DatabaseTestCase(unittest.TestCase):
assert len(table.table.columns) == 1, table.table.columns
assert 'id' in table.table.c, table.table.c
def test_create_table_custom_id1(self):
pid = "string_id"
table = self.db.create_table("foo2", pid, 'Text')
assert table.table.exists()
assert len(table.table.columns) == 1, table.table.columns
assert pid in table.table.c, table.table.c
table.insert({
'string_id': 'foobar'})
assert table.find_one(string_id = 'foobar')[0] == 'foobar'
def test_create_table_custom_id2(self):
pid = "int_id"
table = self.db.create_table("foo3", primary_id = pid)
assert table.table.exists()
assert len(table.table.columns) == 1, table.table.columns
assert pid in table.table.c, table.table.c
table.insert({'int_id': 123})
table.insert({'int_id': 124})
assert table.find_one(int_id = 123)[0] == 123
assert table.find_one(int_id = 124)[0] == 124
with self.assertRaises(IntegrityError):
table.insert({'int_id': 123})
def test_create_table_shorthand1(self):
pid = "int_id"
table = self.db['foo4', pid]
assert table.table.exists
assert len(table.table.columns) == 1, table.table.columns
assert pid in table.table.c, table.table.c
table.insert({'int_id': 123})
table.insert({'int_id': 124})
assert table.find_one(int_id = 123)[0] == 123
assert table.find_one(int_id = 124)[0] == 124
with self.assertRaises(IntegrityError):
table.insert({'int_id': 123})
def test_create_table_shorthand2(self):
pid = "string_id"
table = self.db['foo5', pid, 'Text']
assert table.table.exists
assert len(table.table.columns) == 1, table.table.columns
assert pid in table.table.c, table.table.c
table.insert({
'string_id': 'foobar'})
assert table.find_one(string_id = 'foobar')[0] == 'foobar'
def test_load_table(self):
tbl = self.db.load_table('weather')
assert tbl.table == self.tbl.table