123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Mon Sep 30 09:59:54 2019
- @author: tanya
- """
- import gc
- import pandas as pd
- import os
- import sys
- sys.path.append(os.getcwd())
- from libraries.import_process_instances.CleanProcessTable import CleanProcessTable
class CleanRs1(CleanProcessTable):
    '''
    Cleaning and enrichment steps for the "rs1" process table.

    Configures the generic CleanProcessTable with the rs1 migration mapping,
    the inconsistency-report table name, the sort/index columns and the
    logger name, and adds rs1-specific steps that operate on pandas
    DataFrames.
    '''
    def __init__(self):
        '''
        Set up the cleaner with the rs1-specific configuration.
        '''
        super().__init__(
            mapping_path=os.path.join(".", "migration_mappings",
                                      "rs1_mapping.json"),
            inconsist_report_table="inconsist_rs1",
            sort_columns=["radsatznummer", "begin_der_bearbeitung"],
            index_columns=["radsatznummer"],
            log_name="CleanRs1")

    def clean_ende_der_bearbeitung(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Convert the time columns to datetime and fill missing
        ende_der_bearbeitung values.

        A missing ende_der_bearbeitung is filled with the value returned by
        ``_get_next_station_start_time`` (the begin_der_bearbeitung of the
        next station). Rows with a missing begin_der_bearbeitung are NOT
        removed here; that happens in ``filter_invalid_ende_der_bearbeitung``.

        :param data: process table with the columns radsatznummer,
            ende_der_bearbeitung, begin_der_bearbeitung and status (checked
            via ``error_column_abscence``). It is sorted in place by the
            configured sort columns.
        :return: the same DataFrame with converted and filled time columns.
        '''
        self.error_column_abscence(columns=["radsatznummer",
                                            "ende_der_bearbeitung",
                                            "begin_der_bearbeitung",
                                            "status"],
                                   data=data)

        for time_column in ["ende_der_bearbeitung", "begin_der_bearbeitung"]:
            data[time_column] = pd.to_datetime(data[time_column])

        # The next-station lookup depends on row order.
        data.sort_values(by=self._sort_columns, inplace=True)

        start_time_next_station = self._get_next_station_start_time(data=data)

        # Assign the result back instead of calling fillna(inplace=True) on
        # the column selection: that is chained assignment, which does not
        # update ``data`` when pandas Copy-on-Write is active.
        data["ende_der_bearbeitung"] = data["ende_der_bearbeitung"].fillna(
            start_time_next_station)

        del start_time_next_station
        gc.collect()

        return data

    def filter_invalid_ende_der_bearbeitung(self, data: pd.DataFrame
                                            ) -> pd.DataFrame:
        '''
        Remove rows whose processing interval is invalid.

        A row is invalid when any of the following holds:
          - ende_der_bearbeitung is missing and the status is not "Aktiv"
            (only a still-active step may lack an end time);
          - begin_der_bearbeitung is missing;
          - ende_der_bearbeitung lies before begin_der_bearbeitung.

        Invalid rows are handed to ``_filter_invalid_data`` with the reason
        "invalid ende der bearbeitung". The result is sorted in place by
        the configured sort columns and returned.
        '''
        is_invalid = (
            (data["ende_der_bearbeitung"].isnull() &
             (data["status"] != "Aktiv")) |
            (data["begin_der_bearbeitung"].isnull()) |
            (data["ende_der_bearbeitung"] < data["begin_der_bearbeitung"]))

        data = self._filter_invalid_data(
            data=data,
            invalid_mask=is_invalid,
            reason="invalid ende der bearbeitung")

        data.sort_values(by=self._sort_columns, inplace=True)

        return data

    def filter_invalid_status(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Remove station visits that ended with the status "Aktiv" or
        "Abbruch".

        Only the last row of each station visit is inspected (the row after
        which positionsnummer changes). The very last station of each
        wheel-set is exempt, because it can belong to a process that is not
        finished yet. Invalid rows are handed to ``_filter_invalid_data``
        with the reason "invalid status".
        '''
        self.error_column_abscence(columns=["radsatznummer",
                                            "positionsnummer",
                                            "status"],
                                   data=data)

        data.sort_values(by=self._sort_columns, inplace=True)

        # True on the last row before the station changes.
        is_station_change = (data["positionsnummer"] !=
                             data["positionsnummer"].shift(-1))

        # True on the last row of each wheel-set.
        is_last_station = (data["radsatznummer"] !=
                           data["radsatznummer"].shift(-1))

        has_invalid_status = (
            is_station_change &
            (~is_last_station) &
            (data["status"].isin(["Aktiv", "Abbruch"])))

        data = self._filter_invalid_data(
            data=data,
            invalid_mask=has_invalid_status,
            reason="invalid status")

        return data

    def add_finished(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Add the boolean column "final_state.finished" to every row.

        The value is True unless the last (non-null) status of the row's
        wheel-set, taken in sort order, is "Aktiv" or "Abbruch".
        '''
        mongo_name = "final_state.finished"

        self.error_column_abscence(columns=["radsatznummer", "status"],
                                   data=data)

        data.sort_values(by=self._sort_columns, inplace=True)

        not_finished = ["Aktiv", "Abbruch"]

        # radsatznummer -> status of the last row of that wheel-set.
        last_status_map = data.groupby("radsatznummer")["status"].last()

        data[mongo_name] = ~data["radsatznummer"].map(last_status_map)\
            .isin(not_finished)

        return data

    def add_stage(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Add the column "process.stage" derived from the process-stage graph.

        ``cfg.process_stages`` stores the stage definitions as a graph whose
        nodes carry a "stations" attribute; successors of a node are the
        following stages (presumably a networkx DiGraph — confirm against
        the configuration). A row is assigned a stage when the stations the
        wheel-set has visited up to and including that row contain at least
        one station of the stage and none of the stations of its successor
        stages. If several stages match, the one processed last in the loop
        over ``nodes()`` wins.
        '''
        from libraries.configuration import default as cfg

        mongo_name = "process.stage"

        # The method reads "positionsnummer", so that is the column that
        # must be present (previously "positionsname" was checked instead).
        self.error_column_abscence(columns=["radsatznummer",
                                            "positionsnummer"],
                                   data=data)

        data.sort_values(by=self._sort_columns, inplace=True)

        def break_cum_string_to_list(cum_string):
            # "3|5|7|" -> [3, 5, 7]; the trailing "|" produces an empty
            # last element, which is dropped.
            return [int(st) for st in cum_string.split("|")[:-1]]

        # Per row: the stations visited so far by the row's wheel-set, built
        # as a running string concatenation within each group. ``transform``
        # keeps the original row index, so the mask below aligns with
        # ``data``; ``apply`` may prepend the group key to the index.
        previous_stations = (
            data["positionsnummer"]
            .astype(str)
            .add("|")
            .groupby(data["radsatznummer"])
            .transform(lambda stations: stations.cumsum())
            .apply(break_cum_string_to_list))

        process_stages = cfg.process_stages

        for stage in process_stages.nodes():
            # Build the station sets once per stage, not once per row.
            this_stage_stations = set(
                process_stages.nodes()[stage]["stations"])
            next_stage_stations = {
                item
                for next_stage in process_stages.successors(stage)
                for item in process_stages.nodes()[next_stage]["stations"]}

            def check_stage(x, this_stage=this_stage_stations,
                            next_stages=next_stage_stations):
                visited = set(x)
                return ((not visited.isdisjoint(this_stage)) and
                        visited.isdisjoint(next_stages))

            data.loc[previous_stations.apply(check_stage), mongo_name] = stage

        return data
|