Use fetchmany to prevent huge tables from being loaded into memory at once.

Di Wu 2015-12-13 20:51:14 +08:00
parent 93162ac6fc
commit 6f4f0313a8
2 changed files with 14 additions and 28 deletions
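In short: before this change, find() split a large result into separate LIMIT/OFFSET queries, one per chunk; after it, a single query is executed and ResultIter pulls rows from the open result in batches of _step via fetchmany(). A minimal sketch of that pattern, assuming only a DB-API/SQLAlchemy-style result object with fetchmany() (names here are illustrative, not the library's):

    def iter_in_chunks(result, step=10000):
        # Fetch `step` rows at a time from an already-executed query,
        # so the full result set is never materialized in memory.
        while True:
            rows = result.fetchmany(step)
            if not rows:  # fetchmany() returns an empty list once exhausted
                break
            for row in rows:
                yield row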


@@ -343,10 +343,6 @@ class Table(object):
             # return all rows sorted by multiple columns (by year in descending order)
             results = table.find(order_by=['country', '-year'])
 
-        By default :py:meth:`find() <dataset.Table.find>` will break the
-        query into chunks of ``_step`` rows to prevent huge tables
-        from being loaded into memory at once.
-
         For more complex queries, please use :py:meth:`db.query() <dataset.Database.query>`
         instead."""
         self._check_dropped()
@@ -371,21 +367,10 @@ class Table(object):
         if _step is None or _step is False or _step == 0:
             _step = total_row_count
 
-        if total_row_count > _step and not order_by:
-            _step = total_row_count
-            log.warn("query cannot be broken into smaller sections because it is unordered")
-
-        queries = []
-        for i in count():
-            qoffset = _offset + (_step * i)
-            qlimit = min(_limit - (_step * i), _step)
-            if qlimit <= 0:
-                break
-            queries.append(self.table.select(whereclause=args, limit=qlimit,
-                                             offset=qoffset, order_by=order_by))
-        return ResultIter((self.database.executable.execute(q) for q in queries),
-                          row_type=self.database.row_type)
+        query = self.table.select(whereclause=args, limit=_limit,
+                                  offset=_offset, order_by=order_by)
+        return ResultIter(self.database.executable.execute(query),
+                          row_type=self.database.row_type, step=_step)
 
     def count(self, **_filter):
         """


@@ -43,25 +43,26 @@ class ResultIter(object):
     """ SQLAlchemy ResultProxies are not iterable to get a
     list of dictionaries. This is to wrap them. """
 
-    def __init__(self, result_proxies, row_type=row_type):
+    def __init__(self, result_proxy, row_type=row_type, step=None):
+        self.result_proxy = result_proxy
         self.row_type = row_type
-        if not isgenerator(result_proxies):
-            result_proxies = iter((result_proxies, ))
-        self.result_proxies = result_proxies
+        self.step = step
+        self.keys = list(result_proxy.keys())
         self._iter = None
 
-    def _next_rp(self):
+    def _next_chunk(self):
         try:
-            rp = next(self.result_proxies)
-            self.keys = list(rp.keys())
-            self._iter = iter(rp.fetchall())
+            if not self.step:
+                self._iter = iter(self.result_proxy.fetchall())
+            else:
+                self._iter = iter(self.result_proxy.fetchmany(self.step))
             return True
         except StopIteration:
             return False
 
     def __next__(self):
         if self._iter is None:
-            if not self._next_rp():
+            if not self._next_chunk():
                 raise StopIteration
         try:
             return convert_row(self.row_type, next(self._iter))
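The hunk is cut off before __next__'s exception handler, which refills the chunk and continues when the current one runs out. For orientation, the overall shape of the iterator as a simplified sketch, not the file's exact remaining code:

    class ChunkedIter(object):
        # Simplified sketch of the fetchmany-based pattern above; `result`
        # is assumed to be a SQLAlchemy/DB-API result with fetchmany().
        def __init__(self, result, step=None):
            self.result = result
            self.step = step
            self._iter = None

        def _next_chunk(self):
            # Load the next batch (or everything, when no step is given).
            rows = self.result.fetchmany(self.step) if self.step else self.result.fetchall()
            if not rows:
                return False
            self._iter = iter(rows)
            return True

        def __next__(self):
            if self._iter is None and not self._next_chunk():
                raise StopIteration
            try:
                return next(self._iter)
            except StopIteration:
                # Current chunk exhausted: fetch the next one or stop for good.
                if not self._next_chunk():
                    raise
                return next(self._iter)

        next = __next__  # Python 2 compatibility

        def __iter__(self):
            return self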