Make database asnyc.
This commit is contained in:
parent
4854d40508
commit
097889ba3f
@ -1,6 +1,6 @@
|
||||
DEFAULT_LIMIT = 30
|
||||
import typing
|
||||
|
||||
import asyncio
|
||||
from snek.system.model import BaseModel
|
||||
|
||||
|
||||
@ -12,13 +12,24 @@ class BaseMapper:
|
||||
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
|
||||
|
||||
self.default_limit = self.__class__.default_limit
|
||||
|
||||
@property
|
||||
def db(self):
|
||||
return self.app.db
|
||||
|
||||
@property
|
||||
def loop(self):
|
||||
return asyncio.get_event_loop()
|
||||
|
||||
async def run_in_executor(self, func, *args, **kwargs):
|
||||
def call():
|
||||
return func(*args, **kwargs)
|
||||
return await self.loop.run_in_executor(None, call)
|
||||
|
||||
|
||||
|
||||
async def new(self):
|
||||
return self.model_class(mapper=self, app=self.app)
|
||||
|
||||
@ -29,7 +40,8 @@ class BaseMapper:
|
||||
async def get(self, uid: str = None, **kwargs) -> BaseModel:
|
||||
if uid:
|
||||
kwargs["uid"] = uid
|
||||
record = self.table.find_one(**kwargs)
|
||||
|
||||
record = await self.run_in_executor(self.table.find_one,**kwargs)
|
||||
if not record:
|
||||
return None
|
||||
record = dict(record)
|
||||
@ -40,31 +52,31 @@ class BaseMapper:
|
||||
return await self.model_class.from_record(mapper=self, record=record)
|
||||
|
||||
async def exists(self, **kwargs):
|
||||
return self.table.exists(**kwargs)
|
||||
return await self.run_in_executor(self.table.exists,**kwargs)
|
||||
|
||||
async def count(self, **kwargs) -> int:
|
||||
return self.table.count(**kwargs)
|
||||
return await self.run_in_executor(self.table.count, **kwargs)
|
||||
|
||||
async def save(self, model: BaseModel) -> bool:
|
||||
if not model.record.get("uid"):
|
||||
raise Exception(f"Attempt to save without uid: {model.record}.")
|
||||
model.updated_at.update()
|
||||
return self.table.upsert(model.record, ["uid"])
|
||||
return await self.run_in_executor(self.table.upsert, model.record, ["uid"])
|
||||
|
||||
async def find(self, **kwargs) -> typing.AsyncGenerator:
|
||||
if not kwargs.get("_limit"):
|
||||
kwargs["_limit"] = self.default_limit
|
||||
for record in self.table.find(**kwargs):
|
||||
for record in await self.run_in_executor(self.table.find, **kwargs):
|
||||
model = await self.new()
|
||||
for key, value in record.items():
|
||||
model[key] = value
|
||||
yield model
|
||||
|
||||
async def query(self, sql, *args):
|
||||
for record in self.db.query(sql, *args):
|
||||
for record in await self.run_in_executor(self.db.query,sql, *args):
|
||||
yield dict(record)
|
||||
|
||||
async def delete(self, **kwargs) -> int:
|
||||
if not kwargs or not isinstance(kwargs, dict):
|
||||
raise Exception("Can't execute delete with no filter.")
|
||||
return self.table.delete(**kwargs)
|
||||
return await self.run_in_executor(self.table.delete, **kwargs)
|
||||
|
Loading…
Reference in New Issue
Block a user