Source code for trackintel.io.postgis

from functools import wraps
from inspect import signature

import geopandas as gpd
from geopandas.io.sql import _get_srid_from_crs
from shapely import wkb
import pandas as pd
from geoalchemy2 import Geometry
from sqlalchemy import create_engine

import trackintel as ti


def _handle_con_string(func):
    """Decorator function to create a `Connection` out of a connection string."""

    @wraps(func)  # copy all metadata
    def wrapper(*args, **kwargs):
        # bind to name for easy access of both kwargs and args
        bound_values = signature(func).bind(*args, **kwargs)
        con = bound_values.arguments["con"]
        # only do something if connection string
        if not isinstance(con, str):
            return func(*args, **kwargs)

        engine = create_engine(con)
        con = engine.connect()

        # overwrite con argument with open connection
        bound_values.arguments["con"] = con
        args = bound_values.args
        kwargs = bound_values.kwargs
        try:
            result = func(*args, **kwargs)
        finally:
            con.close()
        return result

    return wrapper
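
# Usage sketch (illustrative only, not part of the module): any reader/writer decorated
# with _handle_con_string accepts either an already open connection/engine or a plain
# connection string. The DSN below is a made-up placeholder.
#
#   conn_string = "postgresql://user:password@localhost:5432/mydb"
#   # passed as a string: the decorator opens a connection and closes it again afterwards
#   pfs = read_positionfixes_postgis("SELECT * FROM positionfixes", conn_string)
#   # passed as an engine or connection: it is used as-is and left open
#   engine = create_engine(conn_string)
#   pfs = read_positionfixes_postgis("SELECT * FROM positionfixes", engine)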


@_handle_con_string
def read_positionfixes_postgis(
    sql,
    con,
    geom_col="geom",
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    **kwargs
):
    """Reads positionfixes from a PostGIS database.

    Parameters
    ----------
    sql : str
        SQL query e.g. "SELECT * FROM positionfixes"
    con : str, sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Connection string or active connection to PostGIS database.
    geom_col : str, default 'geom'
        The geometry column of the table.
    crs : optional
        Coordinate reference system to use for the returned GeoDataFrame.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like decimal.Decimal)
        to floating point, useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is strftime compatible
          in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing
          integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword
          arguments of :func:`pandas.to_datetime`. Especially useful with databases without
          native Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        List of parameters to pass to execute method.
    chunksize : int, default None
        If specified, return an iterator where chunksize is the number of rows to include
        in each chunk.
    **kwargs
        Further keyword arguments as available in trackintel's trackintel.io.read_positionfixes_gpd().
        Especially useful to rename column names from the SQL table to trackintel-conform
        column names. See the second example for how to use it.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the positionfixes.

    Examples
    --------
    >>> pfs = ti.io.read_positionfixes_postgis("SELECT * FROM positionfixes", con, geom_col="geom")
    >>> pfs = ti.io.read_positionfixes_postgis("SELECT * FROM positionfixes", con, geom_col="geom",
    ...                                        index_col="id", user_id="USER", tracked_at="time")
    """
    pfs = gpd.GeoDataFrame.from_postgis(
        sql,
        con,
        geom_col=geom_col,
        crs=crs,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        params=params,
        chunksize=chunksize,
    )
    return ti.io.read_positionfixes_gpd(pfs, **kwargs)


@_handle_con_string
def write_positionfixes_postgis(
    positionfixes, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    positionfixes.to_postgis(
        name,
        con,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
    )


@_handle_con_string
def read_triplegs_postgis(
    sql,
    con,
    geom_col="geom",
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    **kwargs
):
    """Reads triplegs from a PostGIS database.

    Parameters
    ----------
    sql : str
        SQL query e.g. "SELECT * FROM triplegs"
    con : str, sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Connection string or active connection to PostGIS database.
    geom_col : str, default 'geom'
        The geometry column of the table.
    crs : optional
        Coordinate reference system to use for the returned GeoDataFrame.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like decimal.Decimal)
        to floating point, useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is strftime compatible
          in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing
          integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword
          arguments of :func:`pandas.to_datetime`. Especially useful with databases without
          native Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        List of parameters to pass to execute method.
    chunksize : int, default None
        If specified, return an iterator where chunksize is the number of rows to include
        in each chunk.
    **kwargs
        Further keyword arguments as available in trackintel's trackintel.io.read_triplegs_gpd().
        Especially useful to rename column names from the SQL table to trackintel-conform
        column names. See the second example for how to use it.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the triplegs.

    Examples
    --------
    >>> tpls = ti.io.read_triplegs_postgis("SELECT * FROM triplegs", con, geom_col="geom")
    >>> tpls = ti.io.read_triplegs_postgis("SELECT * FROM triplegs", con, geom_col="geom", index_col="id",
    ...                                    started_at="start_time", finished_at="end_time", user_id="USER")
    """
    tpls = gpd.GeoDataFrame.from_postgis(
        sql,
        con,
        geom_col=geom_col,
        crs=crs,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        params=params,
        chunksize=chunksize,
    )
    return ti.io.read_triplegs_gpd(tpls, **kwargs)


@_handle_con_string
def write_triplegs_postgis(
    triplegs, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    triplegs.to_postgis(
        name,
        con,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
    )


@_handle_con_string
def read_staypoints_postgis(
    sql,
    con,
    geom_col="geom",
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    **kwargs
):
    """Read staypoints from a PostGIS database.

    Parameters
    ----------
    sql : str
        SQL query e.g. "SELECT * FROM staypoints"
    con : str, sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Connection string or active connection to PostGIS database.
    geom_col : str, default 'geom'
        The geometry column of the table.
    crs : optional
        Coordinate reference system to use for the returned GeoDataFrame.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like decimal.Decimal)
        to floating point, useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is strftime compatible
          in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing
          integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword
          arguments of :func:`pandas.to_datetime`. Especially useful with databases without
          native Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        List of parameters to pass to execute method.
    chunksize : int, default None
        If specified, return an iterator where chunksize is the number of rows to include
        in each chunk.
    **kwargs
        Further keyword arguments as available in trackintel's trackintel.io.read_staypoints_gpd().
        Especially useful to rename column names from the SQL table to trackintel-conform
        column names. See the second example for how to use it.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the staypoints.

    Examples
    --------
    >>> sp = ti.io.read_staypoints_postgis("SELECT * FROM staypoints", con, geom_col="geom")
    >>> sp = ti.io.read_staypoints_postgis("SELECT * FROM staypoints", con, geom_col="geom", index_col="id",
    ...                                    started_at="start_time", finished_at="end_time", user_id="USER")
    """
    sp = gpd.GeoDataFrame.from_postgis(
        sql,
        con,
        geom_col=geom_col,
        crs=crs,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        params=params,
        chunksize=chunksize,
    )
    return ti.io.read_staypoints_gpd(sp, **kwargs)


@_handle_con_string
def write_staypoints_postgis(
    staypoints, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    staypoints.to_postgis(
        name,
        con,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
    )


@_handle_con_string
def read_locations_postgis(
    sql,
    con,
    center="center",
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    **kwargs
):
    """Reads locations from a PostGIS database.

    Parameters
    ----------
    sql : str
        SQL query e.g. "SELECT * FROM locations"
    con : str, sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Connection string or active connection to PostGIS database.
    center : str, default 'center'
        The geometry column of the table (the center of the location).
    crs : optional
        Coordinate reference system to use for the returned GeoDataFrame.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like decimal.Decimal)
        to floating point, useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is strftime compatible
          in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing
          integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword
          arguments of :func:`pandas.to_datetime`. Especially useful with databases without
          native Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        List of parameters to pass to execute method.
    chunksize : int, default None
        If specified, return an iterator where chunksize is the number of rows to include
        in each chunk.
    **kwargs
        Further keyword arguments as available in trackintel's trackintel.io.read_locations_gpd().
        Especially useful to rename column names from the SQL table to trackintel-conform
        column names. See the second example for how to use it.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the locations.

    Examples
    --------
    >>> locs = ti.io.read_locations_postgis("SELECT * FROM locations", con, center="center")
    >>> locs = ti.io.read_locations_postgis("SELECT * FROM locations", con, center="geom", index_col="id",
    ...                                     user_id="USER", extent="extent")
    """
    locs = gpd.GeoDataFrame.from_postgis(
        sql,
        con,
        geom_col=center,
        crs=crs,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        params=params,
        chunksize=chunksize,
    )
    if "extent" in kwargs:
        locs[kwargs["extent"]] = gpd.GeoSeries.from_wkb(locs[kwargs["extent"]])
    return ti.io.read_locations_gpd(locs, center=center, **kwargs)


@_handle_con_string
def write_locations_postgis(
    locations, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    # Assumes that "extent" is not the geometry column but "center" is.
    # May build an additional check for that.
    if "extent" in locations.columns:
        # geopandas.to_postgis can only handle one geometry column -> do it manually
        srid = _get_srid_from_crs(locations)
        extent_schema = Geometry("POLYGON", srid)

        if dtype is None:
            dtype = {"extent": extent_schema}
        else:
            dtype["extent"] = extent_schema
        locations = locations.copy()
        locations["extent"] = locations["extent"].apply(lambda x: wkb.dumps(x, srid=srid, hex=True))

    locations.to_postgis(
        name,
        con,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
    )
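
# Usage sketch (illustrative only, not part of the module): locations carry two geometry
# columns ("center" and, optionally, "extent"). GeoDataFrame.to_postgis handles only one
# geometry column, so write_locations_postgis serialises "extent" to hex WKB itself and
# registers it as a PostGIS POLYGON via the dtype mapping. conn_string is a placeholder.
#
#   # locs: GeoDataFrame with geometry column "center" and a shapely Polygon column "extent"
#   write_locations_postgis(locs, "locations", conn_string)
#   # reading back converts the WKB "extent" column to geometries again
#   locs = read_locations_postgis("SELECT * FROM locations", conn_string, center="center",
#                                 index_col="id", extent="extent")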


@_handle_con_string
def read_trips_postgis(
    sql,
    con,
    geom_col=None,
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    **kwargs
):
    """Read trips from a PostGIS database.

    Parameters
    ----------
    sql : str
        SQL query e.g. "SELECT * FROM trips"
    con : str, sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Connection string or active connection to PostGIS database.
    geom_col : str, optional
        The geometry column of the table (if it exists). Start and end point of the trip.
    crs : optional
        Coordinate reference system if the table has a geometry column.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like decimal.Decimal)
        to floating point, useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is strftime compatible
          in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing
          integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword
          arguments of :func:`pandas.to_datetime`. Especially useful with databases without
          native Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        List of parameters to pass to execute method.
    chunksize : int, default None
        If specified, return an iterator where chunksize is the number of rows to include
        in each chunk.
    **kwargs
        Further keyword arguments as available in trackintel's trackintel.io.read_trips_gpd().
        Especially useful to rename column names from the SQL table to trackintel-conform
        column names. See the second example for how to use it.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the trips.

    Examples
    --------
    >>> trips = ti.io.read_trips_postgis("SELECT * FROM trips", con)
    >>> trips = ti.io.read_trips_postgis("SELECT * FROM trips", con, geom_col="geom", index_col="id",
    ...                                  started_at="start_time", finished_at="end_time", user_id="USER",
    ...                                  origin_staypoint_id="ORIGIN", destination_staypoint_id="DEST")
    """
    if geom_col is None:
        trips = pd.read_sql(
            sql,
            con,
            index_col=index_col,
            coerce_float=coerce_float,
            params=params,
            parse_dates=parse_dates,
            chunksize=chunksize,
        )
    else:
        trips = gpd.GeoDataFrame.from_postgis(
            sql,
            con,
            geom_col=geom_col,
            crs=crs,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            params=params,
            chunksize=chunksize,
        )

    return ti.io.read_trips_gpd(trips, **kwargs)


@_handle_con_string
def write_trips_postgis(
    trips, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    if isinstance(trips, gpd.GeoDataFrame):
        trips.to_postgis(
            name,
            con,
            schema=schema,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            chunksize=chunksize,
            dtype=dtype,
        )
    else:  # is DataFrame
        trips.to_sql(
            name,
            con,
            schema=schema,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            chunksize=chunksize,
            dtype=dtype,
        )
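
# Usage sketch (illustrative only, not part of the module): trips may or may not carry a
# geometry. A plain DataFrame is written via pandas.DataFrame.to_sql, a GeoDataFrame via
# geopandas to_postgis; reading mirrors this through the optional geom_col argument.
# Names below are placeholders.
#
#   write_trips_postgis(trips, "trips", conn_string)                     # DataFrame -> to_sql
#   trips_back = read_trips_postgis("SELECT * FROM trips", conn_string)  # no geom_col -> pd.read_sql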


@_handle_con_string
def read_tours_postgis(
    sql,
    con,
    geom_col=None,
    crs=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    params=None,
    chunksize=None,
    **kwargs
):
    """Read tours from a PostGIS database.

    Parameters
    ----------
    sql : str
        SQL query e.g. "SELECT * FROM tours"
    con : str, sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
        Connection string or active connection to PostGIS database.
    geom_col : str, optional
        The geometry column of the table (if it exists).
    crs : optional
        Coordinate reference system if the table has a geometry column.
    index_col : string or list of strings, optional
        Column(s) to set as index (MultiIndex).
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like decimal.Decimal)
        to floating point, useful for SQL result sets.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is strftime compatible
          in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing
          integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword
          arguments of :func:`pandas.to_datetime`. Especially useful with databases without
          native Datetime support, such as SQLite.
    params : list, tuple or dict, optional, default None
        List of parameters to pass to execute method.
    chunksize : int, default None
        If specified, return an iterator where chunksize is the number of rows to include
        in each chunk.
    **kwargs
        Further keyword arguments as available in trackintel's trackintel.io.read_tours_gpd().
        Especially useful to rename column names from the SQL table to trackintel-conform
        column names. See the second example for how to use it.

    Returns
    -------
    GeoDataFrame (as trackintel tours)
        A GeoDataFrame containing the tours.

    Examples
    --------
    >>> tours = ti.io.read_tours_postgis("SELECT * FROM tours", con)
    >>> tours = ti.io.read_tours_postgis("SELECT * FROM tours", con, index_col="id", started_at="start_time",
    ...                                  finished_at="end_time", user_id="USER")
    """
    if geom_col is None:
        tours = pd.read_sql(
            sql,
            con,
            index_col=index_col,
            coerce_float=coerce_float,
            params=params,
            parse_dates=parse_dates,
            chunksize=chunksize,
        )
    else:
        tours = gpd.GeoDataFrame.from_postgis(
            sql,
            con,
            geom_col=geom_col,
            crs=crs,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            params=params,
            chunksize=chunksize,
        )

    return ti.io.read_tours_gpd(tours, **kwargs)


@_handle_con_string
def write_tours_postgis(
    tours, name, con, schema=None, if_exists="fail", index=True, index_label=None, chunksize=None, dtype=None
):
    write_trips_postgis(
        tours,
        name=name,
        con=con,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
    )


# helper docstring to change __doc__ of all write functions conveniently in one place
__doc = """Stores {long} to PostGIS. Usually, this is directly called on a {long} DataFrame (see example below).

Parameters
----------
{long} : GeoDataFrame (as trackintel {long})
    The {long} to store to the database.

name : str
    The name of the table to write to.

con : str, sqlalchemy.engine.Connection or sqlalchemy.engine.Engine
    Connection string or active connection to PostGIS database.

schema : str, optional
    The schema (if the database supports this) where the table resides.

if_exists : str, {{'fail', 'replace', 'append'}}, default 'fail'
    How to behave if the table already exists.

    - fail: Raise a ValueError.
    - replace: Drop the table before inserting new values.
    - append: Insert new values to the existing table.

index : bool, default True
    Write DataFrame index as a column. Uses index_label as the column name in the table.

index_label : str or sequence, default None
    Column label for index column(s). If None is given (default) and index is True,
    then the index names are used.

chunksize : int, optional
    How many entries should be written at the same time.

dtype : dict of column name to SQL type, default None
    Specifying the datatype for columns.
    The keys should be the column names and the values should be the SQLAlchemy types.

Examples
--------
>>> {short}.as_{long}.to_postgis(conn_string, table_name)
>>> ti.io.postgis.write_{long}_postgis({short}, conn_string, table_name)
"""

write_positionfixes_postgis.__doc__ = __doc.format(long="positionfixes", short="pfs")
write_triplegs_postgis.__doc__ = __doc.format(long="triplegs", short="tpls")
write_staypoints_postgis.__doc__ = __doc.format(long="staypoints", short="sp")
write_locations_postgis.__doc__ = __doc.format(long="locations", short="locs")
write_trips_postgis.__doc__ = __doc.format(long="trips", short="trips")
write_tours_postgis.__doc__ = __doc.format(long="tours", short="tours")
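
# Sketch (illustrative only): because the write functions share the template above, their
# rendered docstrings can be inspected as usual, e.g.
#
#   print(write_staypoints_postgis.__doc__)  # "Stores staypoints to PostGIS. ..."
#   help(write_triplegs_postgis)             # same template formatted with long="triplegs", short="tpls"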