123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Thu Oct 29 13:58:23 2020
- @author: tanya
- @description:
- * Input:
- - pipeline/hyperparameter space
- - data_train
- - cv
- - cv_folds
- * For each pipeline:
- -> Split data_train into folds according to cv
- -> For each fold:
- => get data_train_fold, data_test_fold, cv_fold
- => split data_train_fold into subfolds according to cv_fold
- => For each subfold:
- ==> get data_train_subfold, data_test_subfold
- ==> train pipeline on data_train_subfold
- ==> find best_threshold_subfold on data_test_subfold
- => Find averaged_threshold_fold averaged over best_threshold_subfold
- => train pipeline on data_train_fold
- => find score_fold on data_test_fold with averaged_threshold_fold
- => find best_threshold_fold on data_test_fold
- -> find score averaged over score_fold
- -> find averaged_threshold averaged over best_threshold_fold
- * choose (pipeline/hyperparameters, threshold) in the space with best score
- """
- import sys
- import pandas as pd
- import numpy as np
- from itertools import zip_longest
- if sys.version_info >= (3, 8):
- from typing import Callable, Dict, Iterable, Union
- else:
- from typing_extensions import Callable, Dict, Iterable, Union
- from copy import deepcopy
- from sklearn.model_selection import StratifiedKFold
- from cdplib.log import Log
- from cdplib.ml_validation.CVComposer import CVComposer
- # TODO: write with yield !!!!
def get_optimal_proba_threshold(score_func: Callable,
                                y_true: Union[pd.Series, np.ndarray],
                                proba: Union[pd.Series, np.ndarray],
                                threshold_set: Union[Iterable, None] = None):
    """
    Return the probability threshold that maximises ``score_func``.

    For every candidate threshold the probabilities are binarised with
    ``proba >= threshold`` (cast to int) and the result is scored with
    ``score_func(y_true, y_pred)``.

    :param score_func: callable invoked as ``score_func(y_true, y_pred)``;
        higher return values are considered better
    :param y_true: true labels, passed to ``score_func`` unchanged
    :param proba: predicted probabilities, one per element of ``y_true``;
        a plain list or tuple is converted to a numpy array
    :param threshold_set: candidate thresholds; defaults to
        ``np.arange(0, 1, 0.1)``

    :return: the threshold with the highest score; on ties the candidate
        that appears first in ``threshold_set`` is returned

    :raises ValueError: if ``threshold_set`` contains no candidates
    """
    if threshold_set is None:
        threshold_set = np.arange(0, 1, 0.1)

    # Built-in sequences do not support element-wise comparison,
    # so only those are converted; Series / arrays are left untouched.
    if isinstance(proba, (list, tuple)):
        proba = np.asarray(proba)

    scores = {}

    for threshold in threshold_set:
        y_pred = (proba >= threshold).astype(int)
        scores[threshold] = score_func(y_true, y_pred)

    if not scores:
        # max() on an empty dict would fail with an opaque message
        raise ValueError(
            "threshold_set must contain at least one threshold")

    # dicts keep insertion order, so the first best candidate wins ties
    return max(scores, key=scores.get)
def cross_validate_with_optimal_threshold(
        estimator: object,
        score_func: Callable,
        X_train: Union[pd.DataFrame, np.ndarray],
        y_train: Union[pd.Series, np.ndarray, None] = None,
        X_val: Union[pd.DataFrame, np.ndarray, None] = None,
        y_val: Union[pd.Series, np.ndarray, None] = None,
        X_val_threshold: Union[pd.DataFrame, np.ndarray, None] = None,
        y_val_threshold: Union[pd.Series, np.ndarray, None] = None,
        cv: Union[Iterable, int, None] = None,
        cv_threshold: Union[Iterable, int, None] = None,
        additional_metrics: Union[Dict[str, Callable], None] = None,
        threshold_set: Union[Iterable, None] = None,
        scores: Union[Dict, None] = None)\
        -> Dict:
    """
    Score an estimator while tuning its probability threshold.

    Two modes, selected by ``cv``:

    * ``cv is None``: the threshold is tuned on the folds of
      ``cv_threshold`` (or, if that is None, on ``X_val_threshold``,
      falling back to ``X_val``), the per-fold optima are averaged, and
      train/test scores are computed with the averaged threshold
      (test score on ``X_val`` / ``y_val``).
    * otherwise: ``X_train`` is split according to ``cv`` and the function
      calls itself once per fold with ``cv=None``, accumulating the
      results in the same ``scores`` dictionary.

    :param estimator: object providing ``fit(X, y)`` and
        ``predict_proba(X)``; column 1 of the latter is used as the
        probability. The estimator is fitted in place.
    :param score_func: main metric, called as ``score_func(y_true, y_pred)``
    :param X_train: training features
    :param y_train: training labels
    :param X_val: validation features; required when ``cv`` is None
    :param y_val: validation labels; required when ``cv`` is None
    :param X_val_threshold: features used only for threshold tuning
        (considered only when both ``cv`` and ``cv_threshold`` are None)
    :param y_val_threshold: labels belonging to ``X_val_threshold``
    :param cv: None, a number of stratified folds, or an iterable of
        ``(train_inds, val_inds)`` pairs
    :param cv_threshold: with ``cv=None``: None, a number of stratified
        folds, or an iterable of ``(train_inds, val_inds)`` pairs.
        With ``cv`` given: None, one int applied to every outer fold, or
        an iterable holding one such specification per outer fold
        (missing trailing entries are treated as None).
    :param additional_metrics: extra metrics by name; each is stored
        under ``"train_<name>"`` and ``"test_<name>"``
    :param threshold_set: candidate thresholds forwarded to
        ``get_optimal_proba_threshold``
    :param scores: dictionary of result lists to append to; a non-empty
        dictionary is modified in place

    :return: dictionary with the lists ``"test_threshold"``,
        ``"test_score"``, ``"train_score"`` and the additional metrics;
        one entry is appended per evaluated fold

    :raises AssertionError: if ``cv`` is None and no validation set is given
    :raises ValueError: if ``cv_threshold`` yields no folds, or has more
        entries than ``cv`` has folds
    """
    logger = Log("cross_validate_with_optimal_threshold:")

    X_train = deepcopy(X_train)
    y_train = deepcopy(y_train)
    X_val = deepcopy(X_val)
    y_val = deepcopy(y_val)
    X_val_threshold = deepcopy(X_val_threshold)
    y_val_threshold = deepcopy(y_val_threshold)

    # The candidates are reused for every fold; a one-shot iterator
    # would be exhausted after the first one.
    if threshold_set is not None:
        threshold_set = list(threshold_set)

    scores = scores or {"test_threshold": [],
                        "test_score": [],
                        "train_score": []}

    additional_metrics = additional_metrics or {}

    for metric_name in additional_metrics:
        if "test_" + metric_name not in scores:
            scores["test_" + metric_name] = []
            scores["train_" + metric_name] = []

    if cv is None:

        # test score is calculated on X_val
        assert ((X_val is not None) and (y_val is not None)),\
            "Validation set must be set"

        if cv_threshold is None:

            refit = (X_val_threshold is not None)

            # if a validation set for proba threshold tuning is not given,
            # we use the validation set on which we calculate the test score
            # (this might lead to overfitting)
            X_val_threshold = X_val_threshold if refit else deepcopy(X_val)
            y_val_threshold = y_val_threshold if refit else deepcopy(y_val)

            # NOTE(review): presumably returns a single-fold cv together
            # with the train and threshold sets concatenated, so X_train
            # (and hence the train score below) also covers the threshold
            # set - confirm against CVComposer.
            cv_threshold, X_train, y_train =\
                CVComposer().dummy_cv_and_concatenated_data_set(
                    X_train=X_train,
                    X_test=X_val_threshold,
                    y_train=y_train,
                    y_test=y_val_threshold)

        else:

            # if cv_threshold is given, we find the optimal threshold
            # on each fold and output the average value for the threshold
            if (X_val_threshold is not None):
                logger.log_and_throw_warning((
                    "X_val_threshold is set "
                    "but cv_threshold will be used"))

            if isinstance(cv_threshold, int):
                cv_threshold = StratifiedKFold(n_splits=cv_threshold)\
                    .split(X=X_train, y=y_train)

            refit = True

        thresholds = []

        for train_inds, val_inds in cv_threshold:

            print("----- In cv threshold fold")

            # called on an instance, like the other CVComposer calls here
            X_train_fold, X_val_fold, y_train_fold, y_val_fold =\
                CVComposer().cv_slice_dataset(
                    X=X_train,
                    y=y_train,
                    train_inds=train_inds,
                    test_inds=val_inds)

            estimator.fit(X_train_fold, y_train_fold)

            proba_val = estimator.predict_proba(X_val_fold)[:, 1]

            threshold = get_optimal_proba_threshold(
                score_func=score_func,
                y_true=y_val_fold,
                proba=proba_val,
                threshold_set=threshold_set)

            thresholds.append(threshold)

            print("----- Threshold:", threshold)

        if not thresholds:
            raise ValueError(
                "cv_threshold must yield at least one "
                "(train_inds, val_inds) fold")

        # The scores have to be computed with the same (averaged) threshold
        # that is reported, not with the threshold of the last fold.
        averaged_threshold = np.mean(thresholds)

        scores["test_threshold"].append(averaged_threshold)

        if refit:
            estimator.fit(X_train, y_train)

        proba_val = estimator.predict_proba(X_val)[:, 1]
        proba_train = estimator.predict_proba(X_train)[:, 1]

        pred_train = (proba_train >= averaged_threshold)
        pred_val = (proba_val >= averaged_threshold)

        train_score = score_func(y_train, pred_train)
        test_score = score_func(y_val, pred_val)

        for metric_name, metric in additional_metrics.items():
            scores["train_" + metric_name].append(metric(y_train, pred_train))
            scores["test_" + metric_name].append(metric(y_val, pred_val))

        scores["train_score"].append(train_score)
        scores["test_score"].append(test_score)

        return scores

    else:

        if isinstance(cv, int):
            cv = StratifiedKFold(n_splits=cv).split(X=X_train, y=y_train)

        cv_threshold = cv_threshold or []

        # A single int means "this many inner folds for every outer fold";
        # it cannot be zipped directly, so it is repeated once per fold.
        if isinstance(cv_threshold, int):
            cv = list(cv)
            cv_threshold = [cv_threshold] * len(cv)

        for fold, cv_fold in zip_longest(cv, cv_threshold):

            # zip_longest pads the shorter input with None
            if fold is None:
                raise ValueError(
                    "cv_threshold has more entries than cv has folds")

            train_inds, val_inds = fold

            print("=== In cv fold")

            X_train_fold, X_val_fold, y_train_fold, y_val_fold =\
                CVComposer().cv_slice_dataset(
                    X=X_train,
                    y=y_train,
                    train_inds=train_inds,
                    test_inds=val_inds)

            # cv_fold is None when cv_threshold has fewer entries than cv;
            # the threshold is then tuned on the fold's validation part.
            scores = cross_validate_with_optimal_threshold(
                estimator=estimator,
                score_func=score_func,
                X_train=X_train_fold,
                y_train=y_train_fold,
                X_val=X_val_fold,
                y_val=y_val_fold,
                cv_threshold=cv_fold,
                additional_metrics=additional_metrics,
                threshold_set=threshold_set,
                scores=scores)

            print("=== scores:", scores)

        return scores
if __name__ == "__main__":

    # Manual smoke test of cross_validate_with_optimal_threshold on the
    # breast cancer data set (requires scikit-learn and xgboost).

    from sklearn.metrics import accuracy_score, precision_score
    from sklearn.datasets import load_breast_cancer
    from xgboost import XGBRFClassifier
    from sklearn.model_selection import train_test_split

    data_loader = load_breast_cancer()

    X = data_loader["data"]
    y = data_loader["target"]

    X_train, X_val, y_train, y_val = train_test_split(X, y)

    estimator = XGBRFClassifier()

    score_func = accuracy_score

    additional_metrics = {"precision": precision_score}

    averaged_scores = []
    averaged_thresholds = []

    def _run_scenario(title, **data_and_cv):
        """
        Run one configuration, print its scores and record the averages.

        Every scenario goes through this helper so that none of them is
        left out of the final summary.
        """
        print(title)

        scenario_scores = cross_validate_with_optimal_threshold(
            estimator=estimator,
            score_func=score_func,
            additional_metrics=additional_metrics,
            **data_and_cv)

        print("\nScores:", scenario_scores)

        averaged_scores.append(np.mean(scenario_scores["test_score"]))
        averaged_thresholds.append(
            np.mean(scenario_scores["test_threshold"]))

        print("\n ########################################################## \n")

        return scenario_scores

    scores = _run_scenario(
        "\nTesting cv=None, cv_threshold=None, X_val_threshold=None\n",
        X_train=X_train,
        y_train=y_train,
        X_val=X_val,
        y_val=y_val,
        X_val_threshold=None,
        y_val_threshold=None,
        cv=None,
        cv_threshold=None)

    # From here on a part of the training data is held out
    # as a separate set for threshold tuning.
    X_train, X_val_threshold, y_train, y_val_threshold =\
        train_test_split(X_train, y_train)

    scenarios = [
        ("\nTesting cv=None, cv_threshold=None, X_val_threshold\n",
         None, None),
        ("\nTesting cv=None, cv_threshold=3 \n",
         None, 3),
        ("\nTesting cv=3, cv_threshold=None \n",
         3, None),
        ("\nTesting cv=3, cv_threshold=[3, 3, 3] \n",
         3, [3, 3, 3]),
    ]

    for title, cv, cv_threshold in scenarios:
        scores = _run_scenario(
            title,
            X_train=X_train,
            y_train=y_train,
            X_val=X_val,
            y_val=y_val,
            X_val_threshold=X_val_threshold,
            y_val_threshold=y_val_threshold,
            cv=cv,
            cv_threshold=cv_threshold)

    # TODO: check overwriting X_train,
    # additional metrics append instead of overwrite
    # check the length of cv_threshold
    # test custom cv, cv_threshold

    print("\n Averaged test score:", averaged_scores)
    print("\n Averaged threshold:", averaged_thresholds)
|