# coding: utf8
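"""Tests for dataset.freeze: exporting query results to CSV and JSON."""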
from __future__ import unicode_literals

import os
from csv import reader
import unittest
from tempfile import mkdtemp
from shutil import rmtree

from six import PY3, text_type, binary_type

from dataset import connect
from dataset.freeze.app import freeze
from dataset.freeze.format.fcsv import value_to_str

from .sample_data import TEST_DATA


class FreezeTestCase(unittest.TestCase):
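    """Exercise freeze() exports against a small in-memory SQLite table."""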

    def setUp(self):
        self.db = connect('sqlite://')
        self.tbl = self.db['weather']
        for row in TEST_DATA:
            self.tbl.insert(row)
        self.d = mkdtemp()

    def tearDown(self):
        rmtree(self.d, ignore_errors=True)
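
    # freeze() should accept both a UTF-8 byte-string and a unicode filename.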
    def test_freeze(self):
        freeze(self.tbl.all(), format='csv',
               filename=u'wäther.csv'.encode('utf8'), prefix=self.d)
        self.assertTrue(os.path.exists(os.path.join(self.d, u'wäther.csv')))
        freeze(self.tbl.all(), format='csv',
               filename=u'wäther.csv', prefix=self.d)
        self.assertTrue(os.path.exists(os.path.join(self.d, u'wäther.csv')))
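
    # Round-trip check: every CSV cell should equal value_to_str() of the original value.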
    def test_freeze_csv(self):
        freeze(self.tbl.all(), format='csv',
               filename='weather.csv', prefix=self.d)
        path = os.path.join(self.d, 'weather.csv')
        if PY3:
            fh = open(path, 'rt', encoding='utf8', newline='')
        else:
            fh = open(path, 'rU')
        try:
            rows = list(reader(fh))
            keys = rows[0]
            for i, d1 in enumerate(TEST_DATA):
                d2 = dict(zip(keys, rows[i + 1]))
                for k in d1.keys():
                    v2 = d2[k]
                    if not PY3:
                        v2 = v2.decode('utf8')
                    v1 = value_to_str(d1[k])
                    if not isinstance(v1, text_type):
                        if isinstance(v1, binary_type):
                            v1 = text_type(v1, encoding='utf8')
                        else:
                            v1 = '%s' % v1
                    self.assertEqual(v2, v1)
        finally:
            fh.close()
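
    # Freezing to an in-memory file object must not close it, for csv, json or tabson output.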
    def test_memory_streams(self):
        if PY3:
            from io import StringIO
        else:
            from io import BytesIO as StringIO

        for fmt in ('csv', 'json', 'tabson'):
            with StringIO() as fd:
                freeze(self.tbl.all(), format=fmt, fileobj=fd)
                self.assertFalse(fd.closed, 'fileobj was closed for format %s' % fmt)
                fd.getvalue()  # should not throw
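
    # With wrap=False the JSON output should be a bare list of rows.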
    def test_freeze_json_no_wrap(self):
        freeze(self.tbl.all(), format='json',
               filename='weather.csv', prefix=self.d, wrap=False)
        path = os.path.join(self.d, 'weather.csv')
        if PY3:
            fh = open(path, 'rt', encoding='utf8', newline='')
        else:
            fh = open(path, 'rU')
        try:
            import json
            data = json.load(fh)
            self.assertIsInstance(data, list,
                                  'Without wrapping, returned JSON should be a list')
        finally:
            fh.close()
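
    # With wrap=True the JSON output should be an object with 'results', 'count' and 'meta' keys.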
    def test_freeze_json_wrap(self):
        freeze(self.tbl.all(), format='json',
               filename='weather.csv', prefix=self.d, wrap=True)
        path = os.path.join(self.d, 'weather.csv')
        if PY3:
            fh = open(path, 'rt', encoding='utf8', newline='')
        else:
            fh = open(path, 'rU')
        try:
            import json
            data = json.load(fh)
            self.assertIsInstance(data, dict,
                                  'With wrapping, returned JSON should be a dict')
            self.assertIn('results', data.keys())
            self.assertIn('count', data.keys())
            self.assertIn('meta', data.keys())
        finally:
            fh.close()


class SerializerTestCase(unittest.TestCase):
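    """Unit tests for the freeze Serializer helper and value_to_str()."""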

    def test_serializer(self):
        from dataset.freeze.format.common import Serializer
        from dataset.freeze.config import Export
        from dataset.util import FreezeException

        self.assertRaises(FreezeException, Serializer, {}, {})
        s = Serializer(Export({'filename': 'f'}, {'mode': 'nomode'}), '')
        self.assertRaises(FreezeException, getattr, s, 'wrap')
        s = Serializer(Export({'filename': 'f'}, {}), '')
        s.wrap  # accessing .wrap on a valid export must not raise
        s = Serializer(Export({'filename': '-'}, {}), '')
        self.assertTrue(s.fileobj)
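
    # value_to_str(): datetimes become ISO 8601 strings, None becomes '', non-text values pass through.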
    def test_value_to_str1(self):
        assert '2011-01-01T00:00:00' == value_to_str(TEST_DATA[0]['date']), \
            value_to_str(TEST_DATA[0]['date'])

    def test_value_to_str2(self):
        if PY3:
            assert 'hóla' == value_to_str('\u0068\u00f3\u006c\u0061')
        else:
            assert u'hóla'.encode('utf-8') == value_to_str(u'\u0068\u00f3\u006c\u0061'), \
                [value_to_str(u'\u0068\u00f3\u006c\u0061')]

    def test_value_to_str3(self):
        assert '' == value_to_str(None)

    def test_value_to_str4(self):
        assert [] == value_to_str([])


if __name__ == '__main__':
    unittest.main()