Various fixes:

* Functions for finding one or multiple entries, with internal
  pagination.
* Limit engine pool size to 1
* Don't block connections.
This commit is contained in:
Friedrich Lindenberg 2012-01-04 00:15:10 +01:00
parent a4b816704b
commit 6bbe8a0486
3 changed files with 53 additions and 9 deletions

View File

@ -4,5 +4,5 @@ from sqlaload.schema import create_table, load_table, get_table
from sqlaload.schema import create_column
from sqlaload.write import add_row, update_row
from sqlaload.write import upsert, update
from sqlaload.query import distinct, resultiter, all
from sqlaload.query import distinct, resultiter, all, find_one, find

View File

@ -1,6 +1,7 @@
import logging
from itertools import count
from sqlalchemy.sql import expression
from sqlalchemy.sql import expression, and_
log = logging.getLogger(__name__)
@ -14,14 +15,47 @@ def resultiter(rp):
break
yield dict(zip(keys, row))
def find_one(engine, table, **kw):
res = list(find(engine, table, _limit=1, **kw))
if not len(res):
return None
return res[0]
def find(engine, table, _limit=None, _step=5000, _offset=0,
order_by=None, **kw):
if order_by is None:
order_by = [table.c.id.asc()]
qargs = []
try:
for col, val in kw.items():
qargs.append(table.c[col]==val)
except KeyError:
return
for i in count():
qoffset = _offset + (_step * i)
qlimit = _step
if _limit is not None:
qlimit = min(_limit-(_step*i), _step)
if qlimit <= 0:
break
q = table.select(whereclause=and_(*qargs), limit=qlimit,
offset=qoffset, order_by=order_by)
rows = list(resultiter(engine.execute(q)))
if not len(rows):
return
for row in rows:
yield row
def distinct(engine, table, *columns):
columns = [table.c[c] for c in columns]
q = expression.select(columns, distinct=True)
return resultiter(engine.execute(q))
return list(resultiter(engine.execute(q)))
def all(engine, table):
q = table.select()
return resultiter(engine.execute(q))
return find(engine, table)

View File

@ -1,17 +1,20 @@
import logging
from datetime import datetime
from collections import defaultdict
from sqlalchemy import create_engine
from sqlalchemy import Integer, UnicodeText, Float, DateTime
from sqlalchemy import Integer, UnicodeText, Float, DateTime, Boolean
from sqlalchemy.schema import Table, MetaData, Column
from sqlalchemy.sql import and_, expression
from migrate.versioning.util import construct_engine
log = logging.getLogger(__name__)
TABLES = defaultdict(dict)
def connect(url):
""" Create an engine for the given database URL. """
engine = create_engine(url)
engine = create_engine(url, pool_size=1)
engine = construct_engine(engine)
meta = MetaData()
meta.bind = engine
@ -24,20 +27,27 @@ def create_table(engine, table_name):
col = Column('id', Integer, primary_key=True)
table.append_column(col)
table.create(engine)
TABLES[engine][table_name] = table
return table
def load_table(engine, table_name):
log.debug("Loading table: %s on %r" % (table_name, engine))
return Table(table_name, engine._metadata, autoload=True)
table = Table(table_name, engine._metadata, autoload=True)
TABLES[engine][table_name] = table
return table
def get_table(engine, table_name):
if table_name in TABLES[engine]:
return TABLES[engine][table_name]
if engine.has_table(table_name):
return load_table(engine, table_name)
else:
return create_table(engine, table_name)
def _guess_type(sample):
if isinstance(sample, int):
if isinstance(sample, bool):
return Boolean
elif isinstance(sample, int):
return Integer
elif isinstance(sample, float):
return Float