From a14f0bbde282855793ef9582c31f43e0213d7e9a Mon Sep 17 00:00:00 2001 From: xrotwang Date: Fri, 25 Jul 2014 12:15:08 +0200 Subject: [PATCH] ported freezing as csv to python 3.4 --- dataset/freeze/format/common.py | 15 +++---- dataset/freeze/format/fcsv.py | 28 ++++++++++--- setup.py | 1 + test/test_freeze.py | 71 +++++++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 13 deletions(-) create mode 100644 test/test_freeze.py diff --git a/dataset/freeze/format/common.py b/dataset/freeze/format/common.py index f28f703..ee00369 100644 --- a/dataset/freeze/format/common.py +++ b/dataset/freeze/format/common.py @@ -3,10 +3,7 @@ import re import sys import locale -try: - str = unicode -except NameError: - pass +from six import binary_type, text_type from dataset.util import FreezeException from slugify import slugify @@ -16,7 +13,7 @@ TMPL_KEY = re.compile("{{([^}]*)}}") OPERATIONS = { 'identity': lambda x: x, - 'lower': lambda x: str(x).lower(), + 'lower': lambda x: text_type(x).lower(), 'slug': slugify } @@ -24,6 +21,7 @@ OPERATIONS = { class Serializer(object): def __init__(self, export, query): + self._encoding = locale.getpreferredencoding() self.export = export self.query = query self._paths = [] @@ -35,10 +33,14 @@ class Serializer(object): def _get_basepath(self): prefix = self.export.get('prefix', '') + if isinstance(prefix, binary_type): + prefix = text_type(prefix, encoding=self._encoding) prefix = os.path.abspath(prefix) prefix = os.path.realpath(prefix) self._prefix = prefix filename = self.export.get('filename') + if isinstance(filename, binary_type): + filename = text_type(filename, encoding=self._encoding) if filename is None: raise FreezeException("No 'filename' is specified") self._basepath = os.path.join(prefix, filename) @@ -50,8 +52,7 @@ class Serializer(object): op, key = key.split(':', 1) return str(OPERATIONS.get(op)(data.get(key, ''))) path = TMPL_KEY.sub(repl, self._basepath) - enc = locale.getpreferredencoding() - return os.path.realpath(path.encode(enc, 'replace')) + return os.path.realpath(path) def file_name(self, row): # signal that there is a fileobj available: diff --git a/dataset/freeze/format/fcsv.py b/dataset/freeze/format/fcsv.py index 1d75ee4..641ec38 100644 --- a/dataset/freeze/format/fcsv.py +++ b/dataset/freeze/format/fcsv.py @@ -1,13 +1,23 @@ +from __future__ import unicode_literals import csv -from datetime import datetime +from datetime import datetime, date + +from six import PY3, binary_type, text_type +from sqlalchemy import DateTime from dataset.freeze.format.common import Serializer def value_to_str(value): - if isinstance(value, datetime): - return value.isoformat() - if hasattr(value, 'encode'): + if isinstance(value, (datetime, date)): + # + # FIXME: this check does not work for values returned from a db query! + # As a workaround, we make sure, the isoformat call returns the regular + # str representation. + # + sep = ' ' if PY3 else str(' ') + return text_type(value.isoformat(sep=sep)) + if not PY3 and hasattr(value, 'encode'): return value.encode('utf-8') if value is None: return '' @@ -25,12 +35,18 @@ class CSVSerializer(Serializer): # handle fileobj that has been passed in: if path is not None: - fh = open(path, 'wb') + if PY3: + fh = open(path, 'wt', encoding='utf8', newline='') + else: + fh = open(path, 'wb') else: fh = self.fileobj writer = csv.writer(fh) - writer.writerow([k.encode('utf-8') for k in keys]) + if PY3: + writer.writerow(keys) + else: + writer.writerow([k.encode('utf-8') for k in keys]) self.handles[path] = (writer, fh) writer, fh = self.handles[path] values = [value_to_str(result.get(k)) for k in keys] diff --git a/setup.py b/setup.py index eb0c54b..a6f61ee 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,7 @@ setup( include_package_data=False, zip_safe=False, install_requires=[ + 'six', 'sqlalchemy >= 0.9.1', 'alembic >= 0.6.2', 'python-slugify >= 0.0.6', diff --git a/test/test_freeze.py b/test/test_freeze.py new file mode 100644 index 0000000..eec0d9e --- /dev/null +++ b/test/test_freeze.py @@ -0,0 +1,71 @@ +# coding: utf8 +from __future__ import unicode_literals +import os +from csv import reader +import unittest +from datetime import datetime +from tempfile import mkdtemp +from shutil import rmtree + +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict # Python < 2.7 drop-in + +from six import PY3, text_type, binary_type +from sqlalchemy.exc import IntegrityError, SQLAlchemyError + +from dataset import connect +from dataset.util import DatasetException + +from .sample_data import TEST_DATA, TEST_CITY_1 + + +class FreezeTestCase(unittest.TestCase): + + def setUp(self): + self.db = connect('sqlite://') + self.tbl = self.db['weather'] + for row in TEST_DATA: + self.tbl.insert(row) + self.d = mkdtemp() + + def tearDown(self): + rmtree(self.d, ignore_errors=True) + + def test_freeze(self): + from dataset.freeze.app import freeze + + freeze(self.db['weather'].all(), format='csv', filename='wäther.csv'.encode('utf8'), prefix=self.d) + self.assert_(os.path.exists(os.path.join(self.d, 'wäther.csv'))) + freeze(self.db['weather'].all(), format='csv', filename='wäther.csv', prefix=self.d) + self.assert_(os.path.exists(os.path.join(self.d, 'wäther.csv'))) + + def test_freeze_csv(self): + from dataset.freeze.app import freeze + from dataset.freeze.format.fcsv import value_to_str + + freeze(self.db['weather'].all(), format='csv', filename='weather.csv', prefix=self.d) + path = os.path.join(self.d, 'weather.csv') + if PY3: + fh = open(path, 'rt', encoding='utf8', newline='') + else: + fh = open(path, 'rU') + rows = list(reader(fh)) + keys = rows[0] + if not PY3: + keys = [k.decode('utf8') for k in keys] + for i, d1 in enumerate(TEST_DATA): + d2 = dict(zip(keys, rows[i + 1])) + for k in d1.keys(): + v2 = d2[k] + if not PY3: + v2 = v2.decode('utf8') + v1 = value_to_str(d1[k]) + if not isinstance(v1, text_type): + if isinstance(v1, binary_type): + v1 = text_type(v1, encoding='utf8') + else: + v1 = '%s' % v1 + self.assertEqual(v2, v1) +