#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 9 10:27:39 2020

@author: tanya
"""

from typing import Union, Iterable, Tuple, List, NewType

import pandas as pd
import numpy as np
from itertools import accumulate, repeat, takewhile, chain

from cdplib.log import Log

CVType = NewType("CVType", Iterable[Tuple[List, List]])

DataSetType = NewType("DataSetType",
                      Union[pd.DataFrame, pd.Series, np.ndarray, List])


class CVComposer:
    """
    Groups methods for composing cv objects
    that follow the sklearn conventions;
    these cv objects can be passed to algorithms
    like grid search, etc.
    """
    def __init__(self):
        """
        Initializes the logger shared by all methods of the class.
        """
        self._logger = Log("CVComposer: ")

    def dummy_cv(
            self,
            train_set_size: Union[int, None] = None,
            train_index: Union[pd.Series, np.ndarray, None] = None,
            test_set_size: Union[int, None] = None,
            test_index: Union[pd.Series, np.ndarray, None] = None) -> CVType:
        """
        Returns a one-element cv object whose single fold is the given
        train/test split. For each of the train and test sets, exactly
        one of the size or the index must be given.
        """
        assert((train_index is None) != (train_set_size is None)),\
            "Set either train_index or train_set_size"

        assert((test_index is None) != (test_set_size is None)),\
            "Set either test_index or test_set_size"

        train_index = train_index if (train_index is not None)\
            else list(range(train_set_size))

        # the default test index starts right after the train set
        test_index = test_index if (test_index is not None)\
            else list(range(len(train_index),
                            len(train_index) + test_set_size))

        return [(train_index, test_index)]
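
    # Usage sketch (hypothetical values): a single fold that spans the
    # whole data set, e.g. for passing a fixed train/test split to a
    # grid search.
    #
    #     >>> CVComposer().dummy_cv(train_set_size=3, test_set_size=2)
    #     [([0, 1, 2], [3, 4])]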

    def dummy_cv_and_concatenated_data_set(
            self,
            X_train: DataSetType,
            X_test: DataSetType,
            y_train: Union[DataSetType, None] = None,
            y_test: Union[DataSetType, None] = None)\
            -> Tuple[CVType, DataSetType, Union[DataSetType, None]]:
        """
        Concatenates the train and test sets and returns them together
        with a one-fold cv object that reproduces the original split.
        Pandas indices are kept only if the train and test indices are
        disjoint; otherwise the result is converted to numpy.
        """
        assert((y_test is None) == (y_train is None)),\
            "Set both y_train and y_test or neither"

        use_index = (isinstance(X_train, pd.DataFrame) and
                     isinstance(X_test, pd.DataFrame) and
                     (len(set(X_train.index) & set(X_test.index)) == 0))

        if use_index:
            cv = self.dummy_cv(train_index=X_train.index,
                               test_index=X_test.index)
            X = pd.concat([X_train, X_test], ignore_index=False, axis=0)
        else:
            cv = self.dummy_cv(train_set_size=len(X_train),
                               test_set_size=len(X_test))
            X = np.concatenate([X_train, X_test])

        use_target_index = use_index and (
            isinstance(y_train, pd.Series) and
            isinstance(y_test, pd.Series) and
            (X_train.index.equals(y_train.index)) and
            (X_test.index.equals(y_test.index)))

        if use_target_index:
            y = pd.concat([y_train, y_test], ignore_index=False, axis=0)
        else:
            y = np.concatenate([y_train, y_test]) if (y_train is not None)\
                else None

        result_to_np = (
            (isinstance(X_train, pd.DataFrame) !=
             isinstance(X_test, pd.DataFrame)) or
            (isinstance(X_train, pd.DataFrame) and
             (len(set(X_train.index) & set(X_test.index)) != 0)))

        if result_to_np:
            self._logger.log_and_throw_warning(
                "The concatenated dataframe is converted to numpy")

        return cv, X, y
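
    # Usage sketch (hypothetical frames): with disjoint indices the
    # original split is recoverable from the concatenated frame.
    #
    #     >>> X_tr = pd.DataFrame({"a": [1, 2]}, index=[0, 1])
    #     >>> X_te = pd.DataFrame({"a": [3, 4]}, index=[2, 3])
    #     >>> cv, X, y = CVComposer().dummy_cv_and_concatenated_data_set(
    #     ...     X_tr, X_te)
    #     # cv == [(X_tr.index, X_te.index)], X is the concatenated
    #     # frame, and y is None because no targets were passed.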

    def expanding_cv(self, test_proportion: float,
                     start_train_proportion: float,
                     step_proportion: Union[float, None] = None,
                     expanding_test_size: bool = False,
                     data_set_size: Union[int, None] = None,
                     index: Union[pd.Series, np.ndarray, list, None] = None)\
            -> Union[CVType, None]:
        """
        Yields train/test folds in which the train set starts at the
        beginning of the data and grows by step_proportion per fold,
        while the test set directly follows the train set. If
        expanding_test_size is True, the test set grows proportionally
        to the train set instead of staying fixed.
        """
        try:
            assert((index is None) != (data_set_size is None)),\
                "Set either index or data_set_size"

            index = pd.Series(index) if (index is not None)\
                else pd.Series(range(data_set_size))

            data_set_size = data_set_size or len(index)

            start_train_size = int(start_train_proportion * data_set_size)
            step_size = int(step_proportion * data_set_size)
            test_size = int(test_proportion * data_set_size)

            # train sizes grow from start_train_size in steps of step_size
            # for as long as a full test set still fits behind the train set
            train_inds_set = (list(range(train_size))
                              for train_size in
                              takewhile(
                                  lambda x: x <= data_set_size - test_size,
                                  accumulate(repeat(start_train_size),
                                             lambda x, _: x + step_size)))

            for train_inds in train_inds_set:

                if expanding_test_size:
                    yield (index[train_inds],
                           index[train_inds[-1] + 1:
                                 train_inds[-1] + 1
                                 + int(test_proportion*len(train_inds))])
                else:
                    yield (index[train_inds],
                           index[train_inds[-1] + 1:
                                 train_inds[-1] + 1 + test_size])

        except Exception as e:
            self._logger.log_and_raise_error(("Failed to make expanding cv. "
                                              "Exit with error: {}"
                                              .format(e)))

    def sliding_window_cv(
            self,
            test_proportion: float,
            train_proportion: float,
            step_proportion: Union[float, None] = None,
            data_set_size: Union[int, None] = None,
            index: Union[pd.Series, np.ndarray, list, None] = None)\
            -> Union[CVType, None]:
        """
        Returns train/test folds in which a train window of fixed size
        slides through the data by step_proportion per fold;
        the test set directly follows the train window.
        """
        try:
            assert((index is None) != (data_set_size is None)),\
                "Set either index or data_set_size"

            index = pd.Series(index) if (index is not None)\
                else pd.Series(range(data_set_size))

            data_set_size = data_set_size or len(index)

            train_size = int(train_proportion * data_set_size)
            test_size = int(test_proportion * data_set_size)
            step_size = int(step_proportion * data_set_size)

            # window ends: train_size, train_size + step_size, ...
            train_ends = takewhile(lambda x: x <= data_set_size - test_size,
                                   accumulate(repeat(train_size),
                                              lambda x, _: x + step_size))

            # window starts: 0, step_size, 2*step_size, ...
            train_starts = takewhile(lambda x: x <= data_set_size
                                     - train_size - test_size,
                                     accumulate(repeat(step_size),
                                                lambda x, _: x + step_size))

            train_starts = chain([0], train_starts)

            train_inds_set = [list(range(train_start, train_end))
                              for train_start, train_end in
                              zip(train_starts, train_ends)]

            cv = ((index[train_inds], index[train_inds[-1] + 1:
                                            train_inds[-1] + 1 + test_size])
                  for train_inds in train_inds_set)

            return cv

        except Exception as e:
            self._logger.log_and_raise_error(
                ("Failed to make sliding window cv. "
                 "Exit with error: {}".format(e)))

    def nested_expanding_cv(
            self,
            test_proportion: float,
            start_train_proportion: float,
            step_proportion: Union[float, None] = None,
            expanding_test_size: bool = False,
            data_set_size: Union[int, None] = None,
            index: Union[pd.Series, np.ndarray, list, None] = None)\
            -> Union[List[CVType], None]:
        """
        For each fold of an expanding cv, builds an inner expanding cv
        over that fold's train set; useful for nested cross-validation
        where hyperparameters are tuned separately on every outer fold.
        """
        try:
            cv = self.expanding_cv(
                test_proportion=test_proportion,
                start_train_proportion=start_train_proportion,
                step_proportion=step_proportion,
                expanding_test_size=expanding_test_size,
                data_set_size=data_set_size,
                index=index)

            nested_cv = []

            for train_inds, test_inds in cv:

                # the inner cv is defined either by the outer fold's
                # index or by its size, mirroring the outer call
                fold_index = train_inds if index is not None else None
                fold_size = len(train_inds) if index is None else None

                fold_cv = self.expanding_cv(
                    test_proportion=test_proportion,
                    start_train_proportion=start_train_proportion,
                    step_proportion=step_proportion,
                    expanding_test_size=expanding_test_size,
                    data_set_size=fold_size,
                    index=fold_index)

                nested_cv.append(list(fold_cv))

            return nested_cv

        except Exception as e:
            self._logger.log_and_raise_error(
                ("Failed to make nested expanding cv. "
                 "Exit with error: {}".format(e)))

    def cv_slice_dataset(self, X, y, train_inds, test_inds)\
            -> Tuple[Union[pd.DataFrame, np.ndarray],
                     Union[pd.DataFrame, np.ndarray],
                     Union[pd.Series, np.ndarray, None],
                     Union[pd.Series, np.ndarray, None]]:
        """
        Slices X and y into train and validation parts along the given
        fold indices: label-based slicing for dataframes,
        positional slicing otherwise.
        """
        if isinstance(X, pd.DataFrame):
            X_train = X.loc[train_inds]
            X_val = X.loc[test_inds]
        else:
            X_train = X[train_inds]
            X_val = X[test_inds]

        if y is not None:
            y_train = y[train_inds]
            y_val = y[test_inds]
        else:
            y_train, y_val = None, None

        return X_train, X_val, y_train, y_val
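

# A minimal self-test sketch, assuming cdplib.log.Log is importable in
# this environment: builds a sliding window cv over a small synthetic
# frame and slices every fold with cv_slice_dataset. The column name
# "feature" is made up for the demo.
if __name__ == "__main__":

    composer = CVComposer()

    X = pd.DataFrame({"feature": range(10)})
    y = pd.Series(range(10))

    cv = composer.sliding_window_cv(test_proportion=0.2,
                                    train_proportion=0.4,
                                    step_proportion=0.2,
                                    data_set_size=len(X))

    for train_inds, test_inds in cv:

        X_train, X_val, y_train, y_val = composer.cv_slice_dataset(
            X, y, train_inds, test_inds)

        print("train size: {}, validation size: {}"
              .format(len(X_train), len(X_val)))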