ported freezing as csv to python 3.4

This commit is contained in:
xrotwang 2014-07-25 12:15:08 +02:00
parent 9a91f3d113
commit a14f0bbde2
4 changed files with 102 additions and 13 deletions

View File

@ -3,10 +3,7 @@ import re
import sys import sys
import locale import locale
try: from six import binary_type, text_type
str = unicode
except NameError:
pass
from dataset.util import FreezeException from dataset.util import FreezeException
from slugify import slugify from slugify import slugify
@ -16,7 +13,7 @@ TMPL_KEY = re.compile("{{([^}]*)}}")
OPERATIONS = { OPERATIONS = {
'identity': lambda x: x, 'identity': lambda x: x,
'lower': lambda x: str(x).lower(), 'lower': lambda x: text_type(x).lower(),
'slug': slugify 'slug': slugify
} }
@ -24,6 +21,7 @@ OPERATIONS = {
class Serializer(object): class Serializer(object):
def __init__(self, export, query): def __init__(self, export, query):
self._encoding = locale.getpreferredencoding()
self.export = export self.export = export
self.query = query self.query = query
self._paths = [] self._paths = []
@ -35,10 +33,14 @@ class Serializer(object):
def _get_basepath(self): def _get_basepath(self):
prefix = self.export.get('prefix', '') prefix = self.export.get('prefix', '')
if isinstance(prefix, binary_type):
prefix = text_type(prefix, encoding=self._encoding)
prefix = os.path.abspath(prefix) prefix = os.path.abspath(prefix)
prefix = os.path.realpath(prefix) prefix = os.path.realpath(prefix)
self._prefix = prefix self._prefix = prefix
filename = self.export.get('filename') filename = self.export.get('filename')
if isinstance(filename, binary_type):
filename = text_type(filename, encoding=self._encoding)
if filename is None: if filename is None:
raise FreezeException("No 'filename' is specified") raise FreezeException("No 'filename' is specified")
self._basepath = os.path.join(prefix, filename) self._basepath = os.path.join(prefix, filename)
@ -50,8 +52,7 @@ class Serializer(object):
op, key = key.split(':', 1) op, key = key.split(':', 1)
return str(OPERATIONS.get(op)(data.get(key, ''))) return str(OPERATIONS.get(op)(data.get(key, '')))
path = TMPL_KEY.sub(repl, self._basepath) path = TMPL_KEY.sub(repl, self._basepath)
enc = locale.getpreferredencoding() return os.path.realpath(path)
return os.path.realpath(path.encode(enc, 'replace'))
def file_name(self, row): def file_name(self, row):
# signal that there is a fileobj available: # signal that there is a fileobj available:

View File

@ -1,13 +1,23 @@
from __future__ import unicode_literals
import csv import csv
from datetime import datetime from datetime import datetime, date
from six import PY3, binary_type, text_type
from sqlalchemy import DateTime
from dataset.freeze.format.common import Serializer from dataset.freeze.format.common import Serializer
def value_to_str(value): def value_to_str(value):
if isinstance(value, datetime): if isinstance(value, (datetime, date)):
return value.isoformat() #
if hasattr(value, 'encode'): # FIXME: this check does not work for values returned from a db query!
# As a workaround, we make sure, the isoformat call returns the regular
# str representation.
#
sep = ' ' if PY3 else str(' ')
return text_type(value.isoformat(sep=sep))
if not PY3 and hasattr(value, 'encode'):
return value.encode('utf-8') return value.encode('utf-8')
if value is None: if value is None:
return '' return ''
@ -25,11 +35,17 @@ class CSVSerializer(Serializer):
# handle fileobj that has been passed in: # handle fileobj that has been passed in:
if path is not None: if path is not None:
if PY3:
fh = open(path, 'wt', encoding='utf8', newline='')
else:
fh = open(path, 'wb') fh = open(path, 'wb')
else: else:
fh = self.fileobj fh = self.fileobj
writer = csv.writer(fh) writer = csv.writer(fh)
if PY3:
writer.writerow(keys)
else:
writer.writerow([k.encode('utf-8') for k in keys]) writer.writerow([k.encode('utf-8') for k in keys])
self.handles[path] = (writer, fh) self.handles[path] = (writer, fh)
writer, fh = self.handles[path] writer, fh = self.handles[path]

View File

@ -30,6 +30,7 @@ setup(
include_package_data=False, include_package_data=False,
zip_safe=False, zip_safe=False,
install_requires=[ install_requires=[
'six',
'sqlalchemy >= 0.9.1', 'sqlalchemy >= 0.9.1',
'alembic >= 0.6.2', 'alembic >= 0.6.2',
'python-slugify >= 0.0.6', 'python-slugify >= 0.0.6',

71
test/test_freeze.py Normal file
View File

@ -0,0 +1,71 @@
# coding: utf8
from __future__ import unicode_literals
import os
from csv import reader
import unittest
from datetime import datetime
from tempfile import mkdtemp
from shutil import rmtree
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict # Python < 2.7 drop-in
from six import PY3, text_type, binary_type
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from dataset import connect
from dataset.util import DatasetException
from .sample_data import TEST_DATA, TEST_CITY_1
class FreezeTestCase(unittest.TestCase):
def setUp(self):
self.db = connect('sqlite://')
self.tbl = self.db['weather']
for row in TEST_DATA:
self.tbl.insert(row)
self.d = mkdtemp()
def tearDown(self):
rmtree(self.d, ignore_errors=True)
def test_freeze(self):
from dataset.freeze.app import freeze
freeze(self.db['weather'].all(), format='csv', filename='wäther.csv'.encode('utf8'), prefix=self.d)
self.assert_(os.path.exists(os.path.join(self.d, 'wäther.csv')))
freeze(self.db['weather'].all(), format='csv', filename='wäther.csv', prefix=self.d)
self.assert_(os.path.exists(os.path.join(self.d, 'wäther.csv')))
def test_freeze_csv(self):
from dataset.freeze.app import freeze
from dataset.freeze.format.fcsv import value_to_str
freeze(self.db['weather'].all(), format='csv', filename='weather.csv', prefix=self.d)
path = os.path.join(self.d, 'weather.csv')
if PY3:
fh = open(path, 'rt', encoding='utf8', newline='')
else:
fh = open(path, 'rU')
rows = list(reader(fh))
keys = rows[0]
if not PY3:
keys = [k.decode('utf8') for k in keys]
for i, d1 in enumerate(TEST_DATA):
d2 = dict(zip(keys, rows[i + 1]))
for k in d1.keys():
v2 = d2[k]
if not PY3:
v2 = v2.decode('utf8')
v1 = value_to_str(d1[k])
if not isinstance(v1, text_type):
if isinstance(v1, binary_type):
v1 = text_type(v1, encoding='utf8')
else:
v1 = '%s' % v1
self.assertEqual(v2, v1)