ParseMapping.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Fri Sep 20 15:33:17 2019
  5. @author: tanya
  6. """
  7. import os
  8. import sys
  9. import numpy as np
  10. sys.path.append(os.getcwd())
  11. from libraries.log import Log
  12. class ParseMapping:
  13. '''
  14. '''
  15. def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
  16. source: str = "original_name", target: str = "mongo_name",
  17. target_collection: str = "mongo_collection"):
  18. '''
  19. '''
  20. import json
  21. from libraries.log import Log
  22. log = Log('Parse Mapping')
  23. if not os.path.isfile(mapping_path):
  24. err = "Mapping not found"
  25. self._log.error(err)
  26. raise FileNotFoundError(err)
  27. try:
  28. with open(mapping_path, "r") as f:
  29. self._mapping = json.load(f)
  30. except Exception as e:
  31. err = ("Could not load mapping. "
  32. "Exit with error {}".format(e))
  33. self._log.error(err)
  34. raise Exception(err)
  35. self._source = source
  36. self._target = target
  37. self._target_collection = target_collection
  38. def get_field_mapping(self) -> dict:
  39. '''
  40. '''
  41. assert(all([set([self._source, self._target]) <= set(d)
  42. for d in self._mapping]))
  43. return {d[self._source]: d[self._target] for d in self._mapping}
  44. def _get_fields_satistisfying_condition(self, key: str, value) -> list:
  45. '''
  46. '''
  47. assert(all([self._source in d for d in self._mapping])),\
  48. "Invalid from field"
  49. return [d[self._source] for d in self._mapping
  50. if (key in d) and (d[key] == value)]
  51. def get_required_fields(self) -> list:
  52. '''
  53. '''
  54. return self._get_fields_satistisfying_condition(key="required",
  55. value=1)
  56. def get_date_fields(self) -> list:
  57. '''
  58. '''
  59. return self._get_fields_satistisfying_condition(key="type",
  60. value="Date")
  61. def get_fields_restricted_to_collecton(self, collection_name: str) -> list:
  62. '''
  63. '''
  64. return self._get_fields_satistisfying_condition(key=self._target_collection,
  65. value=collection_name)
  66. def _get_info(self, key: str, value=None) -> dict:
  67. '''
  68. '''
  69. assert(all([self._source in d for d in self._mapping])),\
  70. "Invalid from field"
  71. return {d[self._source]: d[key] for d in self._mapping
  72. if (key in d) and ((value is not None)
  73. and (d[key] == value)) or (key in d)}
  74. def get_default_values(self) -> dict:
  75. '''
  76. '''
  77. return self._get_info(key="default_values")
  78. def get_date_formats(self) -> dict:
  79. '''
  80. '''
  81. return self._get_info(key="date_format")
  82. def get_types(self) -> dict:
  83. '''
  84. '''
  85. return self._get_info(key="type")
  86. def get_python_types(self) -> dict:
  87. '''
  88. '''
  89. sql_to_python_dtypes = {
  90. "Text": str,
  91. "Date": np.dtype('<M8[ns]'),
  92. "Double": float,
  93. "Integer": int
  94. }
  95. sql_types = self.get_types()
  96. return {k: sql_to_python_dtypes[v] for k, v in sql_types.items()}
  97. def get_value_mappings(self) -> dict:
  98. '''
  99. '''
  100. return self._get_info(key="value_mapping")
  101. def get_column_numbers(self) -> list:
  102. '''
  103. '''
  104. if all(["column_number" in d for d in self._mapping]):
  105. column_numbers = [d["column_number"] for d in self._mapping]
  106. elif all(["column_number" not in d for d in self._mapping]):
  107. column_numbers = list(range(len(self._mapping)))
  108. else:
  109. err = ("Incorrectly filled mapping. Column numbers should ",
  110. "either in all or in neither of the fields")
  111. self.log.err(err)
  112. raise Exception(err)
  113. return column_numbers
  114. if __name__ == "__main__":
  115. mapping_path = os.path.join(".", "migration_mappings", "rs0_mapping.json")
  116. if os.path.isfile(mapping_path):
  117. print("found mapping path")
  118. parser = ParseMapping(mapping_path, source="internal_name",
  119. target="mongo_name")
  120. internal_to_mongo_mapping = parser.get_field_mapping()
  121. original_to_internal_mapping = parser.get_field_mapping()
  122. default_values = parser.get_default_values()
  123. types = parser.get_types()
  124. column_numbers = parser.get_column_numbers()
  125. print("Done testing!")