More documentation, getting ready to put this on
PyPI.
This commit is contained in:
parent
867008c016
commit
c27f8155bd
20
LICENSE.txt
Normal file
20
LICENSE.txt
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
Copyright (c) 2013, Open Knowledge Foundation, Friedrich Lindenberg
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
copy of this software and associated documentation files (the
|
||||||
|
"Software"), to deal in the Software without restriction, including
|
||||||
|
without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
distribute, sublicense, and/or sell copies of the Software, and to
|
||||||
|
permit persons to whom the Software is furnished to do so, subject to
|
||||||
|
the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included
|
||||||
|
in all copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||||
|
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||||
|
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||||
|
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
|
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
90
README.md
90
README.md
@ -2,62 +2,82 @@ SQLAlchemy Loading Tools
|
|||||||
========================
|
========================
|
||||||
|
|
||||||
A collection of wrappers and functions to make SQLAlchemy core easier
|
A collection of wrappers and functions to make SQLAlchemy core easier
|
||||||
to use in an ETL application. The package is used only for database
|
to use in ETL applications. SQLAlchemy is used only for database
|
||||||
abstraction and not as an ORM, allowing users to write extraction
|
abstraction and not as an ORM, allowing users to write extraction
|
||||||
scripts that can work with multiple database backends. Functions
|
scripts that can work with multiple database backends. Functions
|
||||||
include:
|
include:
|
||||||
|
|
||||||
* **Self-expanding schema**. If a column is written that does not
|
* **Automatic schema**. If a column is written that does not
|
||||||
exist on the table, it will be created automatically.
|
exist on the table, it will be created automatically.
|
||||||
* **Upserts**. Records are either created or updated, depending on
|
* **Upserts**. Records are either created or updated, depending on
|
||||||
whether an existing version can be found.
|
whether an existing version can be found.
|
||||||
* **Query helpers** for simple queries such as all rows in a table or
|
* **Query helpers** for simple queries such as all rows in a table or
|
||||||
all distinct values of a set of columns.
|
all distinct values across a set of columns.
|
||||||
|
|
||||||
|
|
||||||
Example
|
Examples
|
||||||
-------
|
--------
|
||||||
|
|
||||||
A typical use case for ``sqlaload`` may include code like this::
|
A typical use of ``sqlaload`` would look like this:
|
||||||
|
|
||||||
```python
|
from sqlaload import connect, get_table, distinct, update
|
||||||
from sqlaload import connect, get_table, distinct, update
|
|
||||||
|
|
||||||
engine = connect('sqlite:///customers.db')
|
engine = connect('sqlite:///customers.db')
|
||||||
table = get_table(engine, 'customers')
|
table = get_table(engine, 'customers')
|
||||||
for entry in distinct(engine, table, 'post_code', 'city'):
|
for entry in distinct(engine, table, 'post_code', 'city'):
|
||||||
lon, lat = geocode(entry['post_code'], entry['city'])
|
lon, lat = geocode(entry['post_code'], entry['city'])
|
||||||
update(entry, {'lon': lon, 'lat': lat})
|
update(entry, {'lon': lon, 'lat': lat})
|
||||||
```
|
|
||||||
|
|
||||||
In this example, we selected all distinct post codes and city names
|
In this example, we selected all distinct post codes and city names from an imaginary customers database, sent them through our geocoding routine and finally updated all matching rows with the returned geo information.
|
||||||
from an imaginary customers database, sent them through our
|
|
||||||
geocoding routine and finally updated all matching rows with our
|
|
||||||
geo information.
|
|
||||||
|
|
||||||
Another example, updating data in a datastore, might look like
|
Another example, updating data in a datastore, might look like this:
|
||||||
this::
|
|
||||||
|
|
||||||
```python
|
from sqlaload import connect, get_table, upsert
|
||||||
from sqlaload import connect, get_table, upsert
|
|
||||||
|
|
||||||
engine = connect('sqlite:///things.db')
|
engine = connect('sqlite:///things.db')
|
||||||
table = get_table(engine, 'data')
|
table = get_table(engine, 'data')
|
||||||
|
|
||||||
|
for item in magic_data_source_that_produces_entries():
|
||||||
|
assert 'key1' in item
|
||||||
|
assert 'key2' in item
|
||||||
|
# this will either insert or update, depending on
|
||||||
|
# whether an entry with the matching values for
|
||||||
|
# 'key1' and 'key2' already exists:
|
||||||
|
upsert(engine, table, item, ['key1', 'key2'])
|
||||||
|
|
||||||
|
|
||||||
|
Functions
|
||||||
|
---------
|
||||||
|
|
||||||
|
The library currently exposes the following functions:
|
||||||
|
|
||||||
|
**Schema management**
|
||||||
|
|
||||||
|
* ``connect(url)``, connect to a database and return an ``engine``. See the [SQLAlchemy documentation](http://docs.sqlalchemy.org/en/rel_0_8/core/engines.html#database-urls) for information about URL schemes and formats.
|
||||||
|
* ``get_table(engine, table_name)`` will load a table configuration from the database, either reflecting the existing schema or creating a new table (with an ``id`` column).
|
||||||
|
* ``create_table(engine, table_name)`` and ``load_table(engine, table_name)`` are more explicit than ``get_table`` but provide the same functionality.
|
||||||
|
* ``drop_table(engine, table_name)`` will remove an existing table, deleting all of its contents.
|
||||||
|
* ``create_column(engine, table, column_name, type)`` adds a new column to a table, ``type`` must be a SQLAlchemy type class.
|
||||||
|
* ``create_index(engine, table, columns)`` creates an index on the given table, based on a list of strings to specify the included ``columns``.
|
||||||
|
|
||||||
|
**Queries**
|
||||||
|
|
||||||
|
* ``find(engine, table, _limit=N, _offset=N, order_by='id', **kw)`` will retrieve database records. The query will return an iterator that only loads 5000 records at any one time, even if ``_limit`` and ``_offset`` are specified - meaning that ``find`` can be run on tables of arbitrary size. ``order_by`` is a string column name, always returned in ascending order. Finally ``**kw`` can be used to filter columns for equality, e.g. ``find(…, category=5)``.
|
||||||
|
* ``find_one(engine, table, **kw)``, like ``find`` but will only return the first matching row or ``None`` if no matches were found.
|
||||||
|
* ``distinct(engine, table, *columns, **kw)`` will return the combined distinct values for ``columns``. ``**kw`` allows filtering the same way it does in ``find``.
|
||||||
|
* ``all``, alias for ``find`` without filter options.
|
||||||
|
|
||||||
|
**Adding and updating data**
|
||||||
|
|
||||||
|
* ``add_row(engine, table, row, ensure=True, types={})`` add the values in the dictionary ``row`` to the given ``table``. ``ensure`` will check the schema and create the columns if necessary, their types can be specified using the ``types`` dictionary. If no ``types`` are given, the type will be guessed from the first submitted value of the column, defaulting to a text column.
|
||||||
|
* ``update_row(engine, table, row, unique, ensure=True, types={})`` will update a row or set of rows based on the data in the ``row`` dictionary and the column names specified in ``unique``. The remaining arguments are handled like those in ``add_row``.
|
||||||
|
* ``upsert(engine, table, row, unique, ensure=True, types={})`` will combine the semantics of ``update_row`` and ``add_row`` by first attempting to update existing data and otherwise (only if no record matching on the ``unique`` keys can be found) creating a new record.
|
||||||
|
* ``delete(engine, table, **kw)`` will remove records from a table. ``**kw`` is the same as in ``find`` and can be used to limit the set of records to be removed.
|
||||||
|
|
||||||
for item in magic_data_source_that_produces_entries():
|
|
||||||
assert 'key1' in item
|
|
||||||
assert 'key2' in item
|
|
||||||
# this will either insert or update, depending on
|
|
||||||
# whether an entry with the matching values for
|
|
||||||
# 'key1' and 'key2' already exists:
|
|
||||||
upsert(engine, table, item, ['key1', 'key2'])
|
|
||||||
```
|
|
||||||
|
|
||||||
Feedback
|
Feedback
|
||||||
--------
|
--------
|
||||||
|
|
||||||
Please feel free to create issues on the GitHub bug tracker at:
|
Please feel free to create issues on the GitHub tracker at [okfn/sqlaload](https://github.com/okfn/sqlaload/issues). For other discussions, join the [okfn-labs](http://lists.okfn.org/mailman/listinfo/okfn-labs) mailing list.
|
||||||
|
|
||||||
* https://github.com/okfn/sqlaload/issues
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
2
setup.py
2
setup.py
@ -2,7 +2,7 @@ from setuptools import setup, find_packages
|
|||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='sqlaload',
|
name='sqlaload',
|
||||||
version='0.1',
|
version='0.2',
|
||||||
description="Utility functions for using SQLAlchemy in ETL.",
|
description="Utility functions for using SQLAlchemy in ETL.",
|
||||||
long_description='',
|
long_description='',
|
||||||
classifiers=[
|
classifiers=[
|
||||||
|
|||||||
@ -1,4 +1,3 @@
|
|||||||
|
|
||||||
from sqlaload.schema import connect
|
from sqlaload.schema import connect
|
||||||
from sqlaload.schema import create_table, load_table, get_table, drop_table
|
from sqlaload.schema import create_table, load_table, get_table, drop_table
|
||||||
from sqlaload.schema import create_column
|
from sqlaload.schema import create_column
|
||||||
@ -6,4 +5,6 @@ from sqlaload.write import add_row, update_row
|
|||||||
from sqlaload.write import upsert, update, delete
|
from sqlaload.write import upsert, update, delete
|
||||||
from sqlaload.query import distinct, resultiter, all, find_one, find, query
|
from sqlaload.query import distinct, resultiter, all, find_one, find, query
|
||||||
|
|
||||||
from sqlaload.util import dump_csv
|
# shut up useless SA warning:
|
||||||
|
import warnings
|
||||||
|
warnings.filterwarnings('ignore', 'Unicode type received non-unicode bind param value.')
|
||||||
|
|||||||
@ -2,7 +2,7 @@ import logging
|
|||||||
from itertools import count
|
from itertools import count
|
||||||
|
|
||||||
from sqlalchemy.sql import expression, and_
|
from sqlalchemy.sql import expression, and_
|
||||||
from sqlaload.schema import _ensure_columns
|
from sqlaload.schema import _ensure_columns, get_table
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -17,19 +17,17 @@ def resultiter(rp):
|
|||||||
yield dict(zip(keys, row))
|
yield dict(zip(keys, row))
|
||||||
|
|
||||||
def find_one(engine, table, **kw):
|
def find_one(engine, table, **kw):
|
||||||
|
table = get_table(engine, table)
|
||||||
res = list(find(engine, table, _limit=1, **kw))
|
res = list(find(engine, table, _limit=1, **kw))
|
||||||
if not len(res):
|
if not len(res):
|
||||||
return None
|
return None
|
||||||
return res[0]
|
return res[0]
|
||||||
|
|
||||||
def find(engine, table, _limit=None, _step=5000, _offset=0,
|
def find(engine, table, _limit=None, _step=5000, _offset=0,
|
||||||
order_by=None, **kw):
|
order_by='id', **kw):
|
||||||
|
table = get_table(engine, table)
|
||||||
_ensure_columns(engine, table, kw)
|
_ensure_columns(engine, table, kw)
|
||||||
|
order_by = [table.c[order_by].asc()]
|
||||||
if order_by is None:
|
|
||||||
order_by = [table.c.id.asc()]
|
|
||||||
else:
|
|
||||||
order_by = [table.c[order_by].asc()]
|
|
||||||
|
|
||||||
qargs = []
|
qargs = []
|
||||||
try:
|
try:
|
||||||
@ -59,7 +57,7 @@ def query(engine, query):
|
|||||||
yield res
|
yield res
|
||||||
|
|
||||||
def distinct(engine, table, *columns, **kw):
|
def distinct(engine, table, *columns, **kw):
|
||||||
|
table = get_table(engine, table)
|
||||||
qargs = []
|
qargs = []
|
||||||
try:
|
try:
|
||||||
columns = [table.c[c] for c in columns]
|
columns = [table.c[c] for c in columns]
|
||||||
|
|||||||
@ -46,6 +46,9 @@ def load_table(engine, table_name):
|
|||||||
return table
|
return table
|
||||||
|
|
||||||
def get_table(engine, table_name):
|
def get_table(engine, table_name):
|
||||||
|
if isinstance(table_name, Table):
|
||||||
|
return table_name
|
||||||
|
|
||||||
# Accept Connection objects here
|
# Accept Connection objects here
|
||||||
if hasattr(engine, 'engine'):
|
if hasattr(engine, 'engine'):
|
||||||
engine = engine.engine
|
engine = engine.engine
|
||||||
@ -102,12 +105,14 @@ def _args_to_clause(table, args):
|
|||||||
return and_(*clauses)
|
return and_(*clauses)
|
||||||
|
|
||||||
def create_column(engine, table, name, type):
|
def create_column(engine, table, name, type):
|
||||||
|
table = get_table(engine, table)
|
||||||
with lock:
|
with lock:
|
||||||
if name not in table.columns.keys():
|
if name not in table.columns.keys():
|
||||||
col = Column(name, type)
|
col = Column(name, type)
|
||||||
col.create(table, connection=engine)
|
col.create(table, connection=engine)
|
||||||
|
|
||||||
def create_index(engine, table, columns, name=None):
|
def create_index(engine, table, columns, name=None):
|
||||||
|
table = get_table(engine, table)
|
||||||
with lock:
|
with lock:
|
||||||
if not name:
|
if not name:
|
||||||
sig = abs(hash('||'.join(columns)))
|
sig = abs(hash('||'.join(columns)))
|
||||||
|
|||||||
@ -1,24 +0,0 @@
|
|||||||
import csv
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
def _convert_cell(v):
|
|
||||||
if isinstance(v, unicode):
|
|
||||||
return v.encode('utf-8')
|
|
||||||
elif isinstance(v, datetime):
|
|
||||||
return v.isoformat()
|
|
||||||
return v
|
|
||||||
|
|
||||||
def dump_csv(query_iter, fh):
|
|
||||||
writer, columns = None, None
|
|
||||||
for row in query_iter:
|
|
||||||
if writer is None:
|
|
||||||
writer = csv.writer(fh)
|
|
||||||
columns = row.keys()
|
|
||||||
writer.writerow(columns)
|
|
||||||
writer.writerow([_convert_cell(row.get(c)) \
|
|
||||||
for c in columns])
|
|
||||||
fh.close()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -1,7 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from sqlaload.schema import _ensure_columns, _args_to_clause
|
from sqlaload.schema import _ensure_columns, _args_to_clause
|
||||||
from sqlaload.schema import create_index
|
from sqlaload.schema import create_index, get_table
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -9,6 +9,7 @@ def add_row(engine, table, row, ensure=True, types={}):
|
|||||||
""" Add a row (type: dict). If ``ensure`` is set, any of
|
""" Add a row (type: dict). If ``ensure`` is set, any of
|
||||||
the keys of the row are not table columns, they will be type
|
the keys of the row are not table columns, they will be type
|
||||||
guessed and created. """
|
guessed and created. """
|
||||||
|
table = get_table(engine, table)
|
||||||
if ensure:
|
if ensure:
|
||||||
_ensure_columns(engine, table, row, types=types)
|
_ensure_columns(engine, table, row, types=types)
|
||||||
engine.execute(table.insert(row))
|
engine.execute(table.insert(row))
|
||||||
@ -16,6 +17,7 @@ def add_row(engine, table, row, ensure=True, types={}):
|
|||||||
def update_row(engine, table, row, unique, ensure=True, types={}):
|
def update_row(engine, table, row, unique, ensure=True, types={}):
|
||||||
if not len(unique):
|
if not len(unique):
|
||||||
return False
|
return False
|
||||||
|
table = get_table(engine, table)
|
||||||
clause = dict([(u, row.get(u)) for u in unique])
|
clause = dict([(u, row.get(u)) for u in unique])
|
||||||
if ensure:
|
if ensure:
|
||||||
_ensure_columns(engine, table, row, types=types)
|
_ensure_columns(engine, table, row, types=types)
|
||||||
@ -28,6 +30,7 @@ def update_row(engine, table, row, unique, ensure=True, types={}):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def upsert(engine, table, row, unique, ensure=True, types={}):
|
def upsert(engine, table, row, unique, ensure=True, types={}):
|
||||||
|
table = get_table(engine, table)
|
||||||
if ensure:
|
if ensure:
|
||||||
create_index(engine, table, unique)
|
create_index(engine, table, unique)
|
||||||
|
|
||||||
@ -35,6 +38,7 @@ def upsert(engine, table, row, unique, ensure=True, types={}):
|
|||||||
add_row(engine, table, row, ensure=ensure, types=types)
|
add_row(engine, table, row, ensure=ensure, types=types)
|
||||||
|
|
||||||
def update(engine, table, criteria, values, ensure=True, types={}):
|
def update(engine, table, criteria, values, ensure=True, types={}):
|
||||||
|
table = get_table(engine, table)
|
||||||
if ensure:
|
if ensure:
|
||||||
_ensure_columns(engine, table, values, types=types)
|
_ensure_columns(engine, table, values, types=types)
|
||||||
q = table.update().values(values)
|
q = table.update().values(values)
|
||||||
@ -43,6 +47,7 @@ def update(engine, table, criteria, values, ensure=True, types={}):
|
|||||||
engine.execute(q)
|
engine.execute(q)
|
||||||
|
|
||||||
def delete(engine, table, **kw):
|
def delete(engine, table, **kw):
|
||||||
|
table = get_table(engine, table)
|
||||||
_ensure_columns(engine, table, kw)
|
_ensure_columns(engine, table, kw)
|
||||||
|
|
||||||
qargs = []
|
qargs = []
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user