#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec  9 10:27:39 2020

@author: tanya
"""

from typing import Union, Iterable, Tuple, List, NewType
import pandas as pd
import numpy as np
from itertools import accumulate, repeat, takewhile, chain

from cdplib.log import Log

# An iterable of (train_indices, test_indices) pairs in the format
# expected by sklearn model-selection utilities (GridSearchCV, ...).
CVType = NewType("CVType", Iterable[Tuple[List]])

# Any supported tabular / array-like data container.
# FIX: original read "pd.Sereis" (typo), which raised on import.
DataSetType = NewType("DataSetType",
                      Union[pd.DataFrame, pd.Series, np.ndarray, List])


class CVComposer:
    """
    Groups methods for composing cv objects
    that follow standards from sklearn,
    these cv objects can be passed to algorithms like gridsearch, etc
    """

    def __init__(self):
        """Initialize the composer with its own logger instance."""
        self._logger = Log("CVComposer: ")

    def dummy_cv(
            self,
            train_set_size: Union[int, None] = None,
            train_index: Union[pd.Series, np.ndarray, None] = None,
            test_set_size: Union[int, None] = None,
            test_index: DataSetType = None) -> CVType:
        """
        Compose a trivial cv object consisting of exactly one
        (train_index, test_index) split.

        Exactly one of train_set_size / train_index must be given, and
        exactly one of test_set_size / test_index. When only sizes are
        given, consecutive integer positions are generated: the train
        indices first, immediately followed by the test indices.

        :return: list containing a single (train_index, test_index) tuple
        """
        assert((train_index is None) != (train_set_size is None)),\
            "Set train_index or train_set_size"

        # FIX: the original message wrongly repeated
        # "Set train_index or train_set_size" for the test arguments.
        assert((test_index is None) != (test_set_size is None)),\
            "Set test_index or test_set_size"

        if train_index is None:
            train_index = list(range(train_set_size))

        if test_index is None:
            # FIX: the original always used train_set_size as the offset,
            # which crashed (None arithmetic) when train_index was passed
            # explicitly; fall back to the length of train_index.
            offset = train_set_size if (train_set_size is not None)\
                else len(train_index)
            test_index = list(range(offset, offset + test_set_size))

        return [(train_index, test_index)]

    def dummy_cv_and_concatenated_data_set(
            self,
            X_train: DataSetType,
            y_train: Union[DataSetType, None] = None,
            X_test: DataSetType = None,
            y_test: Union[DataSetType, None] = None)\
            -> Tuple[CVType, DataSetType, Union[DataSetType, None]]:
        """
        Concatenate train and test sets and build a one-split cv object
        that separates them again.

        FIX: the original signature placed the non-default parameter
        X_test after the defaulted y_train, which is a SyntaxError.
        X_test now carries a default of None (positional order is kept)
        but is still required in practice.

        Pandas indices are preserved only when both X parts are
        DataFrames with disjoint indices; otherwise the concatenated
        data is converted to numpy and a warning is logged.

        :return: (cv, X, y) where cv is a one-split cv object, X the
            concatenated features and y the concatenated targets
            (None when no targets were given).
        """
        assert(X_test is not None), "Set X_test"
        assert((y_test is None) == (y_train is None)),\
            "Set both y_train and y_test or neither"

        # Indices can only be reused when they do not overlap.
        # FIX: original used boolean "and" instead of "&" (intersection).
        use_index = (isinstance(X_train, pd.DataFrame) and
                     isinstance(X_test, pd.DataFrame) and
                     (len(set(X_train.index) & set(X_test.index)) == 0))

        if use_index:
            cv = self.dummy_cv(train_index=X_train.index,
                               test_index=X_test.index)
            X = pd.concat([X_train, X_test], ignore_index=False, axis=0)
        else:
            # FIX: original passed unknown keywords train_size/test_size
            # to dummy_cv (its parameters are *_set_size) -> TypeError.
            cv = self.dummy_cv(train_set_size=len(X_train),
                               test_set_size=len(X_test))
            X = np.concatenate([X_train, X_test])

        # Targets keep their pandas index only when it matches the
        # corresponding feature index exactly.
        use_target_index = use_index and (
                isinstance(y_train, pd.Series) and
                isinstance(y_test, pd.Series) and
                (X_train.index.equals(y_train.index)) and
                (X_test.index.equals(y_test.index)))

        if use_target_index:
            y = pd.concat([y_train, y_test], ignore_index=False, axis=0)
        else:
            y = np.concatenate([y_train, y_test]) if (y_train is not None)\
                else None

        # Warn whenever DataFrame input had to be degraded to numpy.
        # FIX: the original condition repeated the "and"/"&" bug and
        # dereferenced .index on inputs that may not be DataFrames.
        result_to_np = (not use_index) and\
            (isinstance(X_train, pd.DataFrame) or
             isinstance(X_test, pd.DataFrame))

        if result_to_np:
            self._logger.log_and_throw_warning(
                    "The concatenated dataframe is converted to numpy")

        return cv, X, y

    def expanding_cv(self, test_proportion: float,
                     start_train_proportion: float,
                     step_proportion: float = None,
                     expanding_test_size: bool = False,
                     data_set_size: Union[int, None] = None,
                     index: Union[pd.Series, np.ndarray, list, None] = None)\
            -> Union[Iterable[Tuple[List]], None]:
        """
        Yield cv splits with an expanding train window: each train set
        starts at the beginning of the data and grows by a fixed step;
        the test set directly follows the train window.

        :param test_proportion: test size as a share of the data set.
        :param start_train_proportion: initial train share.
        :param step_proportion: train growth per split; defaults to
            test_proportion when not given (FIX: the original multiplied
            the None default and crashed).
        :param expanding_test_size: when True the test size grows
            proportionally with the current train size.
        :param data_set_size: total number of samples; exactly one of
            data_set_size / index must be set.
        :param index: explicit sample index to draw the splits from.
        """
        try:
            assert((index is None) != (data_set_size is None)),\
                "Set index or data_set_size"

            index = pd.Series(index) if (index is not None)\
                else pd.Series(range(data_set_size))

            data_set_size = data_set_size or len(index)

            start_train_size = int(start_train_proportion * data_set_size)
            test_size = int(test_proportion * data_set_size)

            # FIX: guard against the None default before arithmetic.
            if step_proportion is None:
                step_proportion = test_proportion
            step_size = int(step_proportion * data_set_size)

            # Train sizes: start, start + step, ... for as long as a
            # test set of test_size still fits after the train window.
            train_inds_set = (
                    list(range(train_size))
                    for train_size in takewhile(
                            lambda x: x <= data_set_size - test_size,
                            accumulate(repeat(start_train_size),
                                       lambda x, _: x + step_size)))

            for train_inds in train_inds_set:

                if expanding_test_size:
                    # Test window scales with the current train length.
                    yield (index[train_inds],
                           index[train_inds[-1] + 1:
                                 train_inds[-1] + 1
                                 + int(test_proportion*len(train_inds))])
                else:
                    yield (index[train_inds],
                           index[train_inds[-1] + 1:
                                 train_inds[-1] + 1 + test_size])

        except Exception as e:
            self._logger.log_and_raise_error(("Failed to make expanding cv. "
                                              "Exit with error: {}".format(e)))

    def sliding_window_cv(
            self,
            test_proportion: float,
            train_proportion: float,
            step_proportion: float = None,
            data_set_size: Union[int, None] = None,
            index: Union[pd.Series, np.ndarray, list, None] = None)\
            -> Union[Iterable[Tuple[List]], None]:
        """
        Compose cv splits with a constant-width train window sliding
        over the data set; the test set directly follows each window.

        :param test_proportion: test size as a share of the data set.
        :param train_proportion: train window width as a share.
        :param step_proportion: slide distance per split; defaults to
            test_proportion when not given (FIX: the original multiplied
            the None default and crashed).
        :param data_set_size: total number of samples; exactly one of
            data_set_size / index must be set.
        :param index: explicit sample index to draw the splits from.
        :return: generator of (train_index, test_index) tuples, or None
            after the logger raises on failure.
        """
        try:
            assert((index is None) != (data_set_size is None)),\
                "Set index or data_set_size"

            index = pd.Series(index) if (index is not None)\
                else pd.Series(range(data_set_size))

            data_set_size = data_set_size or len(index)

            train_size = int(train_proportion * data_set_size)
            test_size = int(test_proportion * data_set_size)

            # FIX: guard against the None default before arithmetic.
            if step_proportion is None:
                step_proportion = test_proportion
            step_size = int(step_proportion * data_set_size)

            # End positions of consecutive train windows:
            # train_size, train_size + step, ... while the test fits.
            # (Renamed from the original's "train_sizes" — these are
            # window END positions; the window width is constant.)
            train_ends = takewhile(
                    lambda x: x <= data_set_size - test_size,
                    accumulate(repeat(train_size),
                               lambda x, _: x + step_size))

            # Matching start positions: 0, step, 2*step, ...
            train_starts = takewhile(
                    lambda x: x <= data_set_size - train_size - test_size,
                    accumulate(repeat(step_size),
                               lambda x, _: x + step_size))

            train_starts = chain([0], train_starts)

            train_inds_set = [range(train_start, train_end)
                              for train_start, train_end
                              in zip(train_starts, train_ends)]

            cv = ((index[train_inds],
                   index[train_inds[-1] + 1:
                         train_inds[-1] + 1 + test_size])
                  for train_inds in train_inds_set)

            return cv

        except Exception as e:
            self._logger.log_and_raise_error(
                    ("Failed to make sliding window cv. "
                     "Exit with error: {}".format(e)))