ParseMapping.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Fri Sep 20 15:33:17 2019
  5. @author: tanya
  6. """
  7. import os
  8. import sys
  9. import numpy as np
  10. import json
  11. from cdplib.log import Log
  12. sys.path.append(os.getcwd())
  13. class ParseMapping:
  14. '''
  15. '''
  16. def __init__(self, mapping_paths: (str, list), log_name: str = "ParseMapping",
  17. source: str = "original_name", target: str = "mongo_name",
  18. target_collections: str = "mongo_collection"):
  19. '''
  20. '''
  21. self._log = Log('Parse Mapping')
  22. assert(isinstance(mapping_paths, (list, str))),\
  23. "Mapping_paths must be either str or lists"
  24. if isinstance(mapping_paths, str):
  25. mapping_paths = [mapping_paths]
  26. self._mapping_paths = mapping_paths
  27. self._source = source
  28. self._target = target
  29. self._target_collections = target_collections
  30. self._update_mapping()
  31. def _update_mapping(self):
  32. '''
  33. Since we can have multiple mappings per table we need to add them to
  34. the object. I concatenated the mapping so that we don't have to adjust
  35. all function of the class to accept also list input. The class could
  36. be adjusted to accept list or even a dictornary with the key name as
  37. name of the mapping and value the json mapping.
  38. !!! WARNING !!!!
  39. Since the mapping are just concatenated there is right now
  40. no way to ditinguish from the object itself which item belongs to which
  41. mapping file.
  42. '''
  43. mappings = []
  44. for mapping_path in self._mapping_paths:
  45. try:
  46. with open(mapping_path, "r") as f:
  47. mapping = json.load(f)
  48. mappings.append(mapping)
  49. except Exception as e:
  50. err = ("Could not load json schema:{1} , "
  51. "Obtained error {0}".format(e, mapping_path))
  52. self._log.error(err)
  53. raise Exception(err)
  54. if len(mappings) > 1:
  55. concatenate_mapping = []
  56. for mapping in mappings:
  57. if not concatenate_mapping:
  58. concatenate_mapping = mapping
  59. else:
  60. concatenate_mapping.extend(mapping)
  61. self._mapping = concatenate_mapping
  62. else:
  63. self._mapping = mappings[0]
  64. def get_field_mapping(self) -> dict:
  65. '''
  66. '''
  67. assert(all([set([self._source, self._target]) <= set(d)
  68. for d in self._mapping]))
  69. return {d[self._source]: d[self._target] for d in self._mapping}
  70. def _get_fields_satistisfying_condition(self, key: str, value) -> list:
  71. '''
  72. '''
  73. assert(all([self._source in d for d in self._mapping])),\
  74. "Invalid from field"
  75. return [d[self._source] for d in self._mapping
  76. if (key in d) and (value in d[key])]
  77. def get_required_fields(self) -> list:
  78. '''
  79. '''
  80. return self._get_fields_satistisfying_condition(key="required",
  81. value=1)
  82. def get_date_fields(self) -> list:
  83. '''
  84. '''
  85. return self._get_fields_satistisfying_condition(key="type",
  86. value="Date")
  87. def get_fields_restricted_to_collection(self, collection_name: str) -> list:
  88. '''
  89. '''
  90. return self._get_fields_satistisfying_condition(key=self._target_collections,
  91. value=collection_name)
  92. def _get_info(self, key: str, value=None) -> dict:
  93. '''
  94. '''
  95. assert(all([self._source in d for d in self._mapping])),\
  96. "Invalid from field"
  97. result = {}
  98. for d in self._mapping:
  99. if key in d and d[key] is not None and d[key]:
  100. result.update({d[self._source]: d[key]})
  101. return result
  102. def get_default_values(self) -> dict:
  103. '''
  104. '''
  105. return self._get_info(key="default_values")
  106. def get_date_formats(self) -> dict:
  107. '''
  108. '''
  109. return self._get_info(key="date_format")
  110. def get_internal_names(self) -> dict:
  111. '''
  112. '''
  113. if all(["internal_name" in d for d in self._mapping]):
  114. internal_names = [d["internal_name"] for d in self._mapping]
  115. elif all(["internal_name" not in d for d in self._mapping]):
  116. internal_names = list(range(len(self._mapping)))
  117. else:
  118. err = ("Incorrectly filled mapping. Internal names should "
  119. "either be in all or in neither of the fields")
  120. self._log.error(err)
  121. raise Exception(err)
  122. return internal_names
  123. def get_mongo_names(self) -> dict:
  124. '''
  125. '''
  126. if all(["mongo_name" in d for d in self._mapping]):
  127. mongo_names = [d["mongo_name"] for d in self._mapping]
  128. elif all(["mongo_name" not in d for d in self._mapping]):
  129. mongo_names = list(range(len(self._mapping)))
  130. else:
  131. err = ("Incorrectly filled mapping. Mongo names should "
  132. "either be in all or in neither of the fields")
  133. self._log.error(err)
  134. raise Exception(err)
  135. return mongo_names
  136. def get_types(self) -> dict:
  137. '''
  138. '''
  139. return self._get_info(key="type")
  140. def get_python_types(self) -> dict:
  141. '''
  142. '''
  143. sql_to_python_dtypes = {
  144. "Text": str,
  145. "Date": np.dtype('<M8[ns]'),
  146. "Double": float,
  147. "Integer": int
  148. }
  149. sql_types = self.get_types()
  150. return {k: sql_to_python_dtypes[v] for k, v in sql_types.items()}
  151. def get_value_mappings(self) -> dict:
  152. '''
  153. '''
  154. return self._get_info(key="value_mapping")
  155. def get_column_numbers(self) -> list:
  156. '''
  157. '''
  158. if all(["column_number" in d for d in self._mapping]):
  159. column_numbers = [d["column_number"] for d in self._mapping]
  160. elif all(["column_number" not in d for d in self._mapping]):
  161. column_numbers = list(range(len(self._mapping)))
  162. else:
  163. err = ("Incorrectly filled mapping. Column numbers should ",
  164. "either in all or in neither of the fields")
  165. self._log.err(err)
  166. raise Exception(err)
  167. return column_numbers
  168. if __name__ == "__main__":
  169. mapping_path = os.path.join(".", "migration_mappings", "rs0_mapping.json")
  170. if os.path.isfile(mapping_path):
  171. print("found mapping path")
  172. parser = ParseMapping(mapping_path, source="internal_name",
  173. target="mongo_name")
  174. internal_to_mongo_mapping = parser.get_field_mapping()
  175. original_to_internal_mapping = parser.get_field_mapping()
  176. default_values = parser.get_default_values()
  177. types = parser.get_types()
  178. column_numbers = parser.get_column_numbers()
  179. print("Done testing!")