CleanRs1.py
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Mon Sep 30 09:59:54 2019
  5. @author: tanya
  6. """
  7. import gc
  8. import pandas as pd
  9. import os
  10. import sys
  11. sys.path.append(os.getcwd())
  12. from libraries.import_process_instances.CleanProcessTable import CleanProcessTable
  13. class CleanRs1(CleanProcessTable):
  14. '''
  15. '''
  16. def __init__(self):
  17. '''
  18. '''
  19. super().__init__(
  20. mapping_path=os.path.join(".", "migration_mappings",
  21. "rs1_mapping.json"),
  22. inconsist_report_table="inconsist_rs1",
  23. sort_columns=["radsatznummer", "begin_der_bearbeitung"],
  24. index_columns=["radsatznummer"],
  25. log_name="CleanRs1")
  26. def clean_ende_der_bearbeitung(self, data: pd.DataFrame) -> pd.DataFrame:
  27. '''
  28. We filter all the data that has missing begin_der_bearbeitung
  29. (these cases should be very rare),
  30. if ende_der_bearbeitung is missing we should fill it by the
  31. begin_der_bearbeitung of the next station.
  32. '''
  33. self.error_column_abscence(columns=["radsatznummer",
  34. "ende_der_bearbeitung",
  35. "begin_der_bearbeitung",
  36. "status"],
  37. data=data)
  38. for time_column in ["ende_der_bearbeitung", "begin_der_bearbeitung"]:
  39. data[time_column] = pd.to_datetime(data[time_column])
  40. data.sort_values(by=self._sort_columns, inplace=True)
  41. start_time_next_station = self._get_next_station_start_time(data=data)
  42. data["ende_der_bearbeitung"].fillna(start_time_next_station,
  43. inplace=True)
  44. del start_time_next_station
  45. gc.collect()
  46. return data
  47. def filter_invalid_ende_der_bearbeitung(self, data: pd.DataFrame
  48. ) -> pd.DataFrame:
  49. '''
  50. '''
  51. is_invalid = (
  52. (data["ende_der_bearbeitung"].isnull() &
  53. (data["status"] != "Aktiv")) |
  54. (data["begin_der_bearbeitung"].isnull()) |
  55. (data["ende_der_bearbeitung"] < data["begin_der_bearbeitung"]))
  56. data = self._filter_invalid_data(
  57. data=data,
  58. invalid_mask=is_invalid,
  59. reason="invalid ende der bearbeitung")
  60. data.sort_values(by=self._sort_columns, inplace=True)
  61. return data
  62. def filter_invalid_status(self, data: pd.DataFrame) -> pd.DataFrame:
  63. '''
  64. We filter out the cases when work at a station was finished
  65. with the status "Aktiv" or "Abbruch". An exception is the very last
  66. station per wheel-set because it can a non-finished process.
  67. '''
  68. self.error_column_abscence(columns=["radsatznummer",
  69. "positionsnummer",
  70. "status"],
  71. data=data)
  72. data.sort_values(by=self._sort_columns, inplace=True)
  73. is_station_change = (data["positionsnummer"] !=
  74. data["positionsnummer"].shift(-1))
  75. is_last_station = (data["radsatznummer"] !=
  76. data["radsatznummer"].shift(-1))
  77. has_invalid_status = (
  78. is_station_change &
  79. (~is_last_station) &
  80. (data["status"].isin(["Aktiv", "Abbruch"])))
  81. data = self._filter_invalid_data(
  82. data=data,
  83. invalid_mask=has_invalid_status,
  84. reason="invalid status")
  85. return data
  86. def add_finished(self, data: pd.DataFrame) -> pd.DataFrame:
  87. '''
  88. We add a variable indicating if the process is finished or not
  89. '''
  90. mongo_name = "final_state.finished"
  91. self.error_column_abscence(columns=["radsatznummer", "status"],
  92. data=data)
  93. data.sort_values(by=self._sort_columns, inplace=True)
  94. not_finished = ["Aktiv", "Abbruch"]
  95. last_status_map = data.groupby("radsatznummer")["status"].last()
  96. data[mongo_name] = ~data["radsatznummer"].map(last_status_map)\
  97. .isin(not_finished)
  98. return data
  99. def add_stage(self, data: pd.DataFrame) -> pd.DataFrame():
  100. '''
  101. In the configuration we store the process stages definition in the form
  102. of the graph.
  103. '''
  104. from libraries.configuration import default as cfg
  105. mongo_name = "process.stage"
  106. self.error_column_abscence(columns=["radsatznummer", "positionsname"],
  107. data=data)
  108. data.sort_values(by=self._sort_columns, inplace=True)
  109. def cumsum_str(x):
  110. return x.cumsum()
  111. def break_cum_string_to_list(x):
  112. return [int(st) for st in x.split("|")[:-1]]
  113. previous_stations = data\
  114. .assign(positionsnummer=data.positionsnummer.astype(str).add("|"))\
  115. .groupby("radsatznummer")["positionsnummer"]\
  116. .apply(cumsum_str)\
  117. .apply(break_cum_string_to_list)
  118. for stage in cfg.process_stages.nodes():
  119. this_stage_stations = cfg.process_stages.nodes()[stage]["stations"]
  120. next_stage_stations = [item for next_stage
  121. in cfg.process_stages.successors(stage)
  122. for item in cfg.process_stages.nodes()
  123. [next_stage]["stations"]]
  124. def check_stage(x):
  125. return (len(set(this_stage_stations) & set(x)) != 0) and \
  126. (len(set(next_stage_stations) & set(x)) == 0)
  127. data.loc[previous_stations.apply(check_stage), mongo_name] = stage
  128. return data