123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Fri Sep 20 15:33:17 2019
- @author: tanya
- """
- import os
- import sys
- import numpy as np
- sys.path.append(os.getcwd())
- class ParseMapping:
- '''
- '''
- def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
- source: str = "original_name", target: str = "original_name"):
- '''
- '''
- import json
- from libraries.log import Log
- self._log = Log(log_name)
- if not os.path.isfile(mapping_path):
- err = "Mapping not found"
- self._log.error(err)
- raise FileNotFoundError(err)
- try:
- with open(mapping_path, "r") as f:
- self._mapping = json.load(f)
- except Exception as e:
- err = ("Could not load mapping. "
- "Exit with error {}".format(e))
- self._log.error(err)
- raise Exception(err)
- self._source = source
- self._target = target
- def get_field_mapping(self) -> dict:
- '''
- '''
- assert(all([set([self._source, self._target]) <= set(d)
- for d in self._mapping]))
- return {d[self._source]: d[self._target] for d in self._mapping}
- def _get_fields_satistisfying_condition(self, key: str, value) -> list:
- '''
- '''
- assert(all([self._source in d for d in self._mapping])),\
- "Invalid from field"
- return [d[self._source] for d in self._mapping
- if (key in d) and (d[key] == value)]
- def get_required_fields(self) -> list:
- '''
- '''
- return self._get_fields_satistisfying_condition(key="required",
- value=1)
- def get_date_fields(self) -> list:
- '''
- '''
- return self._get_fields_satistisfying_condition(key="type",
- value="Date")
- def _get_info(self, key: str, value=None) -> dict:
- '''
- '''
- assert(all([self._source in d for d in self._mapping])),\
- "Invalid from field"
- return {d[self._source]: d[key] for d in self._mapping
- if (key in d) and ((value is not None)
- and (d[key] == value)) or (key in d)}
- def get_default_values(self) -> dict:
- '''
- '''
- return self._get_info(key="default_values")
- def get_date_formats(self) -> dict:
- '''
- '''
- return self._get_info(key="date_format")
- def get_types(self) -> dict:
- '''
- '''
- return self._get_info(key="type")
- def get_python_types(self) -> dict:
- '''
- '''
- sql_to_python_dtypes = {
- "Text": str,
- "Date": np.dtype('<M8[ns]'),
- "Double": float,
- "Integer": int
- }
- sql_types = self.get_types()
- return {k: sql_to_python_dtypes[v] for k, v in sql_types.items()}
- def get_value_mappings(self) -> dict:
- '''
- '''
- return self._get_info(key="value_mapping")
- def get_column_numbers(self) -> list:
- '''
- '''
- if all(["column_number" in d for d in self._mapping]):
- column_numbers = [d["column_number"] for d in self._mapping]
- elif all(["column_number" not in d for d in self._mapping]):
- column_numbers = list(range(len(self._mapping)))
- else:
- err = ("Incorrectly filled mapping. Column numbers should ",
- "either in all or in neither of the fields")
- self.log.err(err)
- raise Exception(err)
- return column_numbers
- if __name__ == "__main__":
- mapping_path = os.path.join(".", "migration_mappings", "rs0_mapping.json")
- if os.path.isfile(mapping_path):
- print("found mapping path")
- parser = ParseMapping(mapping_path, source="internal_name",
- target="mongo_name")
- internal_to_mongo_mapping = parser.get_field_mapping()
- original_to_internal_mapping = parser.get_field_mapping()
- default_values = parser.get_default_values()
- types = parser.get_types()
- column_numbers = parser.get_column_numbers()
- print("Done testing!")
|