Adapt data freeze code base to integrate with dataset.persistence

This commit is contained in:
Friedrich Lindenberg 2013-04-04 23:53:37 +02:00
parent c2d606bc6f
commit 5cfa9abfb0
10 changed files with 74 additions and 75 deletions

View File

@ -5,7 +5,7 @@ warnings.filterwarnings(
from dataset.persistence.database import Database from dataset.persistence.database import Database
from dataset.persistence.table import Table from dataset.persistence.table import Table
from dataset.freeze.app import freeze
def connect(url): def connect(url):
""" """
@ -18,3 +18,5 @@ def connect(url):
.. _SQLAlchemy Engine URL: http://docs.sqlalchemy.org/en/latest/core/engines.html#sqlalchemy.create_engine .. _SQLAlchemy Engine URL: http://docs.sqlalchemy.org/en/latest/core/engines.html#sqlalchemy.create_engine
""" """
return Database(url) return Database(url)

View File

@ -1,9 +1,10 @@
import logging import logging
import argparse import argparse
from sqlalchemy.exc import ProgrammingError
from dataset.util import FreezeException from dataset.util import FreezeException
from dataset.freeze.config import Configuration from dataset.persistence.database import Database
from dataset.freeze.engine import ExportEngine from dataset.freeze.config import Configuration, Export
from dataset.freeze.format import get_serializer from dataset.freeze.format import get_serializer
@ -16,6 +17,37 @@ parser = argparse.ArgumentParser(
parser.add_argument('config', metavar='CONFIG', type=str, parser.add_argument('config', metavar='CONFIG', type=str,
help='freeze file cofiguration') help='freeze file cofiguration')
def freeze(database, query, format='csv', filename='freeze.csv',
           prefix='.', meta={}, indent=2, mode='list', wrap=True, **kw):
    """
    Perform a data export of a given SQL statement. This is a very
    flexible exporter, allowing for various output formats, metadata
    assignment, and file name templating to dump each record (or a set
    of records) into individual files.

    All named arguments are collected, together with any extra keyword
    arguments, into a single options dictionary that is wrapped in an
    ``Export`` and handed to ``freeze_export``. Named arguments take
    precedence over same-named entries in ``**kw``.

    :param database: value stored under the ``database`` option
        (presumably a connection URL -- confirm against ``freeze_export``).
    :param query: the SQL statement to export.
    :param format: name of the output format, ``csv`` by default.
    :param filename: output file name (or template), ``freeze.csv`` by
        default.
    :param prefix: directory prefix for the output, ``.`` by default.
    :param meta: metadata to attach to the export. Dictionaries are
        shallow-copied so that neither the shared default nor the
        caller's object can be altered by the export machinery.
    :param indent: indentation option stored for the serializer.
    :param mode: export mode option, ``list`` by default.
    :param wrap: wrap option stored for the serializer.
    :returns: whatever ``freeze_export`` returns.
    """
    # The default ``{}`` is created once at definition time; copying it
    # keeps state from leaking between successive calls.
    if isinstance(meta, dict):
        meta = meta.copy()
    kw.update({
        'database': database,
        'query': query,
        'format': format,
        'filename': filename,
        'prefix': prefix,
        'meta': meta,
        'indent': indent,
        'mode': mode,
        'wrap': wrap
    })
    return freeze_export(Export(kw))
def freeze_export(export):
    """
    Run a single export: execute its query and serialize the result.

    The ``database`` option of ``export`` is used to construct a
    ``Database``, the ``query`` option is executed against it, and the
    rows are passed to the serializer class chosen by ``get_serializer``.

    :param export: export configuration offering a ``get()`` accessor.
    :raises FreezeException: if a ``ProgrammingError`` is raised while
        the query is executed or its rows are serialized.
    """
    try:
        database = Database(export.get('database'))
        query = database.query(export.get('query'))
        serializer_cls = get_serializer(export)
        serializer = serializer_cls(export, query)
        serializer.serialize()
    # ``except X as e`` parses on Python 2.6+ and 3.x; the older
    # ``except X, e`` spelling is a syntax error on Python 3.
    except ProgrammingError as pe:
        raise FreezeException("Invalid query: %s" % pe)
def main(): def main():
try: try:
args = parser.parse_args() args = parser.parse_args()
@ -25,13 +57,10 @@ def main():
log.info("Skipping: %s", export.name) log.info("Skipping: %s", export.name)
continue continue
log.info("Running: %s", export.name) log.info("Running: %s", export.name)
engine = ExportEngine(export) freeze_export(export)
query = engine.query()
serializer_cls = get_serializer(export)
serializer = serializer_cls(engine)
serializer.serialize()
except FreezeException, fe: except FreezeException, fe:
log.error(fe) log.error(fe)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -1,42 +0,0 @@
from sqlalchemy import create_engine
from sqlalchemy.exc import ProgrammingError
from dataset.util import FreezeException
class Query(object):
    """Pair a query with the result proxy obtained by executing it."""

    def __init__(self, query, rp):
        self.query = query
        self.rp = rp

    def __len__(self):
        """Return the row count reported by the result proxy."""
        return self.rp.rowcount

    def __iter__(self):
        """Yield each remaining row as a dict keyed by column name."""
        columns = self.rp.keys()
        record = self.rp.fetchone()
        # ``fetchone`` signals exhaustion by returning None.
        while record is not None:
            yield dict(zip(columns, record))
            record = self.rp.fetchone()
class ExportEngine(object):
    """Execute the query of one export configuration.

    ``config`` must offer a ``get()`` accessor; its ``database`` value is
    handed to ``create_engine`` and its ``query`` value is executed.
    """

    def __init__(self, config):
        self.config = config

    @property
    def engine(self):
        """Return the SQLAlchemy engine, creating it on first access."""
        if not hasattr(self, '_engine'):
            self._engine = create_engine(self.config.get('database'))
        return self._engine

    def query(self):
        """Execute the configured query and wrap the result in a Query.

        :raises FreezeException: if the database raises a
            ``ProgrammingError`` for the query.
        """
        # Read the query text before the try block so that it is always
        # bound when the error message is formatted.
        q = self.config.get('query')
        try:
            rp = self.engine.execute(q)
        # ``except X as e`` parses on Python 2.6+ and 3.x; the older
        # ``except X, e`` spelling is a syntax error on Python 3.
        except ProgrammingError as pe:
            raise FreezeException("Invalid query: %s - %s" % (q, pe))
        return Query(q, rp)

View File

@ -17,18 +17,18 @@ OPERATIONS = {
class Serializer(object): class Serializer(object):
def __init__(self, engine): def __init__(self, export, query):
self.engine = engine self.export = export
self.config = engine.config self.query = query
self._paths = [] self._paths = []
self._get_basepath() self._get_basepath()
def _get_basepath(self): def _get_basepath(self):
prefix = self.config.get('prefix') prefix = self.export.get('prefix')
prefix = os.path.abspath(prefix) prefix = os.path.abspath(prefix)
prefix = os.path.realpath(prefix) prefix = os.path.realpath(prefix)
self._prefix = prefix self._prefix = prefix
filename = self.config.get('filename') filename = self.export.get('filename')
if filename is None: if filename is None:
raise FreezeException("No 'filename' is specified") raise FreezeException("No 'filename' is specified")
self._basepath = os.path.join(prefix, filename) self._basepath = os.path.join(prefix, filename)
@ -56,20 +56,19 @@ class Serializer(object):
@property @property
def mode(self): def mode(self):
mode = self.config.get_normalized('mode', 'list') mode = self.export.get_normalized('mode', 'list')
if mode not in ['list', 'item']: if mode not in ['list', 'item']:
raise FreezeException("Invalid mode: %s" % mode) raise FreezeException("Invalid mode: %s" % mode)
return mode return mode
@property @property
def wrap(self): def wrap(self):
return self.config.get_bool('wrap', return self.export.get_bool('wrap',
default=self.mode=='list') default=self.mode=='list')
def serialize(self): def serialize(self):
self.init() self.init()
query = self.engine.query() for row in self.query:
for row in query:
self.write(self.file_name(row), row) self.write(self.file_name(row), row)
self.close() self.close()

View File

@ -21,15 +21,14 @@ class JSONSerializer(Serializer):
self.buckets[path].append(result) self.buckets[path].append(result)
def wrap(self, result): def wrap(self, result):
count = len(result)
if self.mode == 'item': if self.mode == 'item':
result = result[0] result = result[0]
if self.wrap: if self.wrap:
result = { result = {
'count': count, 'count': self.query.count,
'results': result 'results': result
} }
meta = self.config.get('meta') meta = self.export.get('meta', {})
if meta is not None: if meta is not None:
result['meta'] = meta result['meta'] = meta
return result return result
@ -40,6 +39,6 @@ class JSONSerializer(Serializer):
fh = open(path, 'wb') fh = open(path, 'wb')
json.dump(result, fh, json.dump(result, fh,
cls=JSONEncoder, cls=JSONEncoder,
indent=self.config.get_int('indent')) indent=self.export.get_int('indent'))
fh.close() fh.close()

View File

@ -13,11 +13,11 @@ class TabsonSerializer(JSONSerializer):
d = [row.get(k) for k in keys] d = [row.get(k) for k in keys]
data.append(d) data.append(d)
result = { result = {
'count': len(result), 'count': self.query.count,
'fields': fields, 'fields': fields,
'data': data 'data': data
} }
meta = self.config.get('meta') meta = self.export.get('meta', {})
if meta is not None: if meta is not None:
result['meta'] = meta result['meta'] = meta
return result return result

View File

@ -9,7 +9,7 @@ from sqlalchemy.schema import Table as SQLATable
from sqlalchemy import Integer from sqlalchemy import Integer
from dataset.persistence.table import Table from dataset.persistence.table import Table
from dataset.persistence.util import resultiter from dataset.persistence.util import ResultIter
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -37,7 +37,8 @@ class Database(object):
>>> print db.tables >>> print db.tables
set([u'user', u'action']) set([u'user', u'action'])
""" """
return set(self.metadata.tables.keys() + self._tables.keys()) return list(set(self.metadata.tables.keys() +
self._tables.keys()))
def create_table(self, table_name): def create_table(self, table_name):
""" """
@ -111,7 +112,8 @@ class Database(object):
for row in res: for row in res:
print row['user'], row['c'] print row['user'], row['c']
""" """
return resultiter(self.engine.execute(query)) return ResultIter(self.engine.execute(query))
def __repr__(self): def __repr__(self):
return '<Database(%s)>' % self.url return '<Database(%s)>' % self.url

View File

@ -319,3 +319,4 @@ class Table(object):
""" """
for row in self.all(): for row in self.all():
yield row yield row

View File

@ -15,15 +15,22 @@ def guess_type(sample):
return UnicodeText return UnicodeText
def resultiter(rp): class ResultIter(object):
""" SQLAlchemy ResultProxies are not iterable to get a """ SQLAlchemy ResultProxies are not iterable to get a
list of dictionaries. This is to wrap them. """ list of dictionaries. This is to wrap them. """
keys = rp.keys()
while True: def __init__(self, rp):
row = rp.fetchone() self.rp = rp
self.count = rp.rowcount
self.keys = self.rp.keys()
def next(self):
row = self.rp.fetchone()
if row is None: if row is None:
break raise StopIteration
yield dict(zip(keys, row)) return dict(zip(self.keys, row))
def __iter__(self):
return self

View File

@ -4,6 +4,8 @@ API documentation
.. autofunction:: dataset.connect .. autofunction:: dataset.connect
.. autofunction:: dataset.freeze
Database Database
-------- --------