123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Fri Sep 20 15:33:17 2019
- @author: tanya
- """
import json
import os
import sys
from typing import Union

import numpy as np

from cdplib.log import Log
- sys.path.append(os.getcwd())
class ParseMapping:
    """Parse one or more JSON mapping files describing field migrations.

    A mapping file is expected to contain a JSON array of items (dicts),
    one per field, holding keys such as the source name, the target
    (mongo) name, the SQL type, default values, etc. All given mapping
    files are concatenated into one flat item list, over which the
    ``get_*`` helpers perform lookups.

    !!! WARNING !!!
    Since the mappings are just concatenated there is currently no way to
    distinguish from the object itself which item belongs to which
    mapping file.
    """

    def __init__(self, mapping_paths: Union[str, list],
                 log_name: str = "ParseMapping",
                 source: str = "original_name", target: str = "mongo_name",
                 target_collections: str = "mongo_collection"):
        """
        :param mapping_paths: path, or list of paths, to JSON mapping files
        :param log_name: name under which messages are logged
        :param source: item key holding the source field name
        :param target: item key holding the target field name
        :param target_collections: item key holding the target (mongo)
            collection(s) a field is restricted to
        :raises TypeError: if mapping_paths is neither str nor list
        """
        # BUG FIX: log_name used to be ignored (hard-coded 'Parse Mapping').
        self._log = Log(log_name)

        # raise instead of assert: asserts are stripped under `python -O`
        if not isinstance(mapping_paths, (list, str)):
            raise TypeError("Mapping_paths must be either str or lists")

        if isinstance(mapping_paths, str):
            mapping_paths = [mapping_paths]

        self._mapping_paths = mapping_paths
        self._source = source
        self._target = target
        self._target_collections = target_collections
        self._update_mapping()

    def _update_mapping(self):
        """Load every mapping file and concatenate the items into
        ``self._mapping``.

        Since we can have multiple mappings per table, the items of all
        files are merged into a single flat list so the rest of the class
        does not need to handle list-of-mappings input.
        """
        mappings = []

        for mapping_path in self._mapping_paths:
            try:
                with open(mapping_path, "r") as f:
                    mappings.append(json.load(f))
            except Exception as e:
                err = ("Could not load json schema:{1} , "
                       "Obtained error {0}".format(e, mapping_path))
                self._log.error(err)
                raise Exception(err)

        # BUG FIX: the old concatenation extended the first loaded mapping
        # in place and raised IndexError for an empty path list; a flat
        # comprehension mutates nothing and yields [] for no mappings.
        self._mapping = [item for mapping in mappings for item in mapping]

    def get_field_mapping(self) -> dict:
        """Return ``{source_name: target_name}`` over all mapping items.

        :raises Exception: if any item lacks the source or target key.
        """
        if not all({self._source, self._target} <= set(d)
                   for d in self._mapping):
            err = "Invalid mapping: items must contain source and target keys"
            self._log.error(err)
            raise Exception(err)

        return {d[self._source]: d[self._target] for d in self._mapping}

    def _get_fields_satisfying_condition(self, key: str, value) -> list:
        """Return the source names of all items whose entry for ``key``
        contains ``value``.

        NOTE(review): membership (``value in d[key]``) is preserved from
        the original — for string entries this is a substring match and
        for scalar entries (e.g. ``required: 1``) it requires the entry
        to be a container; confirm against the mapping schema.

        :raises Exception: if any item lacks the source key.
        """
        if not all(self._source in d for d in self._mapping):
            err = "Invalid from field"
            self._log.error(err)
            raise Exception(err)

        return [d[self._source] for d in self._mapping
                if (key in d) and (value in d[key])]

    # Backward-compatible alias for the historical misspelled name.
    _get_fields_satistisfying_condition = _get_fields_satisfying_condition

    def get_required_fields(self) -> list:
        """Source names of items whose ``required`` entry contains 1."""
        return self._get_fields_satisfying_condition(key="required",
                                                     value=1)

    def get_date_fields(self) -> list:
        """Source names of items whose ``type`` entry contains 'Date'."""
        return self._get_fields_satisfying_condition(key="type",
                                                     value="Date")

    def get_fields_restricted_to_collection(self,
                                            collection_name: str) -> list:
        """Source names of items restricted to the given target collection."""
        return self._get_fields_satisfying_condition(
            key=self._target_collections, value=collection_name)

    def _get_info(self, key: str, value=None) -> dict:
        """Return ``{source_name: d[key]}`` for all items with a truthy
        entry under ``key``.

        :param value: unused; kept for backward compatibility.
        :raises Exception: if any item lacks the source key.
        """
        if not all(self._source in d for d in self._mapping):
            err = "Invalid from field"
            self._log.error(err)
            raise Exception(err)

        # truthiness already excludes None, so no separate None check
        return {d[self._source]: d[key] for d in self._mapping
                if key in d and d[key]}

    def get_default_values(self) -> dict:
        """``{source_name: default_values}`` for items defining one."""
        return self._get_info(key="default_values")

    def get_date_formats(self) -> dict:
        """``{source_name: date_format}`` for items defining one."""
        return self._get_info(key="date_format")

    def _get_key_for_all_or_none(self, key: str) -> list:
        """Return ``[d[key] for d in mapping]`` when ``key`` is present in
        every item, or positional indices ``[0, 1, ...]`` when it is
        present in none.

        :raises Exception: if ``key`` is present in only some items.
        """
        if all(key in d for d in self._mapping):
            return [d[key] for d in self._mapping]

        if all(key not in d for d in self._mapping):
            return list(range(len(self._mapping)))

        err = ("Incorrectly filled mapping. {0} should "
               "either be in all or in neither of the fields".format(key))
        self._log.error(err)
        raise Exception(err)

    def get_internal_names(self) -> list:
        """Internal names of all items, or positional indices if absent.

        BUG FIX: return annotation corrected from dict to list.
        """
        return self._get_key_for_all_or_none("internal_name")

    def get_mongo_names(self) -> list:
        """Mongo names of all items, or positional indices if absent.

        BUG FIX: return annotation corrected from dict to list.
        """
        return self._get_key_for_all_or_none("mongo_name")

    def get_column_numbers(self) -> list:
        """Column numbers of all items, or positional indices if absent.

        BUG FIX: the old inline version built its error message as a
        tuple (comma instead of concatenation) and called the
        non-existent ``self._log.err`` instead of ``error``.
        """
        return self._get_key_for_all_or_none("column_number")

    def get_types(self) -> dict:
        """``{source_name: sql_type}`` for items defining a ``type``."""
        return self._get_info(key="type")

    def get_python_types(self) -> dict:
        """Translate the mapping's SQL types to python/numpy types.

        :raises KeyError: for types outside Text/Date/Double/Integer.
        """
        sql_to_python_dtypes = {
            "Text": str,
            "Date": np.dtype('<M8[ns]'),
            "Double": float,
            "Integer": int,
        }

        return {field: sql_to_python_dtypes[sql_type]
                for field, sql_type in self.get_types().items()}

    def get_value_mappings(self) -> dict:
        """``{source_name: value_mapping}`` for items defining one."""
        return self._get_info(key="value_mapping")
if __name__ == "__main__":
    # Manual smoke test: parse the rs0 mapping if it exists on disk.
    mapping_path = os.path.join(".", "migration_mappings", "rs0_mapping.json")

    if os.path.isfile(mapping_path):
        print("found mapping path")

        parser = ParseMapping(mapping_path, source="internal_name",
                              target="mongo_name")

        # exercise the main lookup helpers
        internal_to_mongo_mapping = parser.get_field_mapping()
        original_to_internal_mapping = parser.get_field_mapping()
        default_values = parser.get_default_values()
        types = parser.get_types()
        column_numbers = parser.get_column_numbers()

        print("Done testing!")
|