import os import unittest from datetime import datetime try: from collections import OrderedDict except ImportError: from ordereddict import OrderedDict # Python < 2.7 drop-in from sqlalchemy.exc import IntegrityError, SQLAlchemyError from dataset import connect from dataset.util import DatasetException from .sample_data import TEST_DATA, TEST_CITY_1 class DatabaseTestCase(unittest.TestCase): def setUp(self): os.environ.setdefault('DATABASE_URL', 'sqlite:///:memory:') self.db = connect(os.environ['DATABASE_URL']) self.tbl = self.db['weather'] for row in TEST_DATA: self.tbl.insert(row) def tearDown(self): for table in self.db.tables: self.db[table].drop() def test_valid_database_url(self): assert self.db.url, os.environ['DATABASE_URL'] def test_database_url_query_string(self): db = connect('sqlite:///:memory:/?cached_statements=1') assert 'cached_statements' in db.url, db.url def test_tables(self): assert self.db.tables == ['weather'], self.db.tables def test_create_table(self): table = self.db['foo'] assert table.table.exists() assert len(table.table.columns) == 1, table.table.columns assert 'id' in table.table.c, table.table.c def test_create_table_custom_id1(self): pid = "string_id" table = self.db.create_table("foo2", pid, 'String') assert table.table.exists() assert len(table.table.columns) == 1, table.table.columns assert pid in table.table.c, table.table.c table.insert({ 'string_id': 'foobar'}) assert table.find_one(string_id='foobar')['string_id'] == 'foobar' def test_create_table_custom_id2(self): pid = "string_id" table = self.db.create_table("foo3", pid, 'String(50)') assert table.table.exists() assert len(table.table.columns) == 1, table.table.columns assert pid in table.table.c, table.table.c table.insert({ 'string_id': 'foobar'}) assert table.find_one(string_id='foobar')['string_id'] == 'foobar' def test_create_table_custom_id3(self): pid = "int_id" table = self.db.create_table("foo4", primary_id=pid) assert table.table.exists() assert len(table.table.columns) == 1, table.table.columns assert pid in table.table.c, table.table.c table.insert({'int_id': 123}) table.insert({'int_id': 124}) assert table.find_one(int_id=123)['int_id'] == 123 assert table.find_one(int_id=124)['int_id'] == 124 self.assertRaises(IntegrityError, lambda: table.insert({'int_id': 123})) def test_create_table_shorthand1(self): pid = "int_id" table = self.db.get_table('foo5', pid) assert table.table.exists assert len(table.table.columns) == 1, table.table.columns assert pid in table.table.c, table.table.c table.insert({'int_id': 123}) table.insert({'int_id': 124}) assert table.find_one(int_id=123)['int_id'] == 123 assert table.find_one(int_id=124)['int_id'] == 124 self.assertRaises(IntegrityError, lambda: table.insert({'int_id': 123})) def test_create_table_shorthand2(self): pid = "string_id" table = self.db.get_table('foo6', primary_id=pid, primary_type='String') assert table.table.exists assert len(table.table.columns) == 1, table.table.columns assert pid in table.table.c, table.table.c table.insert({ 'string_id': 'foobar'}) assert table.find_one(string_id='foobar')['string_id'] == 'foobar' def test_create_table_shorthand3(self): pid = "string_id" table = self.db.get_table('foo7', primary_id=pid, primary_type='String(20)') assert table.table.exists assert len(table.table.columns) == 1, table.table.columns assert pid in table.table.c, table.table.c table.insert({ 'string_id': 'foobar'}) assert table.find_one(string_id='foobar')['string_id'] == 'foobar' def test_with(self): init_length = len(self.db['weather']) try: with self.db as tx: tx['weather'].insert({'date': datetime(2011, 1, 1), 'temperature': 1, 'place': u'tmp_place'}) tx['weather'].insert({'date': True, 'temperature': 'wrong_value', 'place': u'tmp_place'}) except SQLAlchemyError: pass assert len(self.db['weather']) == init_length def test_load_table(self): tbl = self.db.load_table('weather') assert tbl.table == self.tbl.table def test_query(self): r = self.db.query('SELECT COUNT(*) AS num FROM weather').next() assert r['num'] == len(TEST_DATA), r def test_table_cache_updates(self): tbl1 = self.db.get_table('people') data = OrderedDict([('first_name', 'John'), ('last_name', 'Smith')]) tbl1.insert(data) data['id'] = 1 tbl2 = self.db.get_table('people') assert dict(tbl2.all().next()) == dict(data), (tbl2.all().next(), data) class TableTestCase(unittest.TestCase): def setUp(self): self.db = connect('sqlite:///:memory:') self.tbl = self.db['weather'] for row in TEST_DATA: self.tbl.insert(row) def test_insert(self): assert len(self.tbl) == len(TEST_DATA), len(self.tbl) last_id = self.tbl.insert({ 'date': datetime(2011, 1, 2), 'temperature': -10, 'place': 'Berlin'} ) assert len(self.tbl) == len(TEST_DATA) + 1, len(self.tbl) assert self.tbl.find_one(id=last_id)['place'] == 'Berlin' def test_upsert(self): self.tbl.upsert({ 'date': datetime(2011, 1, 2), 'temperature': -10, 'place': 'Berlin'}, ['place'] ) assert len(self.tbl) == len(TEST_DATA) + 1, len(self.tbl) self.tbl.upsert({ 'date': datetime(2011, 1, 2), 'temperature': -10, 'place': 'Berlin'}, ['place'] ) assert len(self.tbl) == len(TEST_DATA) + 1, len(self.tbl) def test_upsert_all_key(self): for i in range(0, 2): self.tbl.upsert({ 'date': datetime(2011, 1, 2), 'temperature': -10, 'place': 'Berlin'}, ['date', 'temperature', 'place'] ) def test_delete(self): self.tbl.insert({ 'date': datetime(2011, 1, 2), 'temperature': -10, 'place': 'Berlin'} ) assert len(self.tbl) == len(TEST_DATA) + 1, len(self.tbl) assert self.tbl.delete(place='Berlin') is True, 'should return 1' assert len(self.tbl) == len(TEST_DATA), len(self.tbl) assert self.tbl.delete() is True, 'should return non zero' assert len(self.tbl) == 0, len(self.tbl) def test_repr(self): assert repr(self.tbl) == '