#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Sep 30 08:55:56 2019 @author: tanya """ import pandas as pd import numpy as np import os import sys sys.path.append(os.getcwd()) from libraries.db_migration.MigrationCleaning import MigrationCleaning class CleanTable(MigrationCleaning): ''' ''' def __init__(self, mapping_path: str, inconsist_report_table: str, filter_index_columns: (str, list), sort_columns: list = None, index_columns: list = None, log_name: str = "CleanProcessTable"): ''' ''' super().__init__( mapping_path=mapping_path, schema_paths=[os.path.join(".", "mongo_schema", "schema_process_instances.json"), os.path.join(".", "mongo_schema", "schema_wheelsets.json"), os.path.join(".", "mongo_schema", "schema_components.json")], inconsist_report_table=inconsist_report_table, filter_index_columns=filter_index_columns, log_name=log_name) self._tablename = os.path.basename(self._mapping_path)\ .split("_mapping")[0] self._sort_columns = sort_columns self._index_columns = index_columns from libraries.db_handlers.SQLHandler import SQLHandler self._sql_db = SQLHandler() def read_data(self, wheelsets): ''' ''' if len(wheelsets) > 1: query = "SELECT * FROM {0} WHERE radsatznummer in {1}"\ .format(self._tablename, tuple(wheelsets)) else: query = "SELECT * FROM {0} WHERE radsatznummer = '{1}'"\ .format(self._tablename, wheelsets[0]) return self._sql_db.read_sql_to_dataframe(query) def drop_duplicated_entries(self, data: pd.DataFrame, columns_to_ignore: list = None ) -> pd.DataFrame(): ''' ''' if columns_to_ignore is None: columns_to_ignore = ["ende_der_bearbeitung"] self.error_column_abscence(columns=columns_to_ignore, data=data) defining_columns = [c for c in data.columns if c not in columns_to_ignore] return data.drop_duplicates(subset=defining_columns)\ .reset_index(drop=True) @property def field_mapping(self): ''' ''' return self._mapping_parser.get_field_mapping() class CleanProcessTable(CleanTable): ''' ''' def __init__(self, mapping_path: str, inconsist_report_table: str = None, filter_index_columns=["radsatznummer"], sort_columns: list = None, index_columns: list = None, log_name: str = "CleanProcessTable"): ''' ''' super().__init__( mapping_path=mapping_path, sort_columns=sort_columns, index_columns=index_columns, inconsist_report_table=inconsist_report_table, filter_index_columns=filter_index_columns, log_name=log_name) def _get_next_station_start_time(self, data: pd.DataFrame) -> pd.DataFrame: ''' ''' self.error_column_abscence(columns=["radsatznummer", "positionsnummer", "begin_der_bearbeitung"], data=data) data.sort_values(by=["radsatznummer", "begin_der_bearbeitung"], inplace=True) start_time_next_station =\ data.groupby("radsatznummer")["begin_der_bearbeitung"].shift(-1)\ .fillna("temp") station_change = (data.groupby("radsatznummer")["positionsnummer"] .shift(-1) != data["positionsnummer"]) start_time_next_station.loc[~station_change] = np.nan start_time_next_station.fillna(method="bfill", inplace=True) start_time_next_station.loc[start_time_next_station == "temp"] = np.nan return pd.to_datetime(start_time_next_station)