#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Oct 29 13:58:23 2020

@author: tanya

@description:

scenario 1:

You have a train set and a validation set and you tune the probability
threshold on the validation set:

    X = X_train,
    y = y_train,
    X_val = X_val,
    y_val = y_val

and you set to None:

    cv = None,
    X_val_threshold = None,
    y_val_threshold = None,
    cv_threshold = None

Downsides:

1) You return a single validation score
   (a cross-validation score would be more robust).

2) You fine-tune on the same validation set on which you calculate
   the score. Using an independent validation set for fine-tuning
   would be more robust.

3) You fine-tune the probability threshold on a single dataset.
   It would be more robust to tune on several independent datasets
   and take the average probability threshold.

scenario 2:

You have a train set and a validation set and you tune the probability
threshold on an independent set. You need to pass the independent set
to the X_val_threshold and y_val_threshold parameters:

    X = X_train,
    y = y_train,
    X_val = X_val,
    y_val = y_val,
    X_val_threshold = X_val_independent,
    y_val_threshold = y_val_independent

and you set to None:

    cv = None,
    cv_threshold = None

Downsides:

1) You return a single validation score
   (a cross-validation score would be more robust).

2) You fine-tune the probability threshold on a single dataset.
   It would be more robust to tune on several independent datasets
   and take the average probability threshold.

scenario 3:

You have a dataset on which you want to calculate the cross-validation
score and a cv object. You fine-tune the probability threshold on each
fold, using the validation part of the fold:

    X = X_train,
    y = y_train,
    cv = cv

and you set to None:

    X_val = None,
    y_val = None,
    X_val_threshold = None,
    y_val_threshold = None,
    cv_threshold = None

Downsides:

1) In each fold, you fine-tune on the same validation set on which you
   calculate the score. Using an independent validation set for
   fine-tuning would be more robust.

2) In each fold, you fine-tune the probability threshold on a single
   dataset. It would be more robust to tune on several independent
   datasets and take the average probability threshold.

scenario 4:

You have a dataset on which you want to calculate the cross-validation
score and a cv object. You fine-tune the probability threshold on an
independent dataset (or several independent datasets) in each fold.

You need to pass a cv_threshold object that tells you how to split
each of the folds of your cv.

Example 1:

    cv = [((1, 2, 3, 4), (5, 6, 7)),
          ((5, 6, 7, 8), (9, 10))]

    cv_threshold = [[(1, 2), (3, 4)],
                    [(5, 6), (7, 8)]]

Example 2:

    cv = 3
    cv_threshold = [4, 4, 4]

Example 3:

    cv = [((1, 2, 3, 4, 5, 6), (7, 8, 9))]

    cv_threshold = [[((1, 2), (3, 4, 5)),
                     ((2, 3), (4, 5, 6))]]

#####################

    X = X_train,
    y = y_train,
    cv = cv,
    cv_threshold = cv_threshold

and you set to None:

    X_val = None,
    y_val = None,
    X_val_threshold = None,
    y_val_threshold = None
This is the most robust scenario: in each fold, the threshold is
averaged over tuning splits that are independent of the fold's scoring
set. The price is that you have to construct a cv_threshold object
that is consistent with your cv object.
"""
import numpy as np

from itertools import zip_longest
from copy import deepcopy
from typing import Callable, Dict, Iterable, Union

# from numpy.typing import ArrayLike

from sklearn.model_selection import StratifiedKFold

from cdplib.log import Log
from cdplib.ml_validation.CVComposer import CVComposer
from cdplib.ml_validation.fine_tuning import get_optimal_proba_threshold
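# For orientation: get_optimal_proba_threshold searches for the
# probability cutoff that maximizes score_func on a validation set.
# The helper below is a minimal sketch of such a search over a fixed
# grid of candidate thresholds; it is purely illustrative (the real
# implementation lives in cdplib.ml_validation.fine_tuning) and is not
# used in this module.


def _illustrative_threshold_search(score_func: Callable,
                                   y_true,
                                   proba,
                                   n_candidates: int = 100) -> float:
    """
    Illustrative only: evaluate score_func for a grid of candidate
    thresholds and return the candidate with the best score.
    """
    candidates = np.linspace(0, 1, n_candidates)

    candidate_scores = [score_func(y_true, proba >= candidate)
                        for candidate in candidates]

    return float(candidates[int(np.argmax(candidate_scores))])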


def cross_validate_with_optimal_threshold(
        score_func_threshold: Callable,
        estimator: object,
        # X: ArrayLike,
        # y: ArrayLike = None,
        # groups: ArrayLike = None,
        X,
        y=None,
        groups=None,
        scoring: Union[Callable, Dict] = None,
        cv: Union[Iterable, int, None] = None,
        n_jobs: int = None,
        verbose: int = None,
        fit_params: Dict = None,
        pre_dispatch: int = None,
        return_train_score: bool = False,
        return_estimator: bool = False,
        error_score: float = np.nan,
        # X_val: ArrayLike = None,
        # y_val: ArrayLike = None,
        # X_val_threshold: ArrayLike = None,
        # y_val_threshold: ArrayLike = None,
        X_val=None,
        y_val=None,
        X_val_threshold=None,
        y_val_threshold=None,
        cv_threshold: Union[Iterable, int, None] = None,
        threshold_set: Union[Iterable, None] = None,
        scores: Dict = None) -> Dict:
- """
-
- """
    logger = Log("cross_validate_with_optimal_threshold:")

    X_train = deepcopy(X)
    y_train = deepcopy(y)
    X_val = deepcopy(X_val)
    y_val = deepcopy(y_val)
    X_val_threshold = deepcopy(X_val_threshold)
    y_val_threshold = deepcopy(y_val_threshold)

    scores = scores or {"test_threshold": [],
                        "test_score_threshold": [],
                        "train_score_threshold": []}

    scoring = scoring or {}

    for metric_name, metric in scoring.items():
        if "test_" + metric_name not in scores:
            scores["test_" + metric_name] = []
            scores["train_" + metric_name] = []
    if cv is None:

        # the test score is calculated on (X_val, y_val)
        assert (X_val is not None) and (y_val is not None),\
            "Validation set must be set"

        if cv_threshold is None:

            refit = (X_val_threshold is not None)

            # if a validation set for proba threshold tuning is not
            # given, we use the validation set on which we calculate
            # the test score (this might lead to overfitting)
            X_val_threshold = X_val_threshold if refit else deepcopy(X_val)
            y_val_threshold = y_val_threshold if refit else deepcopy(y_val)

            cv_threshold, X_train, y_train =\
                CVComposer().dummy_cv_and_concatenated_data_set(
                    X_train=X_train,
                    X_test=X_val_threshold,
                    y_train=y_train,
                    y_test=y_val_threshold)
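            # Note (assumption from the method name): the dummy cv
            # contains a single (train, validation) split over the
            # concatenated data set, so the threshold loop below treats
            # this case and the genuine cv_threshold case uniformly.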
        else:
            # if cv_threshold is given, we find the optimal threshold
            # on each fold and report the average threshold
            if X_val_threshold is not None:
                logger.log_and_throw_warning(
                    "X_val_threshold is set "
                    "but cv_threshold will be used")

            if isinstance(cv_threshold, int):
                cv_threshold = StratifiedKFold(n_splits=cv_threshold)\
                    .split(X=X_train, y=y_train)

            refit = True
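        # find the optimal threshold on each tuning fold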
        thresholds = []

        for train_inds, val_inds in cv_threshold:

            X_train_fold, X_val_fold, y_train_fold, y_val_fold =\
                CVComposer().cv_slice_dataset(
                    X=X_train,
                    y=y_train,
                    train_inds=train_inds,
                    test_inds=val_inds)

            estimator.fit(X_train_fold, y_train_fold)

            proba_val = estimator.predict_proba(X_val_fold)[:, 1]

            threshold = get_optimal_proba_threshold(
                score_func=score_func_threshold,
                y_true=y_val_fold,
                proba=proba_val)

            thresholds.append(threshold)

        # average the per-fold thresholds and use the mean threshold
        # (not the threshold of the last fold) for the final predictions
        mean_threshold = np.mean(thresholds)
        scores["test_threshold"].append(mean_threshold)

        if refit:
            estimator.fit(X_train, y_train)
            X_train_score, y_train_score = X_train, y_train
        else:
            # without a refit the estimator is still fitted on the
            # training part of the single dummy fold; the train score
            # is therefore calculated on that part and not on the
            # concatenated X_train, which also contains the validation
            # samples
            X_train_score, y_train_score = X_train_fold, y_train_fold

        proba_val = estimator.predict_proba(X_val)[:, 1]
        proba_train = estimator.predict_proba(X_train_score)[:, 1]

        pred_train = (proba_train >= mean_threshold)
        pred_val = (proba_val >= mean_threshold)

        train_score = score_func_threshold(y_train_score, pred_train)
        test_score = score_func_threshold(y_val, pred_val)

        for metric_name, metric in scoring.items():
            scores["train_" + metric_name].append(
                metric(y_train_score, pred_train))
            scores["test_" + metric_name].append(metric(y_val, pred_val))

        scores["train_score_threshold"].append(train_score)
        scores["test_score_threshold"].append(test_score)

        return scores
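    # below: cv is given; the data set is sliced into folds and the
    # function is called recursively on each fold with the fold's
    # validation part as (X_val, y_val)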
    else:

        if isinstance(cv, int):
            cv = StratifiedKFold(n_splits=cv).split(X=X_train, y=y_train)

        cv_threshold = cv_threshold or []

        # if cv_threshold is shorter than cv, the missing entries are
        # filled with None and the fold's own validation part is used
        # for the threshold tuning
        for (train_inds, val_inds), cv_fold in zip_longest(cv, cv_threshold):

            X_train_fold, X_val_fold, y_train_fold, y_val_fold =\
                CVComposer().cv_slice_dataset(
                    X=X_train,
                    y=y_train,
                    train_inds=train_inds,
                    test_inds=val_inds)

            scores = cross_validate_with_optimal_threshold(
                estimator=estimator,
                score_func_threshold=score_func_threshold,
                X=X_train_fold,
                y=y_train_fold,
                X_val=X_val_fold,
                y_val=y_val_fold,
                cv_threshold=cv_fold,
                scoring=scoring,
                threshold_set=threshold_set,
                scores=scores)

        return scores
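# The returned dictionary maps score names to lists with one entry per
# fold, e.g. with scoring={"precision": precision_score}:
#
#     {"test_threshold": [...],
#      "test_score_threshold": [...],
#      "train_score_threshold": [...],
#      "test_precision": [...],
#      "train_precision": [...]}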
- if __name__ == "__main__":
- from sklearn.metrics import accuracy_score, precision_score
- from sklearn.datasets import load_breast_cancer
- from xgboost import XGBRFClassifier
- from sklearn.model_selection import train_test_split
- data_loader = load_breast_cancer()
- X = data_loader["data"]
- y = data_loader["target"]
- X_train, X_val, y_train, y_val = train_test_split(X, y)
- estimator = XGBRFClassifier(use_label_encoder=False,
- eval_metric="logloss")
- score_func = accuracy_score
- scoring = {"precision": precision_score}
- averaged_scores = []
- averaged_thresholds = []
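    # scenario 1 from the module docstring: the threshold is tuned on
    # the same validation set on which the test score is calculated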
- print("\nTesting cv=None, cv_threshold=None, X_val_threshold=None\n")
- scores = cross_validate_with_optimal_threshold(
- score_func_threshold=accuracy_score,
- estimator=estimator,
- X=X_train,
- y=y_train,
- scoring=scoring,
- cv=None,
- X_val=X_val,
- y_val=y_val,
- X_val_threshold=None,
- y_val_threshold=None,
- cv_threshold=None)
- print("\nScores:", scores)
- averaged_scores.append(np.mean(scores["test_score_threshold"]))
- averaged_thresholds.append(np.mean(scores["test_threshold"]))
- print("\n ########################################################## \n")
    X_train, X_val_threshold, y_train, y_val_threshold =\
        train_test_split(X_train, y_train)

    print("\nTesting cv=None, cv_threshold=None, X_val_threshold\n")

    scores = cross_validate_with_optimal_threshold(
        score_func_threshold=accuracy_score,
        estimator=estimator,
        X=X_train,
        y=y_train,
        scoring=scoring,
        cv=None,
        X_val=X_val,
        y_val=y_val,
        X_val_threshold=X_val_threshold,
        y_val_threshold=y_val_threshold,
        cv_threshold=None)

    print("\nScores:", scores)

    averaged_scores.append(np.mean(scores["test_score_threshold"]))
    averaged_thresholds.append(np.mean(scores["test_threshold"]))

    print("\n ########################################################## \n")
- print("\nTesting cv=None, cv_threshold=3 \n")
- scores = cross_validate_with_optimal_threshold(
- score_func_threshold=accuracy_score,
- estimator=estimator,
- X=X_train,
- y=y_train,
- scoring=scoring,
- cv=None,
- X_val=X_val,
- y_val=y_val,
- X_val_threshold=X_val_threshold,
- y_val_threshold=y_val_threshold,
- cv_threshold=3)
- print("\nScores:", scores)
- averaged_scores.append(np.mean(scores["test_score_threshold"]))
- averaged_thresholds.append(np.mean(scores["test_threshold"]))
- print("\n ########################################################## \n")
- print("\nTesting cv=3, cv_threshold=None \n")
- scores = cross_validate_with_optimal_threshold(
- score_func_threshold=accuracy_score,
- estimator=estimator,
- X=X_train,
- y=y_train,
- scoring=scoring,
- cv=3,
- X_val=None,
- y_val=None,
- X_val_threshold=None,
- y_val_threshold=None,
- cv_threshold=None)
- print("\nScores:", scores)
- print("\n ########################################################## \n")
- print("\nTesting cv=3, cv_threshold=[3, 3, 3] \n")
- scores = cross_validate_with_optimal_threshold(
- score_func_threshold=accuracy_score,
- estimator=estimator,
- X=X_train,
- y=y_train,
- scoring=scoring,
- cv=3,
- X_val=X_val,
- y_val=y_val,
- X_val_threshold=X_val_threshold,
- y_val_threshold=y_val_threshold,
- cv_threshold=[3, 3, 3])
- print("\nScores:", scores)
- averaged_scores.append(np.mean(scores["test_score_threshold"]))
- averaged_thresholds.append(np.mean(scores["test_threshold"]))
- print("\n ########################################################## \n")
- # TODO: check overwriting X_train,
- # additional metrics append instead of overwrite
- # check the length of cv_threshold
- # test custom cv, cv_threshold
- print("\n Averaged test score:", averaged_scores)
- print("\n Averaged threshold:", averaged_thresholds)
|