#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import os
import time
from pprint import pprint
import pandas as pd
import numpy as np
import datetime

sys.path.append(os.getcwd())
from copy import deepcopy
from cdplib.log import Log
from cdplib.FlattenData import FlattenData
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, roc_auc_score


class DataExplorer:

    def __init__(self):
        self._log = Log("Data Explorer")

    def calculate_correlation(self, data: pd.DataFrame, column_name_to_predict: str,
                              prediction_name: str, verbose: bool = False):
        no_nan_data = data.fillna(0)

        correlations = []

        pearson_correlation = self.calculate_big_matrix_correlation(
            data=data,
            column_name_to_predict=column_name_to_predict,
            method='pearson')
        self._log.info('Calculated Pearson correlation')

        if verbose:
            kendall_correlation = self.calculate_big_matrix_correlation(
                data=data,
                column_name_to_predict=column_name_to_predict,
                method='kendall')
            correlations.append(kendall_correlation)
            self._log.info('Calculated Kendall correlation')

        spearman_correlation = self.calculate_big_matrix_correlation(
            data=data,
            column_name_to_predict=column_name_to_predict,
            method='spearman')
        correlations.append(spearman_correlation)
        self._log.info('Calculated Spearman correlation')

        cosine_similarity = self.calculate_cosine_similarity_for_dataframe_columns(no_nan_data, column_name_to_predict)
        correlations.append(cosine_similarity)
        self._log.info('Calculated cosine similarity')

        chi2_independence = self.calculate_chi2_independence_for_dataframe_columns(data, column_name_to_predict, verbose)
        correlations.append(chi2_independence)
        self._log.info('Calculated chi2 independence')

        fisher_exact_test = self.calculate_fisher_exact_for_dataframe_column(data, column_name_to_predict)
        correlations.append(fisher_exact_test)
        self._log.info('Calculated Fisher Exact Test')

        gert_correlation = self.calculate_feature_and_schrott_occurunces(data, column_name_to_predict,
                                                                         prediction_name, verbose)
        correlations.append(gert_correlation)
        self._log.info('Calculated Gert correlation')

        # Use the Pearson result as the base and join every other measure onto it.
        merged_data = pearson_correlation
        for correlation in correlations:
            if len(correlation.index) > 0:
                merged_data = merged_data.join(correlation, how='left')

        return merged_data

    def calculate_big_matrix_correlation(self, data: pd.DataFrame, column_name_to_predict: str,
                                         method: str = 'pearson') -> pd.DataFrame:
        num_columns = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64', 'bool']
        result_data = {}
        for column in data.columns:
            if data[column].dtype in num_columns:
                result_data[column] = data[column_name_to_predict].corr(data[column], method=method)

        result_df = pd.DataFrame.from_dict(result_data, orient='index')
        label_string = method + ' correlation'
        result_df.columns = [label_string]
        return result_df

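    # Illustrative sketch only (toy data, not from this project): for a frame with
    # numeric columns 'a', 'b' and target 'y', calculate_big_matrix_correlation
    # returns a one-column DataFrame indexed by column name, e.g.
    #
    #   df = pd.DataFrame({'a': [0, 1, 1], 'b': [1, 0, 1], 'y': [0, 1, 1]})
    #   DataExplorer().calculate_big_matrix_correlation(df, 'y')
    #   #    pearson correlation
    #   # a                  1.0
    #   # b                 -0.5
    #   # y                  1.0
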
    def calculate_feature_and_schrott_occurunces(self, data: pd.DataFrame, column_name_to_predict: str,
                                                 prediction_name: str, verbose: bool = False) -> pd.DataFrame:
        result_dict = {}
        len_data = len(data.index)
        total_schrott = len(data[data[column_name_to_predict] == 1].index)
        counter = 0
        for column in data.columns:
            occurunces = 0
            schrott_occurunces = 0
            temp_data = data[data[column] == 1]
            # print('Temp Data:', temp_data)
            if len(temp_data.index) > 0:
                occurunces = len(temp_data.index)
                schrott_occurunces = len(temp_data[temp_data[column_name_to_predict] == 1].index)
                schrott_wheelsets = list(temp_data[temp_data[column_name_to_predict] == 1].index)

                non_schrott = occurunces - schrott_occurunces
                non_occurunces = len_data - occurunces

                percentage_of_occurances_schrott = round((schrott_occurunces / occurunces) * 100, 2)
                percentage_of_occurances_not_schrott = round((non_schrott / occurunces) * 100, 2)

                if verbose:
                    result_dict[column] = {
                        'Occurs': occurunces,
                        '%_Occurs': occurunces / len_data,
                        prediction_name + '_Occurs': schrott_occurunces,
                        '%_' + prediction_name + '_Occurs': percentage_of_occurances_schrott,
                        '%_Total_' + prediction_name: round((schrott_occurunces / total_schrott) * 100, 2),
                        '!Occurs': non_occurunces,
                        '%_!Occurs': round((non_occurunces / len_data) * 100, 2),
                        'Occurs_!' + prediction_name: non_schrott,
                        '%_Occurs_!' + prediction_name: percentage_of_occurances_not_schrott,
                        '%_Total_!' + prediction_name: round((non_schrott / len_data) * 100, 2),
                        'Wheelsets_!' + prediction_name: [wheelset for wheelset in list(temp_data.index)
                                                          if wheelset not in schrott_wheelsets],
                        'Wheelsets_' + prediction_name: schrott_wheelsets
                    }
                else:
                    result_dict[column] = {
                        'Occurs': occurunces,
                        '%_Occurs': occurunces / len_data,
                        prediction_name + '_Occurs': schrott_occurunces,
                        '%_' + prediction_name + '_Occurs': percentage_of_occurances_schrott,
                        '%_Total_' + prediction_name: round((schrott_occurunces / total_schrott) * 100, 2),
                    }

            counter += 1
            if counter % 100 == 0:
                print('Calculated {} / {}'.format(counter, len(data.columns)))

        return pd.DataFrame.from_dict(result_dict, orient='index')

    def calculate_fisher_exact_for_dataframe_column(self, data: pd.DataFrame,
                                                    column_name_to_predict: str) -> pd.DataFrame:
        from scipy.stats import fisher_exact

        result_dict = {}
        predict_count = data[column_name_to_predict]
        for column in data.columns:
            fisher_data = pd.crosstab(predict_count, data[column])
            # The Fisher exact test is only defined for a 2x2 contingency table.
            if fisher_data.shape == (2, 2):
                oddsratio, pvalue = fisher_exact(fisher_data)
                result_dict[column] = {
                    'Fisher_Exact_pvalue': pvalue,
                    'Fisher_Exact_oddsratio': oddsratio
                }
            else:
                result_dict[column] = {
                    'Fisher_Exact_pvalue': None,
                    'Fisher_Exact_oddsratio': None
                }

        return pd.DataFrame.from_dict(result_dict, orient='index')

    def calculate_cosine_similarity_for_dataframe_columns(self, data: pd.DataFrame,
                                                          column_name_to_predict: str) -> pd.DataFrame:
        from sklearn.metrics.pairwise import cosine_similarity

        cosine_corr = cosine_similarity(data.transpose())
        cosine_df = pd.DataFrame(cosine_corr, columns=data.columns, index=data.columns)
        cosine_df = pd.DataFrame(cosine_df[column_name_to_predict])
        cosine_df.columns = ['Cosine Similarity']
        return cosine_df

    def calculate_chi2_independence_for_dataframe_columns(self, data: pd.DataFrame, column_name_to_predict: str,
                                                          verbose: bool = False) -> pd.DataFrame:
        from scipy.stats import chi2_contingency

        result_dict = {}
        Y = data[column_name_to_predict]
        for column in data.columns:
            X = data[column]
            observed = pd.crosstab(Y, X)

            chi2 = None
            p = None
            dof = None
            expected = None
            # The test is only applied to 2x2 contingency tables here.
            if observed.size == 4:
                chi2, p, dof, expected = chi2_contingency(observed.values)
                if dof > 1:
                    self._log.warning('Calculation contained DOF > 1, this is the data:\n {}'.format(X))
                if verbose:
                    result_dict[column] = {
                        'Chi2': chi2,
                        'Chi2, p': p,
                        'Chi2, dof': dof,
                        'Chi2, expected': expected
                    }
                else:
                    result_dict[column] = {
                        'Chi2, p': p,
                    }
            else:
                if verbose:
                    result_dict[column] = {
                        'Chi2': None,
                        'Chi2, p': None,
                        'Chi2, dof': None,
                        'Chi2, expected': None
                    }
                else:
                    result_dict[column] = {
                        'Chi2, p': None,
                    }

        return pd.DataFrame.from_dict(result_dict, orient='index')

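    # Hedged illustration (invented values): for two binary columns, pd.crosstab yields
    # the 2x2 contingency table that both the chi2 and the Fisher exact branches above
    # expect; anything other than a 2x2 table falls through to the None placeholders.
    #
    #   pd.crosstab(pd.Series([0, 0, 1, 1]), pd.Series([0, 1, 0, 1]))
    #   # col_0  0  1
    #   # row_0
    #   # 0      1  1
    #   # 1      1  1
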
    def calculate_machine_learning_scores(self, predictions, true_values, y_train, to_date: str = None):
        '''
        Calculates several different measures on the predictions compared to the true
        values and returns two dictionaries: one to be submitted to MongoDB and one to
        be saved to Excel.

        :param predictions: The machine learning predictions
        :param true_values: The correct predictions for the dataset
        :param y_train: The correct predictions for the training data
        :param to_date: Timestamp of the run; defaults to the current UTC time
        '''
        from sklearn.metrics import confusion_matrix, classification_report, accuracy_score,\
            roc_auc_score, balanced_accuracy_score

        if to_date is None:
            to_date = str(datetime.datetime.utcnow())

        # Use result names that do not shadow the imported sklearn functions.
        conf_matrix = confusion_matrix(true_values, predictions)
        # normalized_confusion_matrix = confusion_matrix(true_values, predictions, normalize=True)
        normalized_confusion_matrix = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis]
        class_report = classification_report(true_values, predictions)
        accuracy = round(accuracy_score(true_values, predictions) * 100, 2)
        roc_auc = round(roc_auc_score(true_values, predictions) * 100, 2)
        balanced_custom_cost_score = self.balanced_custom_cost_function(true_values, predictions)
        custom_cost_score = self.custom_cost_function(true_values, predictions)
        balanced_accuracy = round(balanced_accuracy_score(true_values, predictions) * 100, 2)

        print("Timestamp\n", to_date)
        print('\n')
        print('% True Positives = {}'.format(conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[1][0])))
        print('% False Positives = {}'.format(conf_matrix[0][1] / (conf_matrix[0][1] + conf_matrix[0][0])))
        print('\n')
        print('Accuracy:\n', accuracy)
        print('\n')
        print('Balanced Accuracy:\n', balanced_accuracy)
        print('\n')
        print('ROC AUC Score:\n', roc_auc)
        print('\n')
        print('Balanced Custom Cost Function:\n', balanced_custom_cost_score)
        print('\n')
        print('Custom Cost Function:\n', custom_cost_score)
        print('\n')
        print('Confusion Matrix:\n', conf_matrix)
        print('\n')
        print('Normalized Confusion Matrix:\n', normalized_confusion_matrix)
        print('\n')
        print('Classification Report:\n', class_report)

        result_dict = {
            'Size of training set:': len(y_train),
            'Number of Schrott in training set': y_train.value_counts().loc[True],
            'Size of prediction set:': len(true_values),
            'Number of Schrott in prediction set': true_values.value_counts().loc[True],
            'Confusion Matrix': conf_matrix,
            'Normalized Confusion Matrix': normalized_confusion_matrix,
            'Accuracy': accuracy,
            'Balanced Accuracy': balanced_accuracy,
            'ROC AUC Score:': roc_auc,
            'Balanced Custom Cost Score:': balanced_custom_cost_score,
            'Custom Cost Score:': custom_cost_score
        }

        flat_conf_matrix = conf_matrix.ravel()

        database_dict = {
            "timestamp": to_date,
            "result": {
                "training_set": {
                    "amount": int(len(y_train)),
                    "schrott": int(y_train.value_counts().loc[True])
                },
                "test_set": {
                    "amount": int(len(true_values)),
                    "schrott": int(true_values.value_counts().loc[True])
                },
                "confusion_matrix": [
                    ("True Negatives", int(flat_conf_matrix[0])),
                    ("False Positives", int(flat_conf_matrix[1])),
                    ("False Negatives", int(flat_conf_matrix[2])),
                    ("True Positives", int(flat_conf_matrix[3]))
                ],
                "normalized_confusion_matrix": [
                    ("True Negatives", round(normalized_confusion_matrix[0][0] * 100, 2)),
                    ("False Positives", round(normalized_confusion_matrix[0][1] * 100, 2)),
                    ("False Negatives", round(normalized_confusion_matrix[1][0] * 100, 2)),
                    ("True Positives", round(normalized_confusion_matrix[1][1] * 100, 2))
                ],
                "accuracy": accuracy,
                "balanced_accuracy": balanced_accuracy,
                "ROC_AUC_score": roc_auc,
                'balanced_custom_cost_score': int(balanced_custom_cost_score),
                'custom_cost_score': int(custom_cost_score)
            }
        }
        pprint(database_dict)

        return result_dict, database_dict

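    # Hedged usage sketch (the names y_pred, y_test and y_train are assumptions, not
    # part of this module): with binary labels given as pandas Series,
    #
    #   explorer = DataExplorer()
    #   result_dict, database_dict = explorer.calculate_machine_learning_scores(
    #       predictions=y_pred, true_values=y_test, y_train=y_train)
    #
    # result_dict is meant for the Excel report, database_dict for the MongoDB collection.
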
    def get_total_prediction_results(self, in_data, limit: float = 0.5, station=None):
        data = deepcopy(in_data)

        data_flattener = FlattenData()
        data['ist_schrott'] = data_flattener.flatten(data['final_state'])

        predictions_lists = []
        stations_lists = []
        num_correct_predictions = []
        num_wrong_predictions = []
        for true_value, predictions in zip(data['ist_schrott'], data['process']):
            predictions_list, stations_list = self.filter_by_threshold_and_station(predictions, limit, station)
            predictions_lists.append(predictions_list)
            stations_lists.append(stations_list)

            num_true = sum(predictions_list)
            num_false = len(predictions_list) - sum(predictions_list)
            if true_value:
                num_correct_predictions.append(num_true)
                num_wrong_predictions.append(num_false)
            else:
                num_correct_predictions.append(num_false)
                num_wrong_predictions.append(num_true)

        # Fill DataFrame
        data['stations'] = stations_lists
        data['predictions'] = predictions_lists
        data['num_predictions'] = data['predictions'].str.len()
        data['num_correct_predictions'] = num_correct_predictions
        data['%_correct_predictions'] = round(data['num_correct_predictions'] / data['num_predictions'], 3) * 100
        data['num_wrong_predictions'] = num_wrong_predictions
        data['%_wrong_predictions'] = round(data['num_wrong_predictions'] / data['num_predictions'], 3) * 100

        '''
        print('LIMIT:', limit)
        num_wheelsets = len(data.index)
        # Print results
        print('Predictions for station:', station if not None else 'All stations')
        print('Wheelsets predicted:', num_wheelsets)
        print('Predictions made:', data['num_predictions'].sum())
        '''

        return_dict = {}
        if station is None:
            station = 'All Stations'
            return_dict[str(station)] = {'result': self.calculate_statistics_for_all_predictions(data)}
            return_dict[station + ' per Wheelset'] = {'result': self.calculate_statistics_per_wheelset(data)}
        else:
            return_dict[str(station)] = {'result': self.calculate_statistics_for_all_predictions(data)}

        data.drop(['final_state', 'process'], axis=1, inplace=True)
        return return_dict

    def filter_by_threshold_and_station(self, schrott_list: list, limit: float = 0.5, station=None):
        prediction_list = []
        station_list = []
        for value in schrott_list:
            if value and 'schrott' in value.keys():
                if station is None:
                    station_list.append(value['stationsnummer'])
                    prediction_list.append(value['schrott'] > limit)
                elif value['stationsnummer'] == station:
                    station_list.append(value['stationsnummer'])
                    prediction_list.append(value['schrott'] > limit)

        return (prediction_list, station_list)

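    # Hedged sketch of the expected 'process' entries (field names taken from the query
    # in __main__, values invented):
    #
    #   entries = [{'stationsnummer': 421, 'schrott': 0.73},
    #              {'stationsnummer': 110, 'schrott': 0.10}]
    #   DataExplorer().filter_by_threshold_and_station(entries, limit=0.5, station=421)
    #   # -> ([True], [421])
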
    def calculate_statistics_for_all_predictions(self, data: pd.DataFrame):
        correct_predictions = data['num_correct_predictions'].sum()
        wrong_predictions = data['num_wrong_predictions'].sum()

        true_positives = data[data['ist_schrott'] == True]['num_correct_predictions'].sum()
        false_negatives = data[data['ist_schrott'] == True]['num_wrong_predictions'].sum()
        true_negatives = data[data['ist_schrott'] == False]['num_correct_predictions'].sum()
        false_positives = data[data['ist_schrott'] == False]['num_wrong_predictions'].sum()

        confusion_matrix = [true_negatives, false_positives, false_negatives, true_positives]
        if (true_negatives + false_positives) != 0 and (false_negatives + true_positives) != 0:
            normalized_confusion_matrix = [
                round(true_negatives / (true_negatives + false_positives), 3),
                round(false_positives / (true_negatives + false_positives), 3),
                round(false_negatives / (false_negatives + true_positives), 3),
                round(true_positives / (false_negatives + true_positives), 3)]
        else:
            # Degenerate case (only one class present): fall back to zeros so the
            # report below can still be built.
            normalized_confusion_matrix = [0.0, 0.0, 0.0, 0.0]

        accuracy = round(data['num_correct_predictions'].sum() / data['num_predictions'].sum(), 3) * 100
        balanced_accuracy = round((normalized_confusion_matrix[0] + normalized_confusion_matrix[-1]) / 2, 2) * 100

        '''
        print('\n', '*-*'*20)
        print('Statistics for all predictions')
        print('Correct predictions:', correct_predictions)
        print('Wrong predictions:', wrong_predictions)
        print('Confusion Matrix:', confusion_matrix)
        print('Normalized Confusion Matrix:', normalized_confusion_matrix)
        print('Accuracy:', accuracy, '%')
        print('Balanced Accuracy:', balanced_accuracy, '%')
        '''

        return_dict = {
            'correct_predictions': str(correct_predictions),
            'wrong_predictions': str(wrong_predictions),
            'confusion_matrix': [
                ('True Negatives', int(confusion_matrix[0])),
                ('False Positives', int(confusion_matrix[1])),
                ('False Negatives', int(confusion_matrix[2])),
                ('True Positives', int(confusion_matrix[3]))
            ],
            'normalized_confusion_matrix': [
                ('True Negatives', round(normalized_confusion_matrix[0] * 100, 2)),
                ('False Positives', round(normalized_confusion_matrix[1] * 100, 2)),
                ('False Negatives', round(normalized_confusion_matrix[2] * 100, 2)),
                ('True Positives', round(normalized_confusion_matrix[3] * 100, 2))
            ],
            'accuracy': accuracy,
            'balanced_accuracy': balanced_accuracy
        }
        return return_dict

    def calculate_statistics_per_wheelset(self, data: pd.DataFrame):
        num_wheelsets = len(data.index)

        schrott_data = data[data['ist_schrott'] == True]
        not_schrott_data = data[data['ist_schrott'] == False]

        # A wheelset counts as predicted positive if at least one of its (filtered)
        # station predictions exceeded the limit.
        wheelset_true_positives = len(schrott_data[schrott_data['num_correct_predictions'] > 0].index)
        wheelset_false_positives = len(not_schrott_data[not_schrott_data['num_wrong_predictions'] > 0].index)
        wheelset_true_negatives = len(not_schrott_data[not_schrott_data['num_wrong_predictions'] == 0].index)
        wheelset_false_negatives = len(schrott_data[schrott_data['num_correct_predictions'] == 0].index)

        wheelset_correct = wheelset_true_positives + wheelset_true_negatives
        wheelset_wrong = wheelset_false_positives + wheelset_false_negatives

        wheelset_confusion_matrix = [wheelset_true_negatives, wheelset_false_positives,
                                     wheelset_false_negatives, wheelset_true_positives]
        if (wheelset_true_negatives + wheelset_false_positives) != 0 \
                and (wheelset_false_negatives + wheelset_true_positives) != 0:
            wheelset_normalized_confusion_matrix = [
                round(wheelset_true_negatives / (wheelset_true_negatives + wheelset_false_positives), 3),
                round(wheelset_false_positives / (wheelset_true_negatives + wheelset_false_positives), 3),
                round(wheelset_false_negatives / (wheelset_false_negatives + wheelset_true_positives), 3),
                round(wheelset_true_positives / (wheelset_false_negatives + wheelset_true_positives), 3)]
        else:
            # Degenerate case (only one class present): fall back to zeros so the
            # report below can still be built.
            wheelset_normalized_confusion_matrix = [0.0, 0.0, 0.0, 0.0]

        wheelset_accuracy = round(wheelset_correct / num_wheelsets, 3) * 100
        wheelset_balanced_accuracy = round((wheelset_normalized_confusion_matrix[0]
                                            + wheelset_normalized_confusion_matrix[-1]) / 2, 2) * 100

        print('\n', '*-*' * 20)
        print('Statistics per wheelset')
        print('Correct predictions:', wheelset_correct)
        print('Wrong predictions:', wheelset_wrong)
        print('Confusion Matrix:', [['True Negatives', str(wheelset_confusion_matrix[0])],
                                    ['False Positives', str(wheelset_confusion_matrix[1])],
                                    ['False Negatives', str(wheelset_confusion_matrix[2])],
                                    ['True Positives', str(wheelset_confusion_matrix[3])]])
        print('Normalized Confusion Matrix:', [['True Negatives', str(wheelset_normalized_confusion_matrix[0])],
                                               ['False Positives', str(wheelset_normalized_confusion_matrix[1])],
                                               ['False Negatives', str(wheelset_normalized_confusion_matrix[2])],
                                               ['True Positives', str(wheelset_normalized_confusion_matrix[3])]])
        print('Accuracy:', wheelset_accuracy, '%')
        print('Balanced Accuracy:', wheelset_balanced_accuracy, '%')

        return_dict = {
            'correct_predictions': str(wheelset_correct),
            'wrong_predictions': str(wheelset_wrong),
            'confusion_matrix': [
                ('True Negatives', int(wheelset_confusion_matrix[0])),
                ('False Positives', int(wheelset_confusion_matrix[1])),
                ('False Negatives', int(wheelset_confusion_matrix[2])),
                ('True Positives', int(wheelset_confusion_matrix[3]))
            ],
            'normalized_confusion_matrix': [
                ('True Negatives', round(wheelset_normalized_confusion_matrix[0] * 100, 2)),
                ('False Positives', round(wheelset_normalized_confusion_matrix[1] * 100, 2)),
                ('False Negatives', round(wheelset_normalized_confusion_matrix[2] * 100, 2)),
                ('True Positives', round(wheelset_normalized_confusion_matrix[3] * 100, 2))
            ],
            'accuracy': wheelset_accuracy,
            'balanced_accuracy': wheelset_balanced_accuracy
        }
        return return_dict

    # Cost for new wheelset - 2000€
    # Average cost for overhauling - ~3h = €250
    # True positive: 250
    # False positive: -2000
    # True negative: 0
    # False negative: -250
    def custom_cost_function(self, correct_values, predictions,
                             weights_list: list = [0, -2000, 0, 250], normalized: bool = False):
        from sklearn.metrics import confusion_matrix

        tn, fp, fn, tp = confusion_matrix(correct_values, predictions).ravel()
        tn_weight, fp_weight, fn_weight, tp_weight = weights_list

        # Best achievable score: every negative scored as tn, every positive as tp.
        max_score = (tn + fp) * tn_weight + (tp + fn) * tp_weight
        score = tn * tn_weight + fp * fp_weight + fn * fn_weight + tp * tp_weight

        if normalized:
            return score / max_score
        else:
            return score

    # old weights_list: list = [1.2, -500, -0.2, 2]
    def balanced_custom_cost_function(self, correct_values, predictions,
                                      weights_list: list = [0, -2000, 0, 250], normalized: bool = False):
        from sklearn.metrics import confusion_matrix

        tn, fp, fn, tp = confusion_matrix(correct_values, predictions).ravel()
        tn_weight, fp_weight, fn_weight, tp_weight = weights_list

        max_score_negatives = (tn + fp) * tn_weight
        max_score_positives = (tp + fn) * tp_weight

        score_negatives = tn * tn_weight + fp * fp_weight
        score_positives = fn * fn_weight + tp * tp_weight

        if normalized:
            # Guard against division by zero when a class is missing or its weight is 0.
            if max_score_negatives != 0:
                normalized_negatives = score_negatives / max_score_negatives
            else:
                normalized_negatives = score_negatives / (max_score_negatives + 0.00001)

            if max_score_positives != 0:
                normalized_positives = score_positives / max_score_positives
            else:
                normalized_positives = score_positives / (max_score_positives + 0.00001)

            return (normalized_negatives + normalized_positives) / 2
        else:
            return (score_negatives + score_positives) / 2

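# Hedged worked example for the custom cost functions (counts invented): with
# tn=90, fp=5, fn=3, tp=2 and the default weights [0, -2000, 0, 250],
#   score     = 90*0 + 5*(-2000) + 3*0 + 2*250 = -9500
#   max_score = (90+5)*0 + (2+3)*250           =  1250
# so custom_cost_function(..., normalized=True) would return -9500 / 1250 = -7.6.
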
if __name__ == "__main__":
    from libraries.db_handlers.OebbMongodbHandler import OebbMongodbHandler

    mongodb_handler = OebbMongodbHandler()
    data_explorer = DataExplorer()

    from_date = str(datetime.datetime(2017, 1, 1))
    to_date = str(datetime.datetime(2018, 1, 1))

    find_query = {
        'process.beginn_der_bearbeitung': {
            '$gt': from_date,
            '$lt': to_date
        },
        'process.schrott': {
            '$exists': True
        }
    }

    data = mongodb_handler.query_data_and_generate_dataframe(
        'process_instances',
        find_query=find_query,
        return_values={'radsatznummer': 1, 'process.schrott': 1, 'process.stationsnummer': 1,
                       'final_state.ist_schrott': 1, '_id': 0},
        index='radsatznummer')

    stations = [421, 110, 130, 140, 680, 410, 510, 520, 320, 480, 490, 595, 535, None]
    # stations = [130, 520, 140, 535, 410, 421, 680, 320, 595, 480, 490, 110, 510, None]

    result_dict = {}
    for station in stations:
        result_dict.update(data_explorer.get_total_prediction_results(data, 0.5, station))  # limit/10

    result_df = pd.DataFrame.from_dict(result_dict, orient='index')
    mongodb_handler.insert_data_into_collection(result_dict, 'prediction_scores')
    result_df.index.name = 'station'
    print(result_df)
    result_df.to_excel(os.path.join('.', 'documentation', 'prediction_scores.xlsx'))