- # -*- coding: utf-8 -*-
- """
- Collection of query wrappers / abstractions to both facilitate data
- retrieval and to reduce dependency on DB-specific API.
- """
- from __future__ import division, print_function
- from contextlib import contextmanager
- from datetime import date, datetime, time
- from functools import partial
- import re
- import warnings
- import numpy as np
- import pandas._libs.lib as lib
- from pandas.compat import (
- map, raise_with_traceback, string_types, text_type, zip)
- from pandas.core.dtypes.common import (
- is_datetime64tz_dtype, is_dict_like, is_list_like)
- from pandas.core.dtypes.dtypes import DatetimeTZDtype
- from pandas.core.dtypes.missing import isna
- from pandas.core.api import DataFrame, Series
- from pandas.core.base import PandasObject
- from pandas.core.tools.datetimes import to_datetime
- class SQLAlchemyRequired(ImportError):
- pass
- class DatabaseError(IOError):
- pass
- # -----------------------------------------------------------------------------
- # -- Helper functions
- _SQLALCHEMY_INSTALLED = None
- def _is_sqlalchemy_connectable(con):
- global _SQLALCHEMY_INSTALLED
- if _SQLALCHEMY_INSTALLED is None:
- try:
- import sqlalchemy
- _SQLALCHEMY_INSTALLED = True
- from distutils.version import LooseVersion
- ver = sqlalchemy.__version__
- # For sqlalchemy versions < 0.8.2, the BIGINT type is not recognized
- # for a sqlite engine, which results in a warning when trying to
- # read/write a DataFrame with int64 values. (GH7433)
- if LooseVersion(ver) < LooseVersion('0.8.2'):
- from sqlalchemy import BigInteger
- from sqlalchemy.ext.compiler import compiles
- @compiles(BigInteger, 'sqlite')
- def compile_big_int_sqlite(type_, compiler, **kw):
- return 'INTEGER'
- except ImportError:
- _SQLALCHEMY_INSTALLED = False
- if _SQLALCHEMY_INSTALLED:
- import sqlalchemy
- return isinstance(con, sqlalchemy.engine.Connectable)
- else:
- return False
- def _convert_params(sql, params):
- """Convert SQL and params args to DBAPI2.0 compliant format."""
- args = [sql]
- if params is not None:
- if hasattr(params, 'keys'): # test if params is a mapping
- args += [params]
- else:
- args += [list(params)]
- return args
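- # Illustrative sketch (comments only): _convert_params normalizes the
- # (sql, params) pair into the positional argument list handed to
- # cursor.execute. The query text below is a made-up example.
- # >>> _convert_params("SELECT * FROM t WHERE a = ?", (1,))
- # ['SELECT * FROM t WHERE a = ?', [1]]
- # >>> _convert_params("SELECT * FROM t", None)
- # ['SELECT * FROM t']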
- def _process_parse_dates_argument(parse_dates):
- """Process parse_dates argument for read_sql functions"""
- # handle non-list entries for parse_dates gracefully
- if parse_dates is True or parse_dates is None or parse_dates is False:
- parse_dates = []
- elif not hasattr(parse_dates, '__iter__'):
- parse_dates = [parse_dates]
- return parse_dates
- def _handle_date_column(col, utc=None, format=None):
- if isinstance(format, dict):
- return to_datetime(col, errors='ignore', **format)
- else:
- # Allow passing of formatting string for integers
- # GH17855
- if format is None and (issubclass(col.dtype.type, np.floating) or
- issubclass(col.dtype.type, np.integer)):
- format = 's'
- if format in ['D', 'd', 'h', 'm', 's', 'ms', 'us', 'ns']:
- return to_datetime(col, errors='coerce', unit=format, utc=utc)
- elif is_datetime64tz_dtype(col):
- # coerce to UTC timezone
- # GH11216
- return to_datetime(col, utc=True)
- else:
- return to_datetime(col, errors='coerce', format=format, utc=utc)
- def _parse_date_columns(data_frame, parse_dates):
- """
- Force non-datetime columns to be parsed as datetimes.
- Supports both string formatted and integer timestamp columns.
- """
- parse_dates = _process_parse_dates_argument(parse_dates)
- # we want to coerce datetime64_tz dtypes for now to UTC
- # we could in theory do a 'nice' conversion from a FixedOffset tz
- # GH11216
- for col_name, df_col in data_frame.iteritems():
- if is_datetime64tz_dtype(df_col) or col_name in parse_dates:
- try:
- fmt = parse_dates[col_name]
- except TypeError:
- fmt = None
- data_frame[col_name] = _handle_date_column(df_col, format=fmt)
- return data_frame
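- # Hedged usage sketch of the three parse_dates forms this helper accepts;
- # the column names 'created' and 'epoch' are hypothetical.
- # >>> _parse_date_columns(df, ['created'])             # list of names
- # >>> _parse_date_columns(df, {'epoch': 's'})          # integer unit
- # >>> _parse_date_columns(df, {'created': {'format': '%Y-%m-%d'}})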
- def _wrap_result(data, columns, index_col=None, coerce_float=True,
- parse_dates=None):
- """Wrap result set of query in a DataFrame."""
- frame = DataFrame.from_records(data, columns=columns,
- coerce_float=coerce_float)
- frame = _parse_date_columns(frame, parse_dates)
- if index_col is not None:
- frame.set_index(index_col, inplace=True)
- return frame
- def execute(sql, con, cur=None, params=None):
- """
- Execute the given SQL query using the provided connection object.
- Parameters
- ----------
- sql : string
- SQL query to be executed.
- con : SQLAlchemy connectable(engine/connection) or sqlite3 connection
- Using SQLAlchemy makes it possible to use any DB supported by the
- library.
- If a DBAPI2 object, only sqlite3 is supported.
- cur : deprecated, cursor is obtained from connection, default: None
- params : list or tuple, optional, default: None
- List of parameters to pass to execute method.
- Returns
- -------
- Results Iterable
- """
- if cur is None:
- pandas_sql = pandasSQL_builder(con)
- else:
- pandas_sql = pandasSQL_builder(cur, is_cursor=True)
- args = _convert_params(sql, params)
- return pandas_sql.execute(*args)
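- # Minimal usage sketch, assuming an in-memory sqlite3 connection; the table
- # name and values are made up.
- # >>> import sqlite3
- # >>> conn = sqlite3.connect(':memory:')
- # >>> execute("CREATE TABLE t (a INTEGER)", conn)
- # >>> execute("INSERT INTO t VALUES (?)", conn, params=[1])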
- # -----------------------------------------------------------------------------
- # -- Read and write to DataFrames
- def read_sql_table(table_name, con, schema=None, index_col=None,
- coerce_float=True, parse_dates=None, columns=None,
- chunksize=None):
- """Read SQL database table into a DataFrame.
- Given a table name and a SQLAlchemy connectable, returns a DataFrame.
- This function does not support DBAPI connections.
- Parameters
- ----------
- table_name : string
- Name of SQL table in database.
- con : SQLAlchemy connectable (or database string URI)
- SQLite DBAPI connection mode not supported.
- schema : string, default None
- Name of SQL schema in database to query (if database flavor
- supports this). Uses default schema if None (default).
- index_col : string or list of strings, optional, default: None
- Column(s) to set as index (MultiIndex).
- coerce_float : boolean, default True
- Attempts to convert values of non-string, non-numeric objects (like
- decimal.Decimal) to floating point. Can result in loss of precision.
- parse_dates : list or dict, default: None
- - List of column names to parse as dates.
- - Dict of ``{column_name: format string}`` where format string is
- strftime compatible in case of parsing string times or is one of
- (D, s, ns, ms, us) in case of parsing integer timestamps.
- - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
- to the keyword arguments of :func:`pandas.to_datetime`.
- Especially useful with databases without native Datetime support,
- such as SQLite.
- columns : list, default: None
- List of column names to select from SQL table
- chunksize : int, default None
- If specified, returns an iterator where `chunksize` is the number of
- rows to include in each chunk.
- Returns
- -------
- DataFrame
- See Also
- --------
- read_sql_query : Read SQL query into a DataFrame.
- read_sql
- Notes
- -----
- Any datetime values with time zone information will be converted to UTC.
- """
- con = _engine_builder(con)
- if not _is_sqlalchemy_connectable(con):
- raise NotImplementedError("read_sql_table only supported for "
- "SQLAlchemy connectable.")
- import sqlalchemy
- from sqlalchemy.schema import MetaData
- meta = MetaData(con, schema=schema)
- try:
- meta.reflect(only=[table_name], views=True)
- except sqlalchemy.exc.InvalidRequestError:
- raise ValueError("Table {name} not found".format(name=table_name))
- pandas_sql = SQLDatabase(con, meta=meta)
- table = pandas_sql.read_table(
- table_name, index_col=index_col, coerce_float=coerce_float,
- parse_dates=parse_dates, columns=columns, chunksize=chunksize)
- if table is not None:
- return table
- else:
- raise ValueError("Table {name} not found".format(name=table_name), con)
- def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None,
- parse_dates=None, chunksize=None):
- """Read SQL query into a DataFrame.
- Returns a DataFrame corresponding to the result set of the query
- string. Optionally provide an `index_col` parameter to use one of the
- columns as the index, otherwise default integer index will be used.
- Parameters
- ----------
- sql : string SQL query or SQLAlchemy Selectable (select or text object)
- SQL query to be executed.
- con : SQLAlchemy connectable(engine/connection), database string URI,
- or sqlite3 DBAPI2 connection
- Using SQLAlchemy makes it possible to use any DB supported by that
- library.
- If a DBAPI2 object, only sqlite3 is supported.
- index_col : string or list of strings, optional, default: None
- Column(s) to set as index (MultiIndex).
- coerce_float : boolean, default True
- Attempts to convert values of non-string, non-numeric objects (like
- decimal.Decimal) to floating point. Useful for SQL result sets.
- params : list, tuple or dict, optional, default: None
- List of parameters to pass to execute method. The syntax used
- to pass parameters is database driver dependent. Check your
- database driver documentation for which of the five syntax styles,
- described in PEP 249's paramstyle, is supported.
- E.g. for psycopg2, use %(name)s and pass params={'name': 'value'}.
- parse_dates : list or dict, default: None
- - List of column names to parse as dates.
- - Dict of ``{column_name: format string}`` where format string is
- strftime compatible in case of parsing string times, or is one of
- (D, s, ns, ms, us) in case of parsing integer timestamps.
- - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
- to the keyword arguments of :func:`pandas.to_datetime`.
- Especially useful with databases without native Datetime support,
- such as SQLite.
- chunksize : int, default None
- If specified, return an iterator where `chunksize` is the number of
- rows to include in each chunk.
- Returns
- -------
- DataFrame
- See Also
- --------
- read_sql_table : Read SQL database table into a DataFrame.
- read_sql
- Notes
- -----
- Any datetime values with time zone information parsed via the `parse_dates`
- parameter will be converted to UTC.
- """
- pandas_sql = pandasSQL_builder(con)
- return pandas_sql.read_query(
- sql, index_col=index_col, params=params, coerce_float=coerce_float,
- parse_dates=parse_dates, chunksize=chunksize)
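- # Usage sketch with a parameterized query. The qmark style shown matches
- # sqlite3; other drivers use one of the other PEP 249 paramstyles.
- # >>> df = read_sql_query("SELECT * FROM t WHERE a > ?", conn, params=(0,))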
- def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
- parse_dates=None, columns=None, chunksize=None):
- """
- Read SQL query or database table into a DataFrame.
- This function is a convenience wrapper around ``read_sql_table`` and
- ``read_sql_query`` (for backward compatibility). It will delegate
- to the specific function depending on the provided input. A SQL query
- will be routed to ``read_sql_query``, while a database table name will
- be routed to ``read_sql_table``. Note that the delegated function might
- have more specific notes about their functionality not listed here.
- Parameters
- ----------
- sql : string or SQLAlchemy Selectable (select or text object)
- SQL query to be executed or a table name.
- con : SQLAlchemy connectable (engine/connection) or database string URI
- or DBAPI2 connection (fallback mode)
- Using SQLAlchemy makes it possible to use any DB supported by that
- library. If a DBAPI2 object, only sqlite3 is supported.
- index_col : string or list of strings, optional, default: None
- Column(s) to set as index (MultiIndex).
- coerce_float : boolean, default True
- Attempts to convert values of non-string, non-numeric objects (like
- decimal.Decimal) to floating point, useful for SQL result sets.
- params : list, tuple or dict, optional, default: None
- List of parameters to pass to execute method. The syntax used
- to pass parameters is database driver dependent. Check your
- database driver documentation for which of the five syntax styles,
- described in PEP 249's paramstyle, is supported.
- E.g. for psycopg2, use %(name)s and pass params={'name': 'value'}.
- parse_dates : list or dict, default: None
- - List of column names to parse as dates.
- - Dict of ``{column_name: format string}`` where format string is
- strftime compatible in case of parsing string times, or is one of
- (D, s, ns, ms, us) in case of parsing integer timestamps.
- - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
- to the keyword arguments of :func:`pandas.to_datetime`.
- Especially useful with databases without native Datetime support,
- such as SQLite.
- columns : list, default: None
- List of column names to select from SQL table (only used when reading
- a table).
- chunksize : int, default None
- If specified, return an iterator where `chunksize` is the
- number of rows to include in each chunk.
- Returns
- -------
- DataFrame
- See Also
- --------
- read_sql_table : Read SQL database table into a DataFrame.
- read_sql_query : Read SQL query into a DataFrame.
- """
- pandas_sql = pandasSQL_builder(con)
- if isinstance(pandas_sql, SQLiteDatabase):
- return pandas_sql.read_query(
- sql, index_col=index_col, params=params,
- coerce_float=coerce_float, parse_dates=parse_dates,
- chunksize=chunksize)
- try:
- _is_table_name = pandas_sql.has_table(sql)
- except Exception:
- # using generic exception to catch errors from sql drivers (GH24988)
- _is_table_name = False
- if _is_table_name:
- pandas_sql.meta.reflect(only=[sql])
- return pandas_sql.read_table(
- sql, index_col=index_col, coerce_float=coerce_float,
- parse_dates=parse_dates, columns=columns, chunksize=chunksize)
- else:
- return pandas_sql.read_query(
- sql, index_col=index_col, params=params,
- coerce_float=coerce_float, parse_dates=parse_dates,
- chunksize=chunksize)
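- # Dispatch sketch: the same call accepts either a bare table name (routed
- # to read_table when SQLAlchemy is available) or a query string (routed to
- # read_query). 'users' is a hypothetical table name.
- # >>> read_sql('users', engine)                # table name -> read_table
- # >>> read_sql('SELECT * FROM users', engine)  # query      -> read_query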
- def to_sql(frame, name, con, schema=None, if_exists='fail', index=True,
- index_label=None, chunksize=None, dtype=None, method=None):
- """
- Write records stored in a DataFrame to a SQL database.
- Parameters
- ----------
- frame : DataFrame, Series
- name : string
- Name of SQL table.
- con : SQLAlchemy connectable(engine/connection) or database string URI
- or sqlite3 DBAPI2 connection
- Using SQLAlchemy makes it possible to use any DB supported by that
- library.
- If a DBAPI2 object, only sqlite3 is supported.
- schema : string, default None
- Name of SQL schema in database to write to (if database flavor
- supports this). If None, use default schema (default).
- if_exists : {'fail', 'replace', 'append'}, default 'fail'
- - fail: If table exists, raise a ValueError.
- - replace: If table exists, drop it, recreate it, and insert data.
- - append: If table exists, insert data. Create if it does not exist.
- index : boolean, default True
- Write DataFrame index as a column.
- index_label : string or sequence, default None
- Column label for index column(s). If None is given (default) and
- `index` is True, then the index names are used.
- A sequence should be given if the DataFrame uses MultiIndex.
- chunksize : int, default None
- If not None, then rows will be written in batches of this size at a
- time. If None, all rows will be written at once.
- dtype : single SQL type or dict of column name to SQL type, default None
- Optionally specify the datatype for columns. The SQL type should
- be a SQLAlchemy type, or a string for sqlite3 fallback connection.
- If all columns are of the same type, one single value can be used.
- method : {None, 'multi', callable}, default None
- Controls the SQL insertion clause used:
- - None : Uses standard SQL ``INSERT`` clause (one per row).
- - 'multi': Pass multiple values in a single ``INSERT`` clause.
- - callable with signature ``(pd_table, conn, keys, data_iter)``.
- Details and a sample callable implementation can be found in the
- section :ref:`insert method <io.sql.method>`.
- .. versionadded:: 0.24.0
- """
- if if_exists not in ('fail', 'replace', 'append'):
- raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
- pandas_sql = pandasSQL_builder(con, schema=schema)
- if isinstance(frame, Series):
- frame = frame.to_frame()
- elif not isinstance(frame, DataFrame):
- raise NotImplementedError("'frame' argument should be either a "
- "Series or a DataFrame")
- pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
- index_label=index_label, schema=schema,
- chunksize=chunksize, dtype=dtype, method=method)
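- # Hedged sketch of a custom ``method`` callable with the documented
- # ``(pd_table, conn, keys, data_iter)`` signature; this executemany-style
- # body is one possible implementation, not the canonical one.
- # >>> def insert_rows(pd_table, conn, keys, data_iter):
- # ...     data = [dict(zip(keys, row)) for row in data_iter]
- # ...     conn.execute(pd_table.table.insert(), data)
- # >>> to_sql(df, 'users', engine, if_exists='append', method=insert_rows)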
- def has_table(table_name, con, schema=None):
- """
- Check if the database has the named table.
- Parameters
- ----------
- table_name : string
- Name of SQL table.
- con : SQLAlchemy connectable(engine/connection) or sqlite3 DBAPI2 connection
- Using SQLAlchemy makes it possible to use any DB supported by that
- library.
- If a DBAPI2 object, only sqlite3 is supported.
- schema : string, default None
- Name of SQL schema in database to write to (if database flavor supports
- this). If None, use default schema (default).
- Returns
- -------
- boolean
- """
- pandas_sql = pandasSQL_builder(con, schema=schema)
- return pandas_sql.has_table(table_name)
- table_exists = has_table
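- # Quick check sketch ('users' is a hypothetical table name):
- # >>> has_table('users', conn)
- # True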
- def _engine_builder(con):
- """
- Returns a SQLAlchemy engine from a URI (if con is a string),
- otherwise it just returns con without modifying it.
- """
- global _SQLALCHEMY_INSTALLED
- if isinstance(con, string_types):
- try:
- import sqlalchemy
- except ImportError:
- _SQLALCHEMY_INSTALLED = False
- else:
- con = sqlalchemy.create_engine(con)
- return con
- return con
- def pandasSQL_builder(con, schema=None, meta=None,
- is_cursor=False):
- """
- Convenience function to return the correct PandasSQL subclass based on the
- provided parameters.
- """
- # When support for DBAPI connections is removed,
- # is_cursor should not be necessary.
- con = _engine_builder(con)
- if _is_sqlalchemy_connectable(con):
- return SQLDatabase(con, schema=schema, meta=meta)
- elif isinstance(con, string_types):
- raise ImportError("Using URI string without sqlalchemy installed.")
- else:
- return SQLiteDatabase(con, is_cursor=is_cursor)
- class SQLTable(PandasObject):
- """
- For mapping Pandas tables to SQL tables.
- Uses the fact that the table is reflected by SQLAlchemy to
- do better type conversions.
- Also holds various flags needed to avoid having to
- pass them between functions all the time.
- """
- # TODO: support for multiIndex
- def __init__(self, name, pandas_sql_engine, frame=None, index=True,
- if_exists='fail', prefix='pandas', index_label=None,
- schema=None, keys=None, dtype=None):
- self.name = name
- self.pd_sql = pandas_sql_engine
- self.prefix = prefix
- self.frame = frame
- self.index = self._index_name(index, index_label)
- self.schema = schema
- self.if_exists = if_exists
- self.keys = keys
- self.dtype = dtype
- if frame is not None:
- # We want to initialize based on a dataframe
- self.table = self._create_table_setup()
- else:
- # no data provided, read-only mode
- self.table = self.pd_sql.get_table(self.name, self.schema)
- if self.table is None:
- raise ValueError(
- "Could not init table '{name}'".format(name=name))
- def exists(self):
- return self.pd_sql.has_table(self.name, self.schema)
- def sql_schema(self):
- from sqlalchemy.schema import CreateTable
- return str(CreateTable(self.table).compile(self.pd_sql.connectable))
- def _execute_create(self):
- # Inserting table into database, add to MetaData object
- self.table = self.table.tometadata(self.pd_sql.meta)
- self.table.create()
- def create(self):
- if self.exists():
- if self.if_exists == 'fail':
- raise ValueError(
- "Table '{name}' already exists.".format(name=self.name))
- elif self.if_exists == 'replace':
- self.pd_sql.drop_table(self.name, self.schema)
- self._execute_create()
- elif self.if_exists == 'append':
- pass
- else:
- raise ValueError(
- "'{0}' is not valid for if_exists".format(self.if_exists))
- else:
- self._execute_create()
- def _execute_insert(self, conn, keys, data_iter):
- """Execute SQL statement inserting data
- Parameters
- ----------
- conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
- keys : list of str
- Column names
- data_iter : generator of list
- Each item contains a list of values to be inserted
- """
- data = [dict(zip(keys, row)) for row in data_iter]
- conn.execute(self.table.insert(), data)
- def _execute_insert_multi(self, conn, keys, data_iter):
- """Alternative to _execute_insert for DBs support multivalue INSERT.
- Note: multi-value insert is usually faster for analytics DBs
- and tables containing a few columns
- but performance degrades quickly with increase of columns.
- """
- data = [dict(zip(keys, row)) for row in data_iter]
- conn.execute(self.table.insert(data))
- def insert_data(self):
- if self.index is not None:
- temp = self.frame.copy()
- temp.index.names = self.index
- try:
- temp.reset_index(inplace=True)
- except ValueError as err:
- raise ValueError(
- "duplicate name in index/columns: {0}".format(err))
- else:
- temp = self.frame
- column_names = list(map(text_type, temp.columns))
- ncols = len(column_names)
- data_list = [None] * ncols
- blocks = temp._data.blocks
- for b in blocks:
- if b.is_datetime:
- # return datetime.datetime objects
- if b.is_datetimetz:
- # GH 9086: Ensure we return datetimes with timezone info
- # Need to return 2-D data; DatetimeIndex is 1D
- d = b.values.to_pydatetime()
- d = np.expand_dims(d, axis=0)
- else:
- # convert to microsecond resolution for datetime.datetime
- d = b.values.astype('M8[us]').astype(object)
- else:
- d = np.array(b.get_values(), dtype=object)
- # replace NaN with None
- if b._can_hold_na:
- mask = isna(d)
- d[mask] = None
- for col_loc, col in zip(b.mgr_locs, d):
- data_list[col_loc] = col
- return column_names, data_list
- def insert(self, chunksize=None, method=None):
- # set insert method
- if method is None:
- exec_insert = self._execute_insert
- elif method == 'multi':
- exec_insert = self._execute_insert_multi
- elif callable(method):
- exec_insert = partial(method, self)
- else:
- raise ValueError('Invalid parameter `method`: {}'.format(method))
- keys, data_list = self.insert_data()
- nrows = len(self.frame)
- if nrows == 0:
- return
- if chunksize is None:
- chunksize = nrows
- elif chunksize == 0:
- raise ValueError('chunksize argument should be non-zero')
- chunks = int(nrows / chunksize) + 1
- with self.pd_sql.run_transaction() as conn:
- for i in range(chunks):
- start_i = i * chunksize
- end_i = min((i + 1) * chunksize, nrows)
- if start_i >= end_i:
- break
- chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
- exec_insert(conn, keys, chunk_iter)
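- # Worked chunking example for the loop above: with nrows=10 and chunksize=4,
- # chunks = int(10 / 4) + 1 = 3 and the slices are [0:4], [4:8] and [8:10];
- # the start_i >= end_i guard breaks out of the extra iteration that the
- # ``+ 1`` would otherwise produce when nrows divides evenly by chunksize.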
- def _query_iterator(self, result, chunksize, columns, coerce_float=True,
- parse_dates=None):
- """Return generator through chunked result set."""
- while True:
- data = result.fetchmany(chunksize)
- if not data:
- break
- else:
- self.frame = DataFrame.from_records(
- data, columns=columns, coerce_float=coerce_float)
- self._harmonize_columns(parse_dates=parse_dates)
- if self.index is not None:
- self.frame.set_index(self.index, inplace=True)
- yield self.frame
- def read(self, coerce_float=True, parse_dates=None, columns=None,
- chunksize=None):
- if columns is not None and len(columns) > 0:
- from sqlalchemy import select
- cols = [self.table.c[n] for n in columns]
- if self.index is not None:
- for idx in self.index[::-1]:
- cols.insert(0, self.table.c[idx])
- sql_select = select(cols)
- else:
- sql_select = self.table.select()
- result = self.pd_sql.execute(sql_select)
- column_names = result.keys()
- if chunksize is not None:
- return self._query_iterator(result, chunksize, column_names,
- coerce_float=coerce_float,
- parse_dates=parse_dates)
- else:
- data = result.fetchall()
- self.frame = DataFrame.from_records(
- data, columns=column_names, coerce_float=coerce_float)
- self._harmonize_columns(parse_dates=parse_dates)
- if self.index is not None:
- self.frame.set_index(self.index, inplace=True)
- return self.frame
- def _index_name(self, index, index_label):
- # for writing: index=True to include index in sql table
- if index is True:
- nlevels = self.frame.index.nlevels
- # if index_label is specified, set this as index name(s)
- if index_label is not None:
- if not isinstance(index_label, list):
- index_label = [index_label]
- if len(index_label) != nlevels:
- raise ValueError(
- "Length of 'index_label' should match number of "
- "levels, which is {0}".format(nlevels))
- else:
- return index_label
- # return the used column labels for the index columns
- if (nlevels == 1 and 'index' not in self.frame.columns and
- self.frame.index.name is None):
- return ['index']
- else:
- return [l if l is not None else "level_{0}".format(i)
- for i, l in enumerate(self.frame.index.names)]
- # for reading: index=(list of) string to specify column to set as index
- elif isinstance(index, string_types):
- return [index]
- elif isinstance(index, list):
- return index
- else:
- return None
- def _get_column_names_and_types(self, dtype_mapper):
- column_names_and_types = []
- if self.index is not None:
- for i, idx_label in enumerate(self.index):
- idx_type = dtype_mapper(
- self.frame.index._get_level_values(i))
- column_names_and_types.append((text_type(idx_label),
- idx_type, True))
- column_names_and_types += [
- (text_type(self.frame.columns[i]),
- dtype_mapper(self.frame.iloc[:, i]),
- False)
- for i in range(len(self.frame.columns))
- ]
- return column_names_and_types
- def _create_table_setup(self):
- from sqlalchemy import Table, Column, PrimaryKeyConstraint
- column_names_and_types = self._get_column_names_and_types(
- self._sqlalchemy_type
- )
- columns = [Column(name, typ, index=is_index)
- for name, typ, is_index in column_names_and_types]
- if self.keys is not None:
- if not is_list_like(self.keys):
- keys = [self.keys]
- else:
- keys = self.keys
- pkc = PrimaryKeyConstraint(*keys, name=self.name + '_pk')
- columns.append(pkc)
- schema = self.schema or self.pd_sql.meta.schema
- # At this point, attach to new metadata, only attach to self.meta
- # once table is created.
- from sqlalchemy.schema import MetaData
- meta = MetaData(self.pd_sql, schema=schema)
- return Table(self.name, meta, *columns, schema=schema)
- def _harmonize_columns(self, parse_dates=None):
- """
- Make the DataFrame's column types align with the SQL table
- column types.
- Need to work around limited NA value support. Floats are always
- fine, but ints must be converted to floats if there are Null values.
- Booleans are hard because converting a bool column with None values
- replaces all Nones with False. Therefore only convert bool if there are no
- NA values.
- Datetimes should already be converted to np.datetime64 if supported,
- but here we also force conversion if required.
- """
- parse_dates = _process_parse_dates_argument(parse_dates)
- for sql_col in self.table.columns:
- col_name = sql_col.name
- try:
- df_col = self.frame[col_name]
- # Handle date parsing upfront; don't try to convert columns
- # twice
- if col_name in parse_dates:
- try:
- fmt = parse_dates[col_name]
- except TypeError:
- fmt = None
- self.frame[col_name] = _handle_date_column(
- df_col, format=fmt)
- continue
- # the type the dataframe column should have
- col_type = self._get_dtype(sql_col.type)
- if (col_type is datetime or col_type is date or
- col_type is DatetimeTZDtype):
- # Convert tz-aware Datetime SQL columns to UTC
- utc = col_type is DatetimeTZDtype
- self.frame[col_name] = _handle_date_column(df_col, utc=utc)
- elif col_type is float:
- # floats support NA, can always convert!
- self.frame[col_name] = df_col.astype(col_type, copy=False)
- elif len(df_col) == df_col.count():
- # No NA values, can convert ints and bools
- if col_type is np.dtype('int64') or col_type is bool:
- self.frame[col_name] = df_col.astype(
- col_type, copy=False)
- except KeyError:
- pass # this column not in results
- def _sqlalchemy_type(self, col):
- dtype = self.dtype or {}
- if col.name in dtype:
- return self.dtype[col.name]
- # Infer type of column, while ignoring missing values.
- # Needed for inserting typed data containing NULLs, GH 8778.
- col_type = lib.infer_dtype(col, skipna=True)
- from sqlalchemy.types import (BigInteger, Integer, Float,
- Text, Boolean,
- DateTime, Date, Time, TIMESTAMP)
- if col_type == 'datetime64' or col_type == 'datetime':
- # GH 9086: TIMESTAMP is the suggested type if the column contains
- # timezone information
- try:
- if col.dt.tz is not None:
- return TIMESTAMP(timezone=True)
- except AttributeError:
- # The column is actually a DatetimeIndex
- if col.tz is not None:
- return TIMESTAMP(timezone=True)
- return DateTime
- if col_type == 'timedelta64':
- warnings.warn("the 'timedelta' type is not supported, and will be "
- "written as integer values (ns frequency) to the "
- "database.", UserWarning, stacklevel=8)
- return BigInteger
- elif col_type == 'floating':
- if col.dtype == 'float32':
- return Float(precision=23)
- else:
- return Float(precision=53)
- elif col_type == 'integer':
- if col.dtype == 'int32':
- return Integer
- else:
- return BigInteger
- elif col_type == 'boolean':
- return Boolean
- elif col_type == 'date':
- return Date
- elif col_type == 'time':
- return Time
- elif col_type == 'complex':
- raise ValueError('Complex datatypes not supported')
- return Text
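- # Mapping sketch read off the branches above (illustrative dtypes):
- # int32 column        -> Integer; other integers -> BigInteger
- # float32 column      -> Float(precision=23); other floats -> Float(precision=53)
- # tz-aware datetimes  -> TIMESTAMP(timezone=True); naive -> DateTime
- # timedelta64 column  -> BigInteger (with a UserWarning, stored as ns)
- # anything unmatched  -> Text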
- def _get_dtype(self, sqltype):
- from sqlalchemy.types import (Integer, Float, Boolean, DateTime,
- Date, TIMESTAMP)
- if isinstance(sqltype, Float):
- return float
- elif isinstance(sqltype, Integer):
- # TODO: Refine integer size.
- return np.dtype('int64')
- elif isinstance(sqltype, TIMESTAMP):
- # we have a timezone capable type
- if not sqltype.timezone:
- return datetime
- return DatetimeTZDtype
- elif isinstance(sqltype, DateTime):
- # Caution: np.datetime64 is also a subclass of np.number.
- return datetime
- elif isinstance(sqltype, Date):
- return date
- elif isinstance(sqltype, Boolean):
- return bool
- return object
- class PandasSQL(PandasObject):
- """
- Subclasses should define read_sql and to_sql.
- """
- def read_sql(self, *args, **kwargs):
- raise ValueError("PandasSQL must be created with an SQLAlchemy "
- "connectable or sqlite connection")
- def to_sql(self, *args, **kwargs):
- raise ValueError("PandasSQL must be created with an SQLAlchemy "
- "connectable or sqlite connection")
- class SQLDatabase(PandasSQL):
- """
- This class enables conversion between DataFrame and SQL databases
- using SQLAlchemy to handle DataBase abstraction.
- Parameters
- ----------
- engine : SQLAlchemy connectable
- Connectable to connect with the database. Using SQLAlchemy makes it
- possible to use any DB supported by that library.
- schema : string, default None
- Name of SQL schema in database to write to (if database flavor
- supports this). If None, use default schema (default).
- meta : SQLAlchemy MetaData object, default None
- If provided, this MetaData object is used instead of a newly
- created one. This allows specifying database-flavor-specific
- arguments in the MetaData object.
- """
- def __init__(self, engine, schema=None, meta=None):
- self.connectable = engine
- if not meta:
- from sqlalchemy.schema import MetaData
- meta = MetaData(self.connectable, schema=schema)
- self.meta = meta
- @contextmanager
- def run_transaction(self):
- with self.connectable.begin() as tx:
- if hasattr(tx, 'execute'):
- yield tx
- else:
- yield self.connectable
- def execute(self, *args, **kwargs):
- """Simple passthrough to SQLAlchemy connectable"""
- return self.connectable.execute(*args, **kwargs)
- def read_table(self, table_name, index_col=None, coerce_float=True,
- parse_dates=None, columns=None, schema=None,
- chunksize=None):
- """Read SQL database table into a DataFrame.
- Parameters
- ----------
- table_name : string
- Name of SQL table in database.
- index_col : string, optional, default: None
- Column to set as index.
- coerce_float : boolean, default True
- Attempts to convert values of non-string, non-numeric objects
- (like decimal.Decimal) to floating point. This can result in
- loss of precision.
- parse_dates : list or dict, default: None
- - List of column names to parse as dates.
- - Dict of ``{column_name: format string}`` where format string is
- strftime compatible in case of parsing string times, or is one of
- (D, s, ns, ms, us) in case of parsing integer timestamps.
- - Dict of ``{column_name: arg}``, where the arg corresponds
- to the keyword arguments of :func:`pandas.to_datetime`.
- Especially useful with databases without native Datetime support,
- such as SQLite.
- columns : list, default: None
- List of column names to select from SQL table.
- schema : string, default None
- Name of SQL schema in database to query (if database flavor
- supports this). If specified, this overwrites the default
- schema of the SQL database object.
- chunksize : int, default None
- If specified, return an iterator where `chunksize` is the number
- of rows to include in each chunk.
- Returns
- -------
- DataFrame
- See Also
- --------
- pandas.read_sql_table
- SQLDatabase.read_query
- """
- table = SQLTable(table_name, self, index=index_col, schema=schema)
- return table.read(coerce_float=coerce_float,
- parse_dates=parse_dates, columns=columns,
- chunksize=chunksize)
- @staticmethod
- def _query_iterator(result, chunksize, columns, index_col=None,
- coerce_float=True, parse_dates=None):
- """Return generator through chunked result set"""
- while True:
- data = result.fetchmany(chunksize)
- if not data:
- break
- else:
- yield _wrap_result(data, columns, index_col=index_col,
- coerce_float=coerce_float,
- parse_dates=parse_dates)
- def read_query(self, sql, index_col=None, coerce_float=True,
- parse_dates=None, params=None, chunksize=None):
- """Read SQL query into a DataFrame.
- Parameters
- ----------
- sql : string
- SQL query to be executed.
- index_col : string, optional, default: None
- Column name to use as index for the returned DataFrame object.
- coerce_float : boolean, default True
- Attempt to convert values of non-string, non-numeric objects (like
- decimal.Decimal) to floating point, useful for SQL result sets.
- params : list, tuple or dict, optional, default: None
- List of parameters to pass to execute method. The syntax used
- to pass parameters is database driver dependent. Check your
- database driver documentation for which of the five syntax styles,
- described in PEP 249's paramstyle, is supported.
- E.g. for psycopg2, use %(name)s and pass params={'name': 'value'}.
- parse_dates : list or dict, default: None
- - List of column names to parse as dates.
- - Dict of ``{column_name: format string}`` where format string is
- strftime compatible in case of parsing string times, or is one of
- (D, s, ns, ms, us) in case of parsing integer timestamps.
- - Dict of ``{column_name: arg dict}``, where the arg dict
- corresponds to the keyword arguments of
- :func:`pandas.to_datetime`. Especially useful with databases
- without native Datetime support, such as SQLite.
- chunksize : int, default None
- If specified, return an iterator where `chunksize` is the number
- of rows to include in each chunk.
- Returns
- -------
- DataFrame
- See Also
- --------
- read_sql_table : Read SQL database table into a DataFrame.
- read_sql
- """
- args = _convert_params(sql, params)
- result = self.execute(*args)
- columns = result.keys()
- if chunksize is not None:
- return self._query_iterator(result, chunksize, columns,
- index_col=index_col,
- coerce_float=coerce_float,
- parse_dates=parse_dates)
- else:
- data = result.fetchall()
- frame = _wrap_result(data, columns, index_col=index_col,
- coerce_float=coerce_float,
- parse_dates=parse_dates)
- return frame
- read_sql = read_query
- def to_sql(self, frame, name, if_exists='fail', index=True,
- index_label=None, schema=None, chunksize=None, dtype=None,
- method=None):
- """
- Write records stored in a DataFrame to a SQL database.
- Parameters
- ----------
- frame : DataFrame
- name : string
- Name of SQL table.
- if_exists : {'fail', 'replace', 'append'}, default 'fail'
- - fail: If table exists, raise a ValueError.
- - replace: If table exists, drop it, recreate it, and insert data.
- - append: If table exists, insert data. Create if it does not exist.
- index : boolean, default True
- Write DataFrame index as a column.
- index_label : string or sequence, default None
- Column label for index column(s). If None is given (default) and
- `index` is True, then the index names are used.
- A sequence should be given if the DataFrame uses MultiIndex.
- schema : string, default None
- Name of SQL schema in database to write to (if database flavor
- supports this). If specified, this overwrites the default
- schema of the SQLDatabase object.
- chunksize : int, default None
- If not None, then rows will be written in batches of this size at a
- time. If None, all rows will be written at once.
- dtype : single type or dict of column name to SQL type, default None
- Optionally specify the datatype for columns. The SQL type should
- be a SQLAlchemy type. If all columns are of the same type, one
- single value can be used.
- method : {None, 'multi', callable}, default None
- Controls the SQL insertion clause used:
- * None : Uses standard SQL ``INSERT`` clause (one per row).
- * 'multi': Pass multiple values in a single ``INSERT`` clause.
- * callable with signature ``(pd_table, conn, keys, data_iter)``.
- Details and a sample callable implementation can be found in the
- section :ref:`insert method <io.sql.method>`.
- .. versionadded:: 0.24.0
- """
- if dtype and not is_dict_like(dtype):
- dtype = {col_name: dtype for col_name in frame}
- if dtype is not None:
- from sqlalchemy.types import to_instance, TypeEngine
- for col, my_type in dtype.items():
- if not isinstance(to_instance(my_type), TypeEngine):
- raise ValueError('The type of {column} is not a '
- 'SQLAlchemy type '.format(column=col))
- table = SQLTable(name, self, frame=frame, index=index,
- if_exists=if_exists, index_label=index_label,
- schema=schema, dtype=dtype)
- table.create()
- table.insert(chunksize, method=method)
- if (not name.isdigit() and not name.islower()):
- # check for potential case sensitivity issues (GH7815)
- # Only check when name is not a number and name is not lower case
- engine = self.connectable.engine
- with self.connectable.connect() as conn:
- table_names = engine.table_names(
- schema=schema or self.meta.schema,
- connection=conn,
- )
- if name not in table_names:
- msg = (
- "The provided table name '{0}' is not found exactly as "
- "such in the database after writing the table, possibly "
- "due to case sensitivity issues. Consider using lower "
- "case table names."
- ).format(name)
- warnings.warn(msg, UserWarning)
- @property
- def tables(self):
- return self.meta.tables
- def has_table(self, name, schema=None):
- return self.connectable.run_callable(
- self.connectable.dialect.has_table,
- name,
- schema or self.meta.schema,
- )
- def get_table(self, table_name, schema=None):
- schema = schema or self.meta.schema
- if schema:
- tbl = self.meta.tables.get('.'.join([schema, table_name]))
- else:
- tbl = self.meta.tables.get(table_name)
- # Avoid casting double-precision floats into decimals
- from sqlalchemy import Numeric
- for column in tbl.columns:
- if isinstance(column.type, Numeric):
- column.type.asdecimal = False
- return tbl
- def drop_table(self, table_name, schema=None):
- schema = schema or self.meta.schema
- if self.has_table(table_name, schema):
- self.meta.reflect(only=[table_name], schema=schema)
- self.get_table(table_name, schema).drop()
- self.meta.clear()
- def _create_sql_schema(self, frame, table_name, keys=None, dtype=None):
- table = SQLTable(table_name, self, frame=frame, index=False, keys=keys,
- dtype=dtype)
- return str(table.sql_schema())
- # ---- SQL without SQLAlchemy ---
- # sqlite-specific sql strings and handler class
- # dictionary used for readability purposes
- _SQL_TYPES = {
- 'string': 'TEXT',
- 'floating': 'REAL',
- 'integer': 'INTEGER',
- 'datetime': 'TIMESTAMP',
- 'date': 'DATE',
- 'time': 'TIME',
- 'boolean': 'INTEGER',
- }
- def _get_unicode_name(name):
- try:
- uname = text_type(name).encode("utf-8", "strict").decode("utf-8")
- except UnicodeError:
- raise ValueError(
- "Cannot convert identifier to UTF-8: '{name}'".format(name=name))
- return uname
- def _get_valid_sqlite_name(name):
- # See http://stackoverflow.com/questions/6514274/how-do-you-escape-strings\
- # -for-sqlite-table-column-names-in-python
- # Ensure the string can be encoded as UTF-8.
- # Ensure the string does not include any NUL characters.
- # Replace all " with "".
- # Wrap the entire thing in double quotes.
- uname = _get_unicode_name(name)
- if not len(uname):
- raise ValueError("Empty table or column name specified")
- nul_index = uname.find("\x00")
- if nul_index >= 0:
- raise ValueError('SQLite identifier cannot contain NULs')
- return '"' + uname.replace('"', '""') + '"'
- _SAFE_NAMES_WARNING = ("The spaces in these column names will not be changed. "
- "In pandas versions < 0.14, spaces were converted to "
- "underscores.")
- class SQLiteTable(SQLTable):
- """
- Patch the SQLTable for fallback support.
- Instead of a table variable just use the Create Table statement.
- """
- def __init__(self, *args, **kwargs):
- # GH 8341
- # register an adapter callable for datetime.time object
- import sqlite3
- # this will transform time(12,34,56,789) into '12:34:56.000789'
- # (this is what sqlalchemy does)
- sqlite3.register_adapter(time, lambda _: _.strftime("%H:%M:%S.%f"))
- super(SQLiteTable, self).__init__(*args, **kwargs)
- def sql_schema(self):
- return str(";\n".join(self.table))
- def _execute_create(self):
- with self.pd_sql.run_transaction() as conn:
- for stmt in self.table:
- conn.execute(stmt)
- def insert_statement(self):
- names = list(map(text_type, self.frame.columns))
- wld = '?' # wildcard char
- escape = _get_valid_sqlite_name
- if self.index is not None:
- for idx in self.index[::-1]:
- names.insert(0, idx)
- bracketed_names = [escape(column) for column in names]
- col_names = ','.join(bracketed_names)
- wildcards = ','.join([wld] * len(names))
- insert_statement = \
- u'INSERT INTO {table} ({columns}) VALUES ({wld})'.format(
- table=escape(self.name), columns=col_names, wld=wildcards)
- return insert_statement
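- # Statement sketch for a hypothetical frame with columns ['a', 'b'] and the
- # default index label 'index':
- # INSERT INTO "t" ("index","a","b") VALUES (?,?,?)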
- def _execute_insert(self, conn, keys, data_iter):
- data_list = list(data_iter)
- conn.executemany(self.insert_statement(), data_list)
- def _create_table_setup(self):
- """
- Return a list of SQL statements that creates a table reflecting the
- structure of a DataFrame. The first entry will be a CREATE TABLE
- statement while the rest will be CREATE INDEX statements.
- """
- column_names_and_types = self._get_column_names_and_types(
- self._sql_type_name
- )
- pat = re.compile(r'\s+')
- column_names = [col_name for col_name, _, _ in column_names_and_types]
- if any(map(pat.search, column_names)):
- warnings.warn(_SAFE_NAMES_WARNING, stacklevel=6)
- escape = _get_valid_sqlite_name
- create_tbl_stmts = [escape(cname) + ' ' + ctype
- for cname, ctype, _ in column_names_and_types]
- if self.keys is not None and len(self.keys):
- if not is_list_like(self.keys):
- keys = [self.keys]
- else:
- keys = self.keys
- cnames_br = ", ".join(escape(c) for c in keys)
- create_tbl_stmts.append(
- "CONSTRAINT {tbl}_pk PRIMARY KEY ({cnames_br})".format(
- tbl=self.name, cnames_br=cnames_br))
- create_stmts = ["CREATE TABLE " + escape(self.name) + " (\n" +
- ',\n '.join(create_tbl_stmts) + "\n)"]
- ix_cols = [cname for cname, _, is_index in column_names_and_types
- if is_index]
- if len(ix_cols):
- cnames = "_".join(ix_cols)
- cnames_br = ",".join(escape(c) for c in ix_cols)
- create_stmts.append(
- "CREATE INDEX " + escape("ix_" + self.name + "_" + cnames) +
- "ON " + escape(self.name) + " (" + cnames_br + ")")
- return create_stmts
- def _sql_type_name(self, col):
- dtype = self.dtype or {}
- if col.name in dtype:
- return dtype[col.name]
- # Infer type of column, while ignoring missing values.
- # Needed for inserting typed data containing NULLs, GH 8778.
- col_type = lib.infer_dtype(col, skipna=True)
- if col_type == 'timedelta64':
- warnings.warn("the 'timedelta' type is not supported, and will be "
- "written as integer values (ns frequency) to the "
- "database.", UserWarning, stacklevel=8)
- col_type = "integer"
- elif col_type == "datetime64":
- col_type = "datetime"
- elif col_type == "empty":
- col_type = "string"
- elif col_type == "complex":
- raise ValueError('Complex datatypes not supported')
- if col_type not in _SQL_TYPES:
- col_type = "string"
- return _SQL_TYPES[col_type]
- class SQLiteDatabase(PandasSQL):
- """
- Version of SQLDatabase to support SQLite connections (fallback without
- SQLAlchemy). This should only be used internally.
- Parameters
- ----------
- con : sqlite connection object
- """
- def __init__(self, con, is_cursor=False):
- self.is_cursor = is_cursor
- self.con = con
- @contextmanager
- def run_transaction(self):
- cur = self.con.cursor()
- try:
- yield cur
- self.con.commit()
- except Exception:
- self.con.rollback()
- raise
- finally:
- cur.close()
- def execute(self, *args, **kwargs):
- if self.is_cursor:
- cur = self.con
- else:
- cur = self.con.cursor()
- try:
- if kwargs:
- cur.execute(*args, **kwargs)
- else:
- cur.execute(*args)
- return cur
- except Exception as exc:
- try:
- self.con.rollback()
- except Exception: # pragma: no cover
- ex = DatabaseError(
- "Execution failed on sql: {sql}\n{exc}\nunable "
- "to rollback".format(sql=args[0], exc=exc))
- raise_with_traceback(ex)
- ex = DatabaseError(
- "Execution failed on sql '{sql}': {exc}".format(
- sql=args[0], exc=exc))
- raise_with_traceback(ex)
- @staticmethod
- def _query_iterator(cursor, chunksize, columns, index_col=None,
- coerce_float=True, parse_dates=None):
- """Return generator through chunked result set"""
- while True:
- data = cursor.fetchmany(chunksize)
- if isinstance(data, tuple):
- data = list(data)
- if not data:
- cursor.close()
- break
- else:
- yield _wrap_result(data, columns, index_col=index_col,
- coerce_float=coerce_float,
- parse_dates=parse_dates)
- def read_query(self, sql, index_col=None, coerce_float=True, params=None,
- parse_dates=None, chunksize=None):
- args = _convert_params(sql, params)
- cursor = self.execute(*args)
- columns = [col_desc[0] for col_desc in cursor.description]
- if chunksize is not None:
- return self._query_iterator(cursor, chunksize, columns,
- index_col=index_col,
- coerce_float=coerce_float,
- parse_dates=parse_dates)
- else:
- data = self._fetchall_as_list(cursor)
- cursor.close()
- frame = _wrap_result(data, columns, index_col=index_col,
- coerce_float=coerce_float,
- parse_dates=parse_dates)
- return frame
- def _fetchall_as_list(self, cur):
- result = cur.fetchall()
- if not isinstance(result, list):
- result = list(result)
- return result
- def to_sql(self, frame, name, if_exists='fail', index=True,
- index_label=None, schema=None, chunksize=None, dtype=None,
- method=None):
- """
- Write records stored in a DataFrame to a SQL database.
- Parameters
- ----------
- frame : DataFrame
- name : string
- Name of SQL table.
- if_exists : {'fail', 'replace', 'append'}, default 'fail'
- fail: If table exists, raise a ValueError.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if it does not exist.
- index : boolean, default True
- Write DataFrame index as a column.
- index_label : string or sequence, default None
- Column label for index column(s). If None is given (default) and
- `index` is True, then the index names are used.
- A sequence should be given if the DataFrame uses MultiIndex.
- schema : string, default None
- Ignored parameter included for compatibility with SQLAlchemy
- version of ``to_sql``.
- chunksize : int, default None
- If not None, then rows will be written in batches of this
- size at a time. If None, all rows will be written at once.
- dtype : single type or dict of column name to SQL type, default None
- Optionally specify the datatype for columns. The SQL type should
- be a string. If all columns are of the same type, one single value
- can be used.
- method : {None, 'multi', callable}, default None
- Controls the SQL insertion clause used:
- * None : Uses standard SQL ``INSERT`` clause (one per row).
- * 'multi': Pass multiple values in a single ``INSERT`` clause.
- * callable with signature ``(pd_table, conn, keys, data_iter)``.
- Details and a sample callable implementation can be found in the
- section :ref:`insert method <io.sql.method>`.
- .. versionadded:: 0.24.0
- """
- if dtype and not is_dict_like(dtype):
- dtype = {col_name: dtype for col_name in frame}
- if dtype is not None:
- for col, my_type in dtype.items():
- if not isinstance(my_type, str):
- raise ValueError('{column} ({type!s}) not a string'.format(
- column=col, type=my_type))
- table = SQLiteTable(name, self, frame=frame, index=index,
- if_exists=if_exists, index_label=index_label,
- dtype=dtype)
- table.create()
- table.insert(chunksize, method)
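- # Fallback usage sketch (internal API), assuming a plain sqlite3 connection
- # and a made-up frame; note dtype values must be strings in this mode.
- # >>> pd_sql = SQLiteDatabase(sqlite3.connect(':memory:'))
- # >>> pd_sql.to_sql(df, 'users', dtype={'id': 'INTEGER'})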
- def has_table(self, name, schema=None):
- # TODO(wesm): unused?
- # escape = _get_valid_sqlite_name
- # esc_name = escape(name)
- wld = '?'
- query = ("SELECT name FROM sqlite_master "
- "WHERE type='table' AND name={wld};").format(wld=wld)
- return len(self.execute(query, [name, ]).fetchall()) > 0
- def get_table(self, table_name, schema=None):
- return None # not supported in fallback mode
- def drop_table(self, name, schema=None):
- drop_sql = "DROP TABLE {name}".format(
- name=_get_valid_sqlite_name(name))
- self.execute(drop_sql)
- def _create_sql_schema(self, frame, table_name, keys=None, dtype=None):
- table = SQLiteTable(table_name, self, frame=frame, index=False,
- keys=keys, dtype=dtype)
- return str(table.sql_schema())
- def get_schema(frame, name, keys=None, con=None, dtype=None):
- """
- Get the SQL db table schema for the given frame.
- Parameters
- ----------
- frame : DataFrame
- name : string
- Name of SQL table.
- keys : string or sequence, default: None
- columns to use as a primary key
- con : an open SQL database connection object or a SQLAlchemy connectable
- Using SQLAlchemy makes it possible to use any DB supported by that
- library, default: None
- If a DBAPI2 object, only sqlite3 is supported.
- dtype : dict of column name to SQL type, default None
- Optionally specify the datatype for columns. The SQL type should
- be a SQLAlchemy type, or a string for sqlite3 fallback connection.
- """
- pandas_sql = pandasSQL_builder(con=con)
- return pandas_sql._create_sql_schema(frame, name, keys=keys, dtype=dtype)
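- # Usage sketch: returns the CREATE TABLE statement that to_sql would issue
- # for ``frame``; the frame, table name and key column are hypothetical.
- # >>> print(get_schema(df, 'users', keys='id', con=conn))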
|