Merge pull request #144 from twds/master
Use fetchmany to prevent huge tables loaded at once
This commit is contained in:
commit
00b4efd024
@ -343,10 +343,6 @@ class Table(object):
|
||||
# return all rows sorted by multiple columns (by year in descending order)
|
||||
results = table.find(order_by=['country', '-year'])
|
||||
|
||||
By default :py:meth:`find() <dataset.Table.find>` will break the
|
||||
query into chunks of ``_step`` rows to prevent huge tables
|
||||
from being loaded into memory at once.
|
||||
|
||||
For more complex queries, please use :py:meth:`db.query() <dataset.Database.query>`
|
||||
instead."""
|
||||
self._check_dropped()
|
||||
@ -371,21 +367,10 @@ class Table(object):
|
||||
if _step is None or _step is False or _step == 0:
|
||||
_step = total_row_count
|
||||
|
||||
if total_row_count > _step and not order_by:
|
||||
_step = total_row_count
|
||||
log.warn("query cannot be broken into smaller sections because it is unordered")
|
||||
|
||||
queries = []
|
||||
|
||||
for i in count():
|
||||
qoffset = _offset + (_step * i)
|
||||
qlimit = min(_limit - (_step * i), _step)
|
||||
if qlimit <= 0:
|
||||
break
|
||||
queries.append(self.table.select(whereclause=args, limit=qlimit,
|
||||
offset=qoffset, order_by=order_by))
|
||||
return ResultIter((self.database.executable.execute(q) for q in queries),
|
||||
row_type=self.database.row_type)
|
||||
query = self.table.select(whereclause=args, limit=_limit,
|
||||
offset=_offset, order_by=order_by)
|
||||
return ResultIter(self.database.executable.execute(query),
|
||||
row_type=self.database.row_type, step=_step)
|
||||
|
||||
def count(self, **_filter):
|
||||
"""
|
||||
|
||||
@ -43,25 +43,26 @@ class ResultIter(object):
|
||||
""" SQLAlchemy ResultProxies are not iterable to get a
|
||||
list of dictionaries. This is to wrap them. """
|
||||
|
||||
def __init__(self, result_proxy, row_type=row_type, step=None):
    """Wrap a single SQLAlchemy ResultProxy for row-by-row iteration.

    :param result_proxy: the ResultProxy to draw rows from.
    :param row_type: callable used by ``convert_row`` to build each row
        (defaults to the module-level ``row_type``).
    :param step: chunk size passed to ``fetchmany``; falsy means the
        whole result is fetched at once with ``fetchall``.
    """
    self.result_proxy = result_proxy
    self.row_type = row_type
    self.step = step
    # Column names are available up front, before any row is fetched.
    self.keys = list(result_proxy.keys())
    # Lazily initialised by _next_chunk on first __next__ call.
    self._iter = None
|
||||
|
||||
def _next_rp(self):
|
||||
def _next_chunk(self):
|
||||
try:
|
||||
rp = next(self.result_proxies)
|
||||
self.keys = list(rp.keys())
|
||||
self._iter = iter(rp.fetchall())
|
||||
if not self.step:
|
||||
self._iter = iter(self.result_proxy.fetchall())
|
||||
else:
|
||||
self._iter = iter(self.result_proxy.fetchmany(self.step))
|
||||
return True
|
||||
except StopIteration:
|
||||
return False
|
||||
|
||||
def __next__(self):
|
||||
if self._iter is None:
|
||||
if not self._next_rp():
|
||||
if not self._next_chunk():
|
||||
raise StopIteration
|
||||
try:
|
||||
return convert_row(self.row_type, next(self._iter))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user