123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Mon Sep 30 08:55:56 2019
- @author: tanya
- """
import os
import sys
from typing import Union

import numpy as np
import pandas as pd

sys.path.append(os.getcwd())

from libraries.db_migration.MigrationCleaning import MigrationCleaning
class CleanTable(MigrationCleaning):
    '''
    Base cleaning helper bound to one SQL table.

    Derives the table name from the mapping file name (the part before
    "_mapping"), wires the three mongo schema files into the parent
    MigrationCleaning, and provides helpers to read rows for a set of
    wheelset numbers and to drop near-duplicate rows.
    '''

    def __init__(self, mapping_path: str,
                 inconsist_report_table: str,
                 filter_index_columns: Union[str, list],
                 sort_columns: list = None,
                 index_columns: list = None,
                 log_name: str = "CleanProcessTable"):
        '''
        :param mapping_path: path to the mapping file; its base name up
            to "_mapping" becomes the SQL table name.
        :param inconsist_report_table: table where inconsistencies are
            reported (forwarded to MigrationCleaning).
        :param filter_index_columns: column name or list of column names
            used for filtering (forwarded to MigrationCleaning).
        :param sort_columns: columns used for sorting, stored for subclasses.
        :param index_columns: index columns, stored for subclasses.
        :param log_name: name used for the logger.
        '''
        super().__init__(
            mapping_path=mapping_path,
            schema_paths=[os.path.join(".", "mongo_schema",
                                       "schema_process_instances.json"),
                          os.path.join(".", "mongo_schema",
                                       "schema_wheelsets.json"),
                          os.path.join(".", "mongo_schema",
                                       "schema_components.json")],
            inconsist_report_table=inconsist_report_table,
            filter_index_columns=filter_index_columns,
            log_name=log_name)

        # table name = mapping file base name up to the "_mapping" suffix
        self._tablename = os.path.basename(self._mapping_path)\
            .split("_mapping")[0]

        self._sort_columns = sort_columns
        self._index_columns = index_columns

        # Deferred import: keeps the SQL handler (and its connection
        # setup) out of module import time.
        from libraries.db_handlers.SQLHandler import SQLHandler

        self._sql_db = SQLHandler()

    def read_data(self, wheelsets) -> pd.DataFrame:
        '''
        Read all rows of this table belonging to the given wheelsets.

        :param wheelsets: non-empty sequence of wheelset numbers
            (values of the "radsatznummer" column).
        :return: DataFrame with the matching rows.
        :raises ValueError: if wheelsets is empty.
        '''
        if not wheelsets:
            raise ValueError("wheelsets must contain at least one entry")

        # NOTE(review): the query is built by string formatting, which is
        # vulnerable to SQL injection if wheelset numbers ever come from
        # untrusted input — consider parameterized queries in SQLHandler.
        if len(wheelsets) > 1:
            # tuple() renders as ('a', 'b', ...) — valid SQL IN syntax
            # for two or more elements.
            query = "SELECT * FROM {0} WHERE radsatznummer in {1}"\
                .format(self._tablename, tuple(wheelsets))
        else:
            # A one-element tuple would render as ('a',) with a trailing
            # comma — invalid SQL — hence the separate equality branch.
            query = "SELECT * FROM {0} WHERE radsatznummer = '{1}'"\
                .format(self._tablename, wheelsets[0])

        return self._sql_db.read_sql_to_dataframe(query)

    def drop_duplicated_entries(self, data: pd.DataFrame,
                                columns_to_ignore: list = None
                                ) -> pd.DataFrame:
        '''
        Drop rows that are duplicates on every column except the ignored ones.

        :param data: table rows to deduplicate.
        :param columns_to_ignore: columns excluded from the duplicate
            comparison; defaults to ["ende_der_bearbeitung"].
        :return: deduplicated DataFrame with a fresh RangeIndex.
        '''
        if columns_to_ignore is None:
            columns_to_ignore = ["ende_der_bearbeitung"]

        self.error_column_abscence(columns=columns_to_ignore, data=data)

        defining_columns = [c for c in data.columns
                            if c not in columns_to_ignore]

        return data.drop_duplicates(subset=defining_columns)\
                   .reset_index(drop=True)

    @property
    def field_mapping(self):
        '''
        Field mapping parsed from the mapping file by the mapping parser.
        '''
        return self._mapping_parser.get_field_mapping()
class CleanProcessTable(CleanTable):
    '''
    Cleaning class for process tables keyed by wheelset number
    ("radsatznummer"), with a helper to derive the start time of the
    next station for each processing step.
    '''

    def __init__(self, mapping_path: str,
                 inconsist_report_table: str = None,
                 filter_index_columns: Union[str, list] = None,
                 sort_columns: list = None,
                 index_columns: list = None,
                 log_name: str = "CleanProcessTable"):
        '''
        :param mapping_path: path to the mapping file.
        :param inconsist_report_table: table where inconsistencies are
            reported.
        :param filter_index_columns: filter columns; defaults to
            ["radsatznummer"].
        :param sort_columns: columns used for sorting.
        :param index_columns: index columns.
        :param log_name: name used for the logger.
        '''
        # Default resolved here instead of a mutable default argument,
        # which would be shared across all instances.
        if filter_index_columns is None:
            filter_index_columns = ["radsatznummer"]

        super().__init__(
            mapping_path=mapping_path,
            sort_columns=sort_columns,
            index_columns=index_columns,
            inconsist_report_table=inconsist_report_table,
            filter_index_columns=filter_index_columns,
            log_name=log_name)

    def _get_next_station_start_time(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        For each row, compute the start time of the next station
        ("positionsnummer") of the same wheelset.

        NOTE: sorts ``data`` in place by wheelset and start time, i.e.
        mutates the caller's DataFrame.

        :param data: rows with at least "radsatznummer",
            "positionsnummer" and "begin_der_bearbeitung" columns.
        :return: datetime Series aligned with ``data``; NaT for the last
            station of each wheelset.
        '''
        self.error_column_abscence(columns=["radsatznummer", "positionsnummer",
                                            "begin_der_bearbeitung"],
                                   data=data)

        data.sort_values(by=["radsatznummer", "begin_der_bearbeitung"],
                         inplace=True)

        # Start time of the following row within the same wheelset; the
        # "temp" sentinel marks each wheelset's last row so the backfill
        # below cannot leak a time across wheelset boundaries.
        start_time_next_station =\
            data.groupby("radsatznummer")["begin_der_bearbeitung"].shift(-1)\
                .fillna("temp")

        # True where the following row (same wheelset) belongs to a
        # different station than the current row.
        station_change = (data.groupby("radsatznummer")["positionsnummer"]
                          .shift(-1) != data["positionsnummer"])

        # Blank out times that still belong to the current station, then
        # backfill so every row of a station carries the start time of
        # the next station.
        start_time_next_station.loc[~station_change] = np.nan
        start_time_next_station.fillna(method="bfill", inplace=True)

        # Turn the sentinel back into a missing value (NaT after the
        # datetime conversion).
        start_time_next_station.loc[start_time_next_station == "temp"] = np.nan

        return pd.to_datetime(start_time_next_station)
|