123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Thu Apr 23 08:51:53 2020
- @author: tanya
- @description: class for fine-tuning a sklearn classifier
- (optimizing the probability threshold)
- """
- import pandas as pd
- import numpy as np
- from typing import Callable
- from sklearn.base import (BaseEstimator, ClassifierMixin,
- clone, MetaEstimatorMixin)
- from cdplib.log import Log
- from cdplib.utils.TyperConverter import TypeConverter
class FineTunedClassifierCV(BaseEstimator, ClassifierMixin,
                            MetaEstimatorMixin):
    """
    Probability threshold tuning for a given binary classifier.

    Overrides the method predict of the given sklearn classifier
    and returns predictions with the optimal value of
    the probability threshold. The threshold is chosen per
    cross-validation fold by evaluating ``cost_func`` on a grid of
    candidate thresholds, and the per-fold optima are averaged.

    An object of this class can be passed to an sklearn Pipeline.

    :param estimator: classifier exposing ``fit``, ``predict_proba``,
        ``get_params`` and ``set_params``
    :param cost_func: callable ``cost_func(y_true, y_pred)`` returning
        a comparable score
    :param greater_is_better: if True the threshold with the highest
        score wins, otherwise the one with the lowest score
    :param cv: None, an int, an sklearn splitter, or an iterable of
        ``(train_indices, val_indices)`` pairs
    :param threshold_step: spacing of the candidate thresholds; the
        grid is ``np.arange(threshold_step, 1, threshold_step)``
    """

    def __init__(self, estimator, cost_func: Callable, greater_is_better: bool,
                 cv=None, threshold_step: float = 0.1):
        """
        Store the configuration; no fitting or validation happens here.
        """
        self.estimator = estimator
        self.cost_func = cost_func
        self.greater_is_better = greater_is_better
        # Kept exactly as passed (None included); the default splitter
        # is resolved in fit, where the data is available.
        self.cv = cv
        self.threshold_step = threshold_step

        self.is_fitted = False
        # Used by predict until fit replaces it with the tuned value.
        self.optimal_threshold = 0.5

        self._logger = Log("FineTunedClassifyCV")

    @staticmethod
    def _positive_class_proba(proba_pred) -> np.ndarray:
        """
        Reduce the output of predict_proba to a 1-D vector of
        positive-class probabilities.

        :param proba_pred: 1-D scores, or a 2-D array with one column
            per class
        :return: 1-D array of probabilities
        :raises ValueError: if there are more than two class columns
        """
        proba = np.asarray(proba_pred)

        if proba.ndim == 1:
            return proba

        if proba.ndim == 2 and proba.shape[1] == 2:
            # Second column holds the class that sorts last, i.e. label 1
            # for 0/1 targets.
            return proba[:, 1]

        if proba.ndim == 2 and proba.shape[1] == 1:
            return proba[:, 0]

        raise ValueError(
            "Threshold tuning supports binary classification only, "
            "got probabilities of shape {}".format(proba.shape))

    def _get_best_threshold(self, y_val: (pd.DataFrame, np.ndarray),
                            proba_pred: (pd.DataFrame, np.ndarray)):
        """
        Pick the threshold from the grid that optimises cost_func.

        :param y_val: true labels of the validation fold
        :param proba_pred: predicted probabilities for the same rows
            (1-D, or the 2-D output of predict_proba)
        :return: the best threshold; on ties the smallest one wins
        :raises ValueError: if threshold_step is not in (0, 1)
        """
        if not 0 < self.threshold_step < 1:
            raise ValueError(
                "threshold_step must lie strictly between 0 and 1, "
                "got {}".format(self.threshold_step))

        proba = self._positive_class_proba(proba_pred)

        costs = {}

        for t in np.arange(self.threshold_step, 1, self.threshold_step):
            costs[float(t)] = self.cost_func(y_val, (proba >= t).astype(int))

        pick = max if self.greater_is_better else min

        return pick(costs, key=costs.get)

    def fit(self, X: (pd.DataFrame, np.ndarray),
            y: (pd.DataFrame, np.ndarray) = None,
            **fit_args):
        """
        Tune the probability threshold by cross-validation and then
        refit the wrapped estimator on the full data.

        :param X: feature matrix
        :param y: target labels
        :param fit_args: forwarded to every call of estimator.fit
        :return: self
        :raises ValueError: if the cv object yields no folds
        """
        # Local import: the module itself only imports sklearn.base.
        from sklearn.model_selection import check_cv

        X = TypeConverter().convert_to_ndarray(X)

        if y is not None:
            y = TypeConverter().convert_to_ndarray(y)

        # Normalises None / int / splitter / iterable of index pairs
        # into an object with a split method.
        splitter = check_cv(self.cv, y, classifier=True)

        optimal_thrs_per_fold = []

        for train_inds, val_inds in splitter.split(X, y):
            X_train, X_val = X[train_inds], X[val_inds]

            if y is not None:
                y_train, y_val = y[train_inds], y[val_inds]
            else:
                y_train, y_val = None, None

            # Fit a fresh copy per fold so folds do not leak into
            # each other or into the final estimator.
            estimator = clone(self.estimator)
            estimator.fit(X_train, y_train, **fit_args)

            proba_pred = estimator.predict_proba(X_val)

            optimal_thrs_per_fold.append(
                self._get_best_threshold(y_val, proba_pred))

        if not optimal_thrs_per_fold:
            raise ValueError("cv did not yield any (train, val) split")

        self.optimal_threshold = float(np.mean(optimal_thrs_per_fold))

        self.estimator.fit(X, y, **fit_args)
        self.is_fitted = True

        return self

    def predict(self, X: (pd.DataFrame, np.ndarray)) -> np.ndarray:
        """
        Predict 0/1 labels using the tuned probability threshold.

        :param X: feature matrix
        :return: integer array of labels, or None (after logging a
            warning) if fit has not been called yet
        """
        if not self.is_fitted:
            self._logger.warn("You should fit first")
            return None

        proba = self._positive_class_proba(self.estimator.predict_proba(X))

        return (proba >= self.optimal_threshold).astype(int)

    def get_params(self, deep: bool = True):
        """
        Return the parameters of the wrapped estimator together with
        the "cv" and "cost_func" settings of this object.

        :param deep: forwarded to estimator.get_params; accepted so
            that callers passing deep=... do not fail
        :return: dict of parameter names to values
        """
        params = self.estimator.get_params(deep=deep)
        params.update({"cv": self.cv, "cost_func": self.cost_func})

        return params

    def set_params(self, **params: dict):
        """
        Set "cv" and "cost_func" on this object and forward every
        other parameter to the wrapped estimator.

        :param params: parameter names mapped to their new values
        :return: self
        """
        # Pop by key instead of mutating the dict while iterating it.
        if "cv" in params:
            self.cv = params.pop("cv")

        if "cost_func" in params:
            self.cost_func = params.pop("cost_func")

        self.estimator.set_params(**params)

        return self
if __name__ == "__main__":
    # Manual smoke test: tune the threshold of an XGBoost random-forest
    # classifier on a binarised iris target.
    import gc

    from sklearn.datasets import load_iris
    from sklearn.metrics import accuracy_score
    from xgboost import XGBRFClassifier

    data = load_iris()
    X = data["data"]
    # One-vs-rest target: class 1 against the other classes.
    y = (data["target"] == 1).astype(int)

    del data
    gc.collect()

    # Custom cv object: each fold trains on the first `start` rows and
    # validates on the `val_len` rows that follow.
    val_len = len(X) // 10
    cv = [
        (list(range(start)), list(range(start, start + val_len)))
        for start in range(len(X) // 2, len(X), val_len)
    ]

    clf = XGBRFClassifier()

    fine_tuned_clf = FineTunedClassifierCV(
        estimator=clf,
        cv=cv,
        greater_is_better=True,
        cost_func=accuracy_score,
    )

    fine_tuned_clf.fit(X=X, y=y)
|