CleanProcessTable.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Mon Sep 30 08:55:56 2019
  5. @author: tanya
  6. """
  7. import pandas as pd
  8. import numpy as np
  9. import os
  10. import sys
  11. sys.path.append(os.getcwd())
  12. from libraries.db_migration.MigrationCleaning import MigrationCleaning
  13. class CleanTable(MigrationCleaning):
  14. '''
  15. '''
  16. def __init__(self, mapping_path: str,
  17. inconsist_report_table: str,
  18. filter_index_columns: (str, list),
  19. sort_columns: list = None,
  20. index_columns: list = None,
  21. log_name: str = "CleanProcessTable"):
  22. '''
  23. '''
  24. super().__init__(
  25. mapping_path=mapping_path,
  26. schema_paths=[os.path.join(".", "mongo_schema",
  27. "schema_process_instances.json"),
  28. os.path.join(".", "mongo_schema",
  29. "schema_wheelsets.json"),
  30. os.path.join(".", "mongo_schema",
  31. "schema_components.json")],
  32. inconsist_report_table=inconsist_report_table,
  33. filter_index_columns=filter_index_columns,
  34. log_name=log_name)
  35. self._tablename = os.path.basename(self._mapping_path)\
  36. .split("_mapping")[0]
  37. self._sort_columns = sort_columns
  38. self._index_columns = index_columns
  39. from libraries.db_handlers.SQLHandler import SQLHandler
  40. self._sql_db = SQLHandler()
  41. def read_data(self, wheelsets):
  42. '''
  43. '''
  44. if len(wheelsets) > 1:
  45. query = "SELECT * FROM {0} WHERE radsatznummer in {1}"\
  46. .format(self._tablename, tuple(wheelsets))
  47. else:
  48. query = "SELECT * FROM {0} WHERE radsatznummer = '{1}'"\
  49. .format(self._tablename, wheelsets[0])
  50. return self._sql_db.read_sql_to_dataframe(query)
  51. def drop_duplicated_entries(self, data: pd.DataFrame,
  52. columns_to_ignore: list = None
  53. ) -> pd.DataFrame():
  54. '''
  55. '''
  56. if columns_to_ignore is None:
  57. columns_to_ignore = ["ende_der_bearbeitung"]
  58. self.error_column_abscence(columns=columns_to_ignore, data=data)
  59. defining_columns = [c for c in data.columns
  60. if c not in columns_to_ignore]
  61. return data.drop_duplicates(subset=defining_columns)\
  62. .reset_index(drop=True)
  63. @property
  64. def field_mapping(self):
  65. '''
  66. '''
  67. return self._mapping_parser.get_field_mapping()
  68. class CleanProcessTable(CleanTable):
  69. '''
  70. '''
  71. def __init__(self, mapping_path: str,
  72. inconsist_report_table: str = None,
  73. filter_index_columns=["radsatznummer"],
  74. sort_columns: list = None,
  75. index_columns: list = None,
  76. log_name: str = "CleanProcessTable"):
  77. '''
  78. '''
  79. super().__init__(
  80. mapping_path=mapping_path,
  81. sort_columns=sort_columns,
  82. index_columns=index_columns,
  83. inconsist_report_table=inconsist_report_table,
  84. filter_index_columns=filter_index_columns,
  85. log_name=log_name)
  86. def _get_next_station_start_time(self, data: pd.DataFrame) -> pd.DataFrame:
  87. '''
  88. '''
  89. self.error_column_abscence(columns=["radsatznummer", "positionsnummer",
  90. "begin_der_bearbeitung"],
  91. data=data)
  92. data.sort_values(by=["radsatznummer", "begin_der_bearbeitung"],
  93. inplace=True)
  94. start_time_next_station =\
  95. data.groupby("radsatznummer")["begin_der_bearbeitung"].shift(-1)\
  96. .fillna("temp")
  97. station_change = (data.groupby("radsatznummer")["positionsnummer"]
  98. .shift(-1) != data["positionsnummer"])
  99. start_time_next_station.loc[~station_change] = np.nan
  100. start_time_next_station.fillna(method="bfill", inplace=True)
  101. start_time_next_station.loc[start_time_next_station == "temp"] = np.nan
  102. return pd.to_datetime(start_time_next_station)