|
@@ -15,49 +15,91 @@ class ParseMapping:
|
|
|
'''
|
|
|
'''
|
|
|
def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
|
|
|
- source: str = "original_name", target: str = "mongo_name",
|
|
|
- target_collection: str = "mongo_collection"):
|
|
|
+ source_name_tag: str = "original_name", target_name_tag: str = "mongo_name",
|
|
|
+ target_collection_tag: str = "mongo_collection",
|
|
|
+ target_collection_name: str = None):
|
|
|
'''
|
|
|
'''
|
|
|
import json
|
|
|
from cdplib.log import Log
|
|
|
|
|
|
- self.log = Log('Parse Mapping')
|
|
|
+ self._log = Log('Parse Mapping')
|
|
|
|
|
|
if not os.path.isfile(mapping_path):
|
|
|
- err = "Mapping not found"
|
|
|
- self._log.error(err)
|
|
|
- raise FileNotFoundError(err)
|
|
|
-
|
|
|
+ self._log.log_and_raise_error("Mapping not found")
|
|
|
try:
|
|
|
with open(mapping_path, "r") as f:
|
|
|
self._mapping = json.load(f)
|
|
|
|
|
|
except Exception as e:
|
|
|
- err = ("Could not load mapping. "
|
|
|
- "Exit with error {}".format(e))
|
|
|
- self._log.error(err)
|
|
|
- raise Exception(err)
|
|
|
+ self._log.log_and_raise_error("Could not load mapping. "
|
|
|
+ "Exit with error {}".format(e))
|
|
|
+
|
|
|
+ self._mapping_path = mapping_path
|
|
|
+ self._source_name_tag = source_name_tag
|
|
|
+ self._target_name_tag = target_name_tag
|
|
|
+ self._target_collection_tag = target_collection_tag
|
|
|
+ self._target_collection_name = target_collection_name
|
|
|
|
|
|
- self._source = source
|
|
|
- self._target = target
|
|
|
- self._target_collection = target_collection
|
|
|
+ self._restrict_mapping_to_collection()
|
|
|
|
|
|
- def get_field_mapping(self) -> dict:
|
|
|
+ def _restrict_mapping_to_collection(self):
|
|
|
'''
|
|
|
'''
|
|
|
- assert(all([set([self._source, self._target]) <= set(d)
|
|
|
+ if self._target_collection_name is not None:
|
|
|
+
|
|
|
+ result = []
|
|
|
+
|
|
|
+ for d in self._mapping:
|
|
|
+
|
|
|
+ if self._target_collection_tag not in d:
|
|
|
+ continue
|
|
|
+
|
|
|
+ for key in [self._target_name_tag, self._target_collection_tag]:
|
|
|
+ if not isinstance(d[key], list):
|
|
|
+ d[key] = [d[key]]
|
|
|
+
|
|
|
+ if (len(d[self._target_collection_tag]) > 1) and (len(d[self._target_name_tag]) == 1):
|
|
|
+ d[self._target_name_tag] = d[self._target_name_tag]*len(d[self._target_collection_tag])
|
|
|
+
|
|
|
+ if len(d[self._target_collection_tag]) != len(d[self._target_name_tag]):
|
|
|
+ self._log.log_and_raise_error(("In the migration mapping '{0}' {1} "
|
|
|
+ "{2} has an unclear collection")
|
|
|
+ .format(self._mapping_path,
|
|
|
+ self._target_name_tag,
|
|
|
+ d[self._target_name_tag]))
|
|
|
+
|
|
|
+ if self._target_collection_name in d[self._target_collection_tag]:
|
|
|
+
|
|
|
+ d[self._target_name_tag] = d[self._target_name_tag][d[self._target_collection_tag].index(self._target_collection_name)]
|
|
|
+
|
|
|
+ d[self._target_collection_tag] = self._target_collection_name
|
|
|
+
|
|
|
+ result.append(d)
|
|
|
+
|
|
|
+ self._mapping = result
|
|
|
+
|
|
|
+ def get_field_mapping(self, source_name_tag: str = None, target_name_tag: str = None) -> dict:
|
|
|
+ '''
|
|
|
+ '''
|
|
|
+ if source_name_tag is None:
|
|
|
+ source_name_tag = self._source_name_tag
|
|
|
+
|
|
|
+ if target_name_tag is None:
|
|
|
+ target_name_tag = self._target_name_tag
|
|
|
+
|
|
|
+ assert(all([set([source_name_tag, target_name_tag]) <= set(d)
|
|
|
for d in self._mapping]))
|
|
|
|
|
|
- return {d[self._source]: d[self._target] for d in self._mapping}
|
|
|
+ return {d[source_name_tag]: d[self._target_name_tag] for d in self._mapping}
|
|
|
|
|
|
def _get_fields_satistisfying_condition(self, key: str, value) -> list:
|
|
|
'''
|
|
|
'''
|
|
|
- assert(all([self._source in d for d in self._mapping])),\
|
|
|
+ assert(all([self._source_name_tag in d for d in self._mapping])),\
|
|
|
"Invalid from field"
|
|
|
|
|
|
- return [d[self._source] for d in self._mapping
|
|
|
+ return [d[self._source_name_tag] for d in self._mapping
|
|
|
if (key in d) and (d[key] == value)]
|
|
|
|
|
|
def get_required_fields(self) -> list:
|
|
@@ -72,19 +114,28 @@ class ParseMapping:
|
|
|
return self._get_fields_satistisfying_condition(key="type",
|
|
|
value="Date")
|
|
|
|
|
|
+ """
|
|
|
def get_fields_restricted_to_collecton(self, collection_name: str) -> list:
|
|
|
'''
|
|
|
'''
|
|
|
- return self._get_fields_satistisfying_condition(key=self._target_collection,
|
|
|
- value=collection_name)
|
|
|
+ target_collection_tag_mapping = {d[self._source_name_tag]: d[self._target_collection_tag]
|
|
|
+ for d in self._mapping
|
|
|
+ if (self._target_collection_tag in d)}
|
|
|
+
|
|
|
+ target_collection_tag_mapping = {k: v if isinstance(v, list) else [v]
|
|
|
+ for k, v in target_collection_tag_mapping.items()}
|
|
|
+
|
|
|
+ return [k for k,v in target_collection_tag_mapping.items()
|
|
|
+ if collection_name in v]
|
|
|
+ """
|
|
|
|
|
|
def _get_info(self, key: str, value=None) -> dict:
|
|
|
'''
|
|
|
'''
|
|
|
- assert(all([self._source in d for d in self._mapping])),\
|
|
|
+ assert(all([self._source_name_tag in d for d in self._mapping])),\
|
|
|
"Invalid from field"
|
|
|
|
|
|
- return {d[self._source]: d[key] for d in self._mapping
|
|
|
+ return {d[self._source_name_tag]: d[key] for d in self._mapping
|
|
|
if (key in d) and ((value is not None)
|
|
|
and (d[key] == value)) or (key in d)}
|
|
|
|
|
@@ -134,7 +185,7 @@ class ParseMapping:
|
|
|
else:
|
|
|
err = ("Incorrectly filled mapping. Column numbers should ",
|
|
|
"either in all or in neither of the fields")
|
|
|
- self.log.err(err)
|
|
|
+ self._log.err(err)
|
|
|
raise Exception(err)
|
|
|
|
|
|
return column_numbers
|
|
@@ -148,8 +199,8 @@ if __name__ == "__main__":
|
|
|
|
|
|
print("found mapping path")
|
|
|
|
|
|
- parser = ParseMapping(mapping_path, source="internal_name",
|
|
|
- target="mongo_name")
|
|
|
+ parser = ParseMapping(mapping_path, source_name_tag="internal_name",
|
|
|
+ target_name_tag="mongo_name")
|
|
|
|
|
|
internal_to_mongo_mapping = parser.get_field_mapping()
|
|
|
|