parquet.py

  1. """ parquet compat """
  2. from distutils.version import LooseVersion
  3. from warnings import catch_warnings
  4. from pandas.compat import string_types
  5. from pandas.errors import AbstractMethodError
  6. from pandas import DataFrame, get_option
  7. from pandas.io.common import get_filepath_or_buffer, is_s3_url


def get_engine(engine):
    """ return our implementation """

    if engine == 'auto':
        engine = get_option('io.parquet.engine')

    if engine == 'auto':
        # try engines in this order
        try:
            return PyArrowImpl()
        except ImportError:
            pass

        try:
            return FastParquetImpl()
        except ImportError:
            pass

        raise ImportError("Unable to find a usable engine; "
                          "tried using: 'pyarrow', 'fastparquet'.\n"
                          "pyarrow or fastparquet is required for parquet "
                          "support")

    if engine not in ['pyarrow', 'fastparquet']:
        raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")

    if engine == 'pyarrow':
        return PyArrowImpl()
    elif engine == 'fastparquet':
        return FastParquetImpl()
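
# Note (illustrative, not part of the engine-selection logic itself): the
# engine picked for ``engine='auto'`` can be pinned via the pandas option
# consulted above, e.g.
#
#     import pandas as pd
#     pd.set_option('io.parquet.engine', 'fastparquet')
#
# after which ``get_engine('auto')`` resolves directly to FastParquetImpl
# instead of trying pyarrow first.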


class BaseImpl(object):

    api = None  # module

    @staticmethod
    def validate_dataframe(df):

        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

        # must have value column names (strings only)
        if df.columns.inferred_type not in {'string', 'unicode'}:
            raise ValueError("parquet must have string column names")

        # index level names must be strings
        valid_names = all(
            isinstance(name, string_types)
            for name in df.index.names
            if name is not None
        )
        if not valid_names:
            raise ValueError("Index level names must be strings")

    def write(self, df, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs):
        raise AbstractMethodError(self)


class PyArrowImpl(BaseImpl):

    def __init__(self):
        # since pandas is a dependency of pyarrow
        # we need to import on first use
        try:
            import pyarrow
            import pyarrow.parquet
        except ImportError:
            raise ImportError(
                "pyarrow is required for parquet support\n\n"
                "you can install via conda\n"
                "conda install pyarrow -c conda-forge\n"
                "\nor via pip\n"
                "pip install -U pyarrow\n"
            )
        if LooseVersion(pyarrow.__version__) < '0.9.0':
            raise ImportError(
                "pyarrow >= 0.9.0 is required for parquet support\n\n"
                "you can install via conda\n"
                "conda install pyarrow -c conda-forge\n"
                "\nor via pip\n"
                "pip install -U pyarrow\n"
            )
        self.api = pyarrow

    def write(self, df, path, compression='snappy',
              coerce_timestamps='ms', index=None, partition_cols=None,
              **kwargs):
        self.validate_dataframe(df)
        path, _, _, _ = get_filepath_or_buffer(path, mode='wb')

        if index is None:
            from_pandas_kwargs = {}
        else:
            from_pandas_kwargs = {'preserve_index': index}

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
        if partition_cols is not None:
            self.api.parquet.write_to_dataset(
                table, path, compression=compression,
                coerce_timestamps=coerce_timestamps,
                partition_cols=partition_cols, **kwargs)
        else:
            self.api.parquet.write_table(
                table, path, compression=compression,
                coerce_timestamps=coerce_timestamps, **kwargs)

    def read(self, path, columns=None, **kwargs):
        path, _, _, should_close = get_filepath_or_buffer(path)
        kwargs['use_pandas_metadata'] = True
        result = self.api.parquet.read_table(path, columns=columns,
                                             **kwargs).to_pandas()
        if should_close:
            try:
                path.close()
            except:  # noqa: flake8
                pass

        return result


class FastParquetImpl(BaseImpl):

    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        try:
            import fastparquet
        except ImportError:
            raise ImportError(
                "fastparquet is required for parquet support\n\n"
                "you can install via conda\n"
                "conda install fastparquet -c conda-forge\n"
                "\nor via pip\n"
                "pip install -U fastparquet"
            )
        if LooseVersion(fastparquet.__version__) < '0.2.1':
            raise ImportError(
                "fastparquet >= 0.2.1 is required for parquet "
                "support\n\n"
                "you can install via conda\n"
                "conda install fastparquet -c conda-forge\n"
                "\nor via pip\n"
                "pip install -U fastparquet"
            )
        self.api = fastparquet

    def write(self, df, path, compression='snappy', index=None,
              partition_cols=None, **kwargs):
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.
        if 'partition_on' in kwargs and partition_cols is not None:
            raise ValueError("Cannot use both partition_on and "
                             "partition_cols. Use partition_cols for "
                             "partitioning data")
        elif 'partition_on' in kwargs:
            partition_cols = kwargs.pop('partition_on')

        if partition_cols is not None:
            kwargs['file_scheme'] = 'hive'

        if is_s3_url(path):
            # path is s3:// so we need to open the s3file in 'wb' mode.
            # TODO: Support 'ab'
            path, _, _, _ = get_filepath_or_buffer(path, mode='wb')
            # And pass the opened s3file to the fastparquet internal impl.
            kwargs['open_with'] = lambda path, _: path
        else:
            path, _, _, _ = get_filepath_or_buffer(path)

        with catch_warnings(record=True):
            self.api.write(path, df, compression=compression,
                           write_index=index, partition_on=partition_cols,
                           **kwargs)

    def read(self, path, columns=None, **kwargs):
        if is_s3_url(path):
            # When path is s3:// an S3File is returned.
            # We need to retain the original path (str) while also
            # passing the S3File().open function to the fastparquet impl.
            s3, _, _, should_close = get_filepath_or_buffer(path)
            try:
                parquet_file = self.api.ParquetFile(path, open_with=s3.s3.open)
            finally:
                s3.close()
        else:
            path, _, _, _ = get_filepath_or_buffer(path)
            parquet_file = self.api.ParquetFile(path)

        return parquet_file.to_pandas(columns=columns, **kwargs)


def to_parquet(df, path, engine='auto', compression='snappy', index=None,
               partition_cols=None, **kwargs):
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str
        File path or Root Directory path. Will be used as Root Directory
        path while writing a partitioned dataset.

        .. versionchanged:: 0.24.0
    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {'snappy', 'gzip', 'brotli', None}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file. If ``None``, the
        engine's default behavior will be used.

        .. versionadded:: 0.24.0
    partition_cols : list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.

        .. versionadded:: 0.24.0
    kwargs
        Additional keyword arguments passed to the engine
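
    Examples
    --------
    A minimal sketch of a round trip; the file name ``'df.parquet'`` and the
    example frame are purely illustrative.

    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
    >>> to_parquet(df, 'df.parquet', compression='snappy')

    Passing ``partition_cols`` writes a partitioned dataset rooted at the
    given path instead of a single file (names again illustrative):

    >>> to_parquet(df, 'df_partitioned', partition_cols=['b'])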
  199. """
  200. impl = get_engine(engine)
  201. return impl.write(df, path, compression=compression, index=index,
  202. partition_cols=partition_cols, **kwargs)


def read_parquet(path, engine='auto', columns=None, **kwargs):
    """
    Load a parquet object from the file path, returning a DataFrame.

    .. versionadded:: 0.21.0

    Parameters
    ----------
    path : string
        File path
    columns : list, default=None
        If not None, only these columns will be read from the file.

        .. versionadded:: 0.21.1
    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    kwargs
        Additional keyword arguments passed to the engine

    Returns
    -------
    DataFrame
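
    Examples
    --------
    A minimal sketch; ``'df.parquet'`` stands in for an existing parquet
    file, e.g. one written earlier with ``to_parquet``.

    >>> df = read_parquet('df.parquet')
    >>> df = read_parquet('df.parquet', columns=['a'])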
  223. """
  224. impl = get_engine(engine)
  225. return impl.read(path, columns=columns, **kwargs)