ParseMapping.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Fri Sep 20 15:33:17 2019
  5. @author: tanya
  6. """
  7. import os
  8. import sys
  9. import numpy as np
  10. sys.path.append(os.getcwd())
  11. class ParseMapping:
  12. '''
  13. '''
  14. def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
  15. source: str = "original_name", target: str = "mongo_name",
  16. target_collection: str = "mongo_collection"):
  17. '''
  18. '''
  19. import json
  20. from cdplib.log import Log
  21. self._log = Log('Parse Mapping')
  22. if not os.path.isfile(mapping_path):
  23. err = "Mapping not found "+mapping_path
  24. self._log.error(err)
  25. raise FileNotFoundError(err)
  26. try:
  27. with open(mapping_path, "r") as f:
  28. self._mapping = json.load(f)
  29. except Exception as e:
  30. err = ("Could not load mapping. " + mapping_path +
  31. "Exit with error {}".format(e))
  32. self._log.error(err)
  33. raise Exception(err)
  34. self._source = source
  35. self._target = target
  36. self._target_collection = target_collection
  37. def get_field_mapping(self) -> dict:
  38. '''
  39. '''
  40. assert(all([set([self._source, self._target]) <= set(d)
  41. for d in self._mapping]))
  42. return {d[self._source]: d[self._target] for d in self._mapping}
  43. def _get_fields_satistisfying_condition(self, key: str, value) -> list:
  44. '''
  45. '''
  46. assert(all([self._source in d for d in self._mapping])),\
  47. "Invalid from field"
  48. return [d[self._source] for d in self._mapping
  49. if (key in d) and (d[key] == value)]
  50. def get_required_fields(self) -> list:
  51. '''
  52. '''
  53. return self._get_fields_satistisfying_condition(key="required",
  54. value=1)
  55. def get_date_fields(self) -> list:
  56. '''
  57. '''
  58. return self._get_fields_satistisfying_condition(key="type",
  59. value="Date")
  60. def get_fields_restricted_to_collecton(self, collection_name: str) -> list:
  61. '''
  62. '''
  63. return self._get_fields_satistisfying_condition(key=self._target_collection,
  64. value=collection_name)
  65. def _get_info(self, key: str, value=None) -> dict:
  66. '''
  67. '''
  68. assert(all([self._source in d for d in self._mapping])),\
  69. "Invalid from field"
  70. return {d[self._source]: d[key] for d in self._mapping
  71. if (key in d) and ((value is not None)
  72. and (d[key] == value)) or (key in d)}
  73. def get_default_values(self) -> dict:
  74. '''
  75. '''
  76. return self._get_info(key="default_values")
  77. def get_date_formats(self) -> dict:
  78. '''
  79. '''
  80. return self._get_info(key="date_format")
  81. def get_internal_names(self) -> dict:
  82. '''
  83. '''
  84. if all(["internal_name" in d for d in self._mapping]):
  85. internal_names = [d["internal_name"] for d in self._mapping]
  86. elif all(["internal_name" not in d for d in self._mapping]):
  87. internal_names = list(range(len(self._mapping)))
  88. else:
  89. err = ("Incorrectly filled mapping. Internal names should "
  90. "either be in all or in neither of the fields")
  91. self._log.error(err)
  92. raise Exception(err)
  93. return internal_names
  94. def get_mongo_names(self) -> dict:
  95. '''
  96. '''
  97. if all(["mongo_name" in d for d in self._mapping]):
  98. mongo_names = [d["mongo_name"] for d in self._mapping]
  99. elif all(["mongo_name" not in d for d in self._mapping]):
  100. mongo_names = list(range(len(self._mapping)))
  101. else:
  102. err = ("Incorrectly filled mapping. Mongo names should "
  103. "either be in all or in neither of the fields")
  104. self._log.error(err)
  105. raise Exception(err)
  106. return mongo_names
  107. def get_types(self) -> dict:
  108. '''
  109. '''
  110. return self._get_info(key="type")
  111. def get_python_types(self) -> dict:
  112. '''
  113. '''
  114. sql_to_python_dtypes = {
  115. "Text": str,
  116. "Date": np.dtype('<M8[ns]'),
  117. "Double": float,
  118. "Integer": int
  119. }
  120. sql_types = self.get_types()
  121. return {k: sql_to_python_dtypes[v] for k, v in sql_types.items()}
  122. def get_value_mappings(self) -> dict:
  123. '''
  124. '''
  125. return self._get_info(key="value_mapping")
  126. def get_column_numbers(self) -> list:
  127. '''
  128. '''
  129. if all(["column_number" in d for d in self._mapping]):
  130. column_numbers = [d["column_number"] for d in self._mapping]
  131. elif all(["column_number" not in d for d in self._mapping]):
  132. column_numbers = list(range(len(self._mapping)))
  133. else:
  134. err = ("Incorrectly filled mapping. Column numbers should ",
  135. "either in all or in neither of the fields")
  136. self._log.err(err)
  137. raise Exception(err)
  138. return column_numbers
  139. if __name__ == "__main__":
  140. mapping_path = os.path.join(".", "migration_mappings", "rs0_mapping.json")
  141. if os.path.isfile(mapping_path):
  142. print("found mapping path")
  143. parser = ParseMapping(mapping_path, source="internal_name",
  144. target="mongo_name")
  145. internal_to_mongo_mapping = parser.get_field_mapping()
  146. original_to_internal_mapping = parser.get_field_mapping()
  147. default_values = parser.get_default_values()
  148. types = parser.get_types()
  149. column_numbers = parser.get_column_numbers()
  150. print("Done testing!")