# Copyright 2020 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.
"""``ParquetLocalDataSet`` is a data set used to load and save
data to local parquet files. It uses the ``pyarrow`` implementation,
which allows for multiple parts, compression, column selection etc.
Documentation on the PyArrow library features, compatibility
list and known caveats can also be found on their official guide at:
https://arrow.apache.org/docs/python/index.html
"""

import copy
from pathlib import Path
from typing import Any, Dict

import pandas as pd

from kedro.io.core import AbstractVersionedDataSet, Version, deprecation_warning


class ParquetLocalDataSet(AbstractVersionedDataSet):
"""``AbstractDataSet`` with functionality for handling local parquet files.
Example:
::
>>> from kedro.io import ParquetLocalDataSet
>>> import pandas as pd
>>>
>>> data = pd.DataFrame({'col1': [1, 2], 'col2': [4, 5],
>>> 'col3': [5, 6]})
>>> data_set = ParquetLocalDataSet('myFile')
>>> data_set.save(data)
>>> loaded_data = data_set.load()
>>> assert data.equals(loaded_data)
"""
    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {"compression": None}  # type: Dict[str, Any]

    # pylint: disable=too-many-arguments
    def __init__(
self,
filepath: str,
engine: str = "auto",
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
version: Version = None,
) -> None:
"""Creates a new instance of ``ParquetLocalDataSet`` pointing to a
concrete filepath.
Args:
filepath: Path to a parquet file or a metadata file of a multipart
parquet collection or the directory of a multipart parquet.
engine: The engine to use, one of: `auto`, `fastparquet`,
`pyarrow`. If `auto`, then the default behavior is to try
`pyarrow`, falling back to `fastparquet` if `pyarrow` is
unavailable.
load_args: Additional loading options `pyarrow`:
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
or `fastparquet`:
https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.ParquetFile.to_pandas
save_args: Additional saving options for `pyarrow`:
https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.from_pandas
or `fastparquet`:
https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
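
        Example of passing extra load and save options (a minimal sketch;
        ``columns`` and ``compression`` are standard ``pyarrow`` options,
        and the file name is hypothetical):
        ::

            >>> data_set = ParquetLocalDataSet(
            >>>     filepath="trucks.parquet",
            >>>     load_args={"columns": ["col1"]},
            >>>     save_args={"compression": "gzip"},
            >>> )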
"""
deprecation_warning(self.__class__.__name__)
super().__init__(Path(filepath), version)
self._engine = engine

        # Handle default load and save arguments
self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = copy.deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

    def _describe(self) -> Dict[str, Any]:
return dict(
filepath=self._filepath,
engine=self._engine,
load_args=self._load_args,
save_args=self._save_args,
version=self._version,
)

    def _load(self) -> pd.DataFrame:
load_path = Path(self._get_load_path())
return pd.read_parquet(load_path, engine=self._engine, **self._load_args)

    def _save(self, data: pd.DataFrame) -> None:
save_path = Path(self._get_save_path())
save_path.parent.mkdir(parents=True, exist_ok=True)
data.to_parquet(save_path, engine=self._engine, **self._save_args)

    def _exists(self) -> bool:
path = self._get_load_path()
return Path(path).is_file()
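

if __name__ == "__main__":
    # A minimal round-trip sketch (not part of the original module): save a
    # small frame with gzip compression, reload it, and check equality. The
    # file name "demo.parquet" is hypothetical; ``compression="gzip"`` is a
    # standard pyarrow/pandas option.
    demo_data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
    demo_set = ParquetLocalDataSet("demo.parquet", save_args={"compression": "gzip"})
    demo_set.save(demo_data)
    assert demo_data.equals(demo_set.load())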