#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Sep 30 09:59:54 2019

@author: tanya
"""

import gc
import os
import sys

import pandas as pd

sys.path.append(os.getcwd())

from libraries.import_process_instances.CleanProcessTable import CleanProcessTable


class CleanRs1(CleanProcessTable):
    """
    Cleaning steps for the rs1 process table (one row per wheel-set and
    station visit).

    The column mapping, report table, sort order and index columns are
    configured in ``__init__``. The helpers used below
    (``error_column_abscence``, ``_filter_invalid_data``,
    ``_get_next_station_start_time``, ``_sort_columns``) are not defined in
    this class and are therefore inherited from ``CleanProcessTable``.
    """

    def __init__(self):
        """Configure the base cleaner for the rs1 table."""
        super().__init__(
            mapping_path=os.path.join(".", "migration_mappings",
                                      "rs1_mapping.json"),
            inconsist_report_table="inconsist_rs1",
            sort_columns=["radsatznummer", "begin_der_bearbeitung"],
            index_columns=["radsatznummer"],
            log_name="CleanRs1")

    def clean_ende_der_bearbeitung(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Fill missing ``ende_der_bearbeitung`` values.

        Both time columns are converted with ``pd.to_datetime`` and the data
        is sorted by ``self._sort_columns``. A missing
        ``ende_der_bearbeitung`` is then filled with the start time of the
        next station, as returned by ``self._get_next_station_start_time``.

        Rows with a missing ``begin_der_bearbeitung`` are NOT removed here;
        that happens in ``filter_invalid_ende_der_bearbeitung``.

        :param data: process table; must contain the columns radsatznummer,
            ende_der_bearbeitung, begin_der_bearbeitung and status.
        :return: ``data`` (sorted and modified in place) with filled end
            times.
        """
        self.error_column_abscence(
            columns=["radsatznummer", "ende_der_bearbeitung",
                     "begin_der_bearbeitung", "status"],
            data=data)

        for time_column in ["ende_der_bearbeitung", "begin_der_bearbeitung"]:
            data[time_column] = pd.to_datetime(data[time_column])

        data.sort_values(by=self._sort_columns, inplace=True)

        start_time_next_station = self._get_next_station_start_time(data=data)

        # Assign back instead of ``data[col].fillna(..., inplace=True)``:
        # the in-place call on a column is chained assignment and does not
        # update ``data`` under pandas Copy-on-Write.
        data["ende_der_bearbeitung"] = data["ende_der_bearbeitung"].fillna(
            start_time_next_station)

        # Release the helper Series early; the tables can be large.
        del start_time_next_station
        gc.collect()

        return data

    def filter_invalid_ende_der_bearbeitung(self, data: pd.DataFrame
                                            ) -> pd.DataFrame:
        """
        Remove rows whose processing time window is inconsistent.

        A row is invalid when any of the following holds:
          * ende_der_bearbeitung is missing and status is not "Aktiv",
          * begin_der_bearbeitung is missing,
          * ende_der_bearbeitung is earlier than begin_der_bearbeitung.

        Invalid rows are handed to ``self._filter_invalid_data`` with the
        reason "invalid ende der bearbeitung". The remaining data is sorted
        by ``self._sort_columns`` and returned.
        """
        is_invalid = (
            (data["ende_der_bearbeitung"].isnull()
             & (data["status"] != "Aktiv"))
            | data["begin_der_bearbeitung"].isnull()
            | (data["ende_der_bearbeitung"] < data["begin_der_bearbeitung"]))

        data = self._filter_invalid_data(
            data=data,
            invalid_mask=is_invalid,
            reason="invalid ende der bearbeitung")

        data.sort_values(by=self._sort_columns, inplace=True)

        return data

    def filter_invalid_status(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Remove station records that were left with status "Aktiv" or
        "Abbruch".

        After sorting by ``self._sort_columns``, consider the last row of
        each run of consecutive rows with the same positionsnummer, i.e. the
        row after which the wheel-set moves on to another station. That row
        must not have status "Aktiv" or "Abbruch".

        The very last row of each wheel-set is exempt, because it can belong
        to a process that is not finished yet. Offending rows are passed to
        ``self._filter_invalid_data`` with the reason "invalid status".
        """
        self.error_column_abscence(
            columns=["radsatznummer", "positionsnummer", "status"],
            data=data)

        data.sort_values(by=self._sort_columns, inplace=True)

        # Each row is compared with the following one (shift(-1)).
        is_station_change = (
            data["positionsnummer"] != data["positionsnummer"].shift(-1))
        is_last_station = (
            data["radsatznummer"] != data["radsatznummer"].shift(-1))

        has_invalid_status = (
            is_station_change
            & ~is_last_station
            & data["status"].isin(["Aktiv", "Abbruch"]))

        data = self._filter_invalid_data(
            data=data,
            invalid_mask=has_invalid_status,
            reason="invalid status")

        return data

    def add_finished(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Add the boolean column "final_state.finished".

        For every wheel-set (radsatznummer), the last non-null status in
        ``self._sort_columns`` order is inspected (``GroupBy.last`` skips
        nulls). The process counts as finished unless that status is
        "Aktiv" or "Abbruch". The flag is broadcast to all rows of the
        wheel-set.
        """
        mongo_name = "final_state.finished"

        self.error_column_abscence(columns=["radsatznummer", "status"],
                                   data=data)

        data.sort_values(by=self._sort_columns, inplace=True)

        not_finished = ["Aktiv", "Abbruch"]

        last_status_map = data.groupby("radsatznummer")["status"].last()

        data[mongo_name] = ~data["radsatznummer"].map(last_status_map)\
            .isin(not_finished)

        return data

    def add_stage(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Add the column "process.stage".

        The process stages are stored in the configuration as a graph
        (``cfg.process_stages``). Each node carries a "stations" list, and
        its successors are the following stages.

        For every row, the set of stations (positionsnummer) visited so far
        by its wheel-set is collected, in ``self._sort_columns`` order. A row
        gets a stage when it has visited at least one station of that stage
        and none of the stations of the stage's successor stages. Rows that
        match no stage are left unset.
        """
        from libraries.configuration import default as cfg

        mongo_name = "process.stage"

        # The method reads positionsnummer (not positionsname).
        self.error_column_abscence(columns=["radsatznummer", "positionsnummer"],
                                   data=data)

        data.sort_values(by=self._sort_columns, inplace=True)

        def stations_so_far(cum_string) -> set:
            # "110|120|" -> {110, 120}. Rows outside any group (e.g. a NaN
            # radsatznummer) have no cumulative string -> empty set.
            if not isinstance(cum_string, str):
                return set()
            return {int(station) for station in cum_string.split("|")[:-1]}

        # ``transform`` keeps the original row index regardless of the
        # pandas version / ``group_keys`` default, so the masks built below
        # align with ``data``. (``groupby.apply`` may prepend the group key
        # as an extra index level.)
        previous_stations = data["positionsnummer"].astype(str).add("|")\
            .groupby(data["radsatznummer"])\
            .transform(lambda stations: stations.cumsum())\
            .apply(stations_so_far)

        stages = cfg.process_stages

        for stage in stages.nodes():
            this_stage_stations = set(stages.nodes()[stage]["stations"])
            next_stage_stations = {
                station
                for next_stage in stages.successors(stage)
                for station in stages.nodes()[next_stage]["stations"]}

            is_in_stage = pd.Series(
                [(not visited.isdisjoint(this_stage_stations))
                 and visited.isdisjoint(next_stage_stations)
                 for visited in previous_stations],
                index=previous_stations.index,
                dtype=bool)

            data.loc[is_in_stage, mongo_name] = stage

        return data