SQLAlchemy Loading Tools
A collection of wrappers and functions that make SQLAlchemy Core easier to use in ETL applications. SQLAlchemy is used only for database abstraction and not as an ORM, so extraction scripts can work with multiple database backends. Features include:
- Automatic schema. If a column is written that does not exist on the table, it will be created automatically.
- Upserts. Records are either created or updated, depending on whether an existing version can be found.
- Query helpers for simple queries such as all rows in a table or all distinct values across a set of columns.
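To make the automatic-schema feature more concrete, here is a minimal sketch built only on Python's standard sqlite3 module. It is not sqlaload's implementation: the helper names guess_type and ensure_columns are hypothetical, and the mapping from Python values to column types is an assumption chosen for illustration.

```python
import sqlite3


def guess_type(value):
    """Map a Python value to an SQLite column type (assumed mapping)."""
    # bool is a subclass of int, so both end up as INTEGER here.
    if isinstance(value, int):
        return "INTEGER"
    if isinstance(value, float):
        return "REAL"
    return "TEXT"  # fall back to a text column for everything else


def ensure_columns(conn, table, row):
    """Add any column named in `row` that `table` does not have yet."""
    existing = {info[1] for info in conn.execute(f'PRAGMA table_info("{table}")')}
    for name, value in row.items():
        if name not in existing:
            conn.execute(
                f'ALTER TABLE "{table}" ADD COLUMN "{name}" {guess_type(value)}'
            )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY)")

row = {"city": "Berlin", "lat": 52.52}
ensure_columns(conn, "customers", row)  # creates the city and lat columns
conn.execute("INSERT INTO customers (city, lat) VALUES (:city, :lat)", row)

columns = [info[1] for info in conn.execute('PRAGMA table_info("customers")')]
```

The point of the sketch is the order of operations: the schema is checked and extended first, and only then is the row written.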
Examples
A typical use of sqlaload would look like this:
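
    from sqlaload import connect, get_table, distinct, update_row

    engine = connect('sqlite:///customers.db')
    table = get_table(engine, 'customers')
    for entry in distinct(engine, table, 'post_code', 'city'):
        # geocode() stands in for your own geocoding routine
        lon, lat = geocode(entry['post_code'], entry['city'])
        row = {'post_code': entry['post_code'], 'city': entry['city'],
               'lon': lon, 'lat': lat}
        update_row(engine, table, row, ['post_code', 'city'])
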
In this example, we select all distinct post codes and city names from an imaginary customers database, send them through our geocoding routine and finally update all matching rows with the returned geo information.
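In SQL terms, the distinct call in this example corresponds roughly to a SELECT DISTINCT over the named columns. The sketch below uses the standard sqlite3 module and made-up sample data to show the shape of the result; it illustrates the query, not sqlaload's own code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allow access by column name
conn.execute("CREATE TABLE customers (name TEXT, post_code TEXT, city TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?)",
    [
        ("Ann", "10115", "Berlin"),
        ("Bob", "10115", "Berlin"),
        ("Cem", "20095", "Hamburg"),
    ],
)

# One result row per unique (post_code, city) pair, not one per customer.
pairs = [
    (r["post_code"], r["city"])
    for r in conn.execute(
        "SELECT DISTINCT post_code, city FROM customers ORDER BY post_code"
    )
]
```

With three customers in two locations, the geocoding routine would be called twice rather than three times.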
Another example, updating data in a datastore, might look like this:

    from sqlaload import connect, get_table, upsert

    engine = connect('sqlite:///things.db')
    table = get_table(engine, 'data')

    for item in magic_data_source_that_produces_entries():
        assert 'key1' in item
        assert 'key2' in item
        # this will either insert or update, depending on
        # whether an entry with the matching values for
        # 'key1' and 'key2' already exists:
        upsert(engine, table, item, ['key1', 'key2'])

Here's the same example, but using the object-oriented API:

    import sqlaload

    db = sqlaload.create('sqlite:///things.db')
    table = db.get_table('data')

    for item in magic_data_source_that_produces_entries():
        assert 'key1' in item
        assert 'key2' in item
        table.upsert(item, ['key1', 'key2'])

Functions
The library currently exposes the following functions:
Schema management
- connect(url) connects to a database and returns an engine. See the SQLAlchemy documentation for information about URL schemes and formats.
- get_table(engine, table_name) loads a table configuration from the database, either reflecting the existing schema or creating a new table (with an id column).
- create_table(engine, table_name) and load_table(engine, table_name) are more explicit than get_table but offer the same functionality.
- drop_table(engine, table_name) removes an existing table, deleting all of its contents.
- create_column(engine, table, column_name, type) adds a new column to a table; type must be a SQLAlchemy type class.
- create_index(engine, table, columns) creates an index on the given table; columns is a list of strings naming the columns to include.
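The "reflect or create" behaviour described for get_table can be pictured with a short sketch. It is written against the standard sqlite3 module rather than SQLAlchemy, and the function name get_or_create_table is hypothetical, so read it as an illustration of the idea under those assumptions, not as the library's code.

```python
import sqlite3


def get_or_create_table(conn, name):
    """Return the column names of `name`, creating the table if it is missing."""
    found = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone()
    if found is None:
        # A new table starts out with nothing but an integer id primary key.
        conn.execute(f'CREATE TABLE "{name}" (id INTEGER PRIMARY KEY)')
    return [info[1] for info in conn.execute(f'PRAGMA table_info("{name}")')]


conn = sqlite3.connect(":memory:")
first = get_or_create_table(conn, "data")   # table does not exist: created
conn.execute('ALTER TABLE "data" ADD COLUMN key1 TEXT')
second = get_or_create_table(conn, "data")  # table exists: schema is read back
```

Calling the helper twice is safe: the second call finds the table and simply reports its current columns.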
Queries
- find(engine, table, _limit=N, _offset=N, order_by='id', **kw) retrieves database records. The query returns an iterator that loads at most 5000 records at a time, even if _limit and _offset are specified, so find can be run on tables of arbitrary size. order_by is a column name given as a string; results are always returned in ascending order. Finally, **kw can be used to filter columns for equality, e.g. find(…, category=5).
- find_one(engine, table, **kw) works like find but returns only the first matching row, or None if no match is found.
- distinct(engine, table, *columns, **kw) returns the distinct combinations of values for columns. **kw allows filtering the same way it does in find.
- all is an alias for find without filter options.
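The chunked iteration that lets find work on large tables can be sketched as a generator that pages through the result with LIMIT and OFFSET. This is an assumption about the general technique, written with the standard sqlite3 module; find_chunked and its chunk_size parameter are hypothetical names, and the sketch leaves out the _limit and _offset arguments.

```python
import sqlite3


def find_chunked(conn, table, order_by="id", chunk_size=5000, **filters):
    """Yield matching rows, fetching at most `chunk_size` rows per query."""
    # Equality filters become "col = ?" clauses; no filters means match all.
    where = " AND ".join(f'"{col}" = ?' for col in filters) or "1 = 1"
    sql = (
        f'SELECT * FROM "{table}" WHERE {where} '
        f'ORDER BY "{order_by}" ASC LIMIT ? OFFSET ?'
    )
    offset = 0
    while True:
        params = (*filters.values(), chunk_size, offset)
        rows = conn.execute(sql, params).fetchall()
        yield from rows
        if len(rows) < chunk_size:
            break  # last (possibly partial or empty) chunk reached
        offset += chunk_size


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, category INTEGER)")
conn.executemany(
    "INSERT INTO items (category) VALUES (?)", [(i % 2,) for i in range(7)]
)

# A tiny chunk size makes the paging visible on this small table.
ids = [row[0] for row in find_chunked(conn, "items", chunk_size=3, category=1)]
```

Because only one chunk is held in memory at a time, the caller can iterate over a table far larger than available memory.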
Adding and updating data
- add_row(engine, table, row, ensure=True, types={}) adds the values in the dictionary row to the given table. If ensure is set, the schema is checked and missing columns are created; their types can be specified using the types dictionary. If no types are given, the type is guessed from the first submitted value of the column, defaulting to a text column.
- update_row(engine, table, row, unique, ensure=True, types={}) updates a row or set of rows based on the data in the row dictionary and the column names specified in unique. The remaining arguments are handled like those in add_row.
- upsert(engine, table, row, unique, ensure=True, types={}) combines the semantics of update_row and add_row: it first attempts to update existing data and creates a new record only if no record matching on the unique keys can be found.
- delete(engine, table, **kw) removes records from a table. **kw is the same as in find and can be used to limit the set of records to be removed.
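The update-then-insert semantics of upsert can be sketched in a few lines. The version below uses the standard sqlite3 module and a hypothetical helper called upsert_row; it assumes the table and its columns already exist and is meant as an illustration of the pattern, not as sqlaload's actual implementation.

```python
import sqlite3


def upsert_row(conn, table, row, unique):
    """Update rows matching `row` on the `unique` columns, else insert `row`."""
    where = " AND ".join(f'"{col}" = ?' for col in unique)
    keys = [row[col] for col in unique]
    assignments = ", ".join(f'"{col}" = ?' for col in row)

    # First attempt an update of any existing record with the same keys.
    cursor = conn.execute(
        f'UPDATE "{table}" SET {assignments} WHERE {where}',
        [*row.values(), *keys],
    )
    if cursor.rowcount == 0:
        # Nothing matched, so create a new record instead.
        columns = ", ".join(f'"{col}"' for col in row)
        marks = ", ".join("?" for _ in row)
        conn.execute(
            f'INSERT INTO "{table}" ({columns}) VALUES ({marks})',
            list(row.values()),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (key1 TEXT, key2 TEXT, value INTEGER)")

upsert_row(conn, "data", {"key1": "a", "key2": "b", "value": 1}, ["key1", "key2"])
upsert_row(conn, "data", {"key1": "a", "key2": "b", "value": 2}, ["key1", "key2"])
upsert_row(conn, "data", {"key1": "a", "key2": "c", "value": 3}, ["key1", "key2"])

rows = conn.execute("SELECT key1, key2, value FROM data ORDER BY key2").fetchall()
```

The second call overwrites the first record because both share the same key1 and key2, while the third call adds a new record.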
Feedback
Please feel free to create issues on the GitHub tracker at okfn/sqlaload. For other discussions, join the okfn-labs mailing list.