#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:41:48 2019

@author: tanya
"""
import os
import sys
from copy import deepcopy
from typing import Union

import numpy as np

sys.path.append(os.getcwd())

from libraries.db_migration.ParseDbSchema import ParseDbSchema
class ParseJsonSchema(ParseDbSchema):
    '''
    Class for retrieving column properties from mongodb jsonSchema.

    Schemas are loaded from json files on disk; nested documents and
    arrays are flattened to dot-separated field names
    (e.g. 'article.id'), and per-field characteristics (bsonType,
    pattern, enum, ...) can be queried through the get_* methods.
    '''
    def __init__(self, schema_paths: Union[list, str], log_file: str = None):
        '''
        :param schema_paths: path or list of paths to json schema files
        :param log_file: optional path of the log file
        :raises Exception: if a schema file cannot be opened or is not
            valid json
        '''
        import json
        from libraries.log import Log

        super().__init__(schema_paths=schema_paths, log_file=log_file)

        self._log = Log(name="ParseJsonSchema", log_file=log_file)

        assert(isinstance(schema_paths, (list, str))),\
            "Schema paths must be either str or lists"

        # normalize to a list so the loading loop below is uniform
        if isinstance(schema_paths, str):
            schema_paths = [schema_paths]

        # load schemas to dictionaries if they are valid json files
        self.schemas = []

        for schema_path in schema_paths:
            try:
                with open(schema_path, "r") as f:
                    self.schemas.append(json.load(f))
            except Exception as e:
                err = ("Could not load json schema, "
                       "Obtained error {}".format(e))
                self._log.error(err)
                raise Exception(err)

    def get_fields(self) -> list:
        '''
        Return the list of all (nested, dot-separated) field names
        defined in the schemas.
        '''
        return self._parse()

    def get_required_fields(self) -> list:
        '''
        Return the list of field names marked as required in the
        schemas.
        '''
        return self._parse(required_only=True)

    def get_mongo_types(self) -> dict:
        '''
        Return a dictionary mapping field names to their declared
        bsonType (a string or a list of strings).
        '''
        return self._parse(field_info="bsonType")

    def get_datetime_fields(self) -> list:
        '''
        Return the list of field names whose bsonType is a date or a
        timestamp.
        '''
        mongo_types = self.get_mongo_types()

        return [k for k, v in mongo_types.items()
                if v in ["date", "timestamp", "Date", "Timestamp"]]

    def get_python_types(self) -> dict:
        '''
        Return a dictionary mapping field names to python types,
        translated from the bson types declared in the schema.

        Fields whose bson type has no python counterpart in the
        translation table are silently omitted from the result.

        :raises Exception: if a field declares a list of bson types
            that cannot be reduced to a single convertible type
        '''
        mongo_types = self.get_mongo_types()
        python_types = {}

        bson_to_python_types_except_dates = {"double": float,
                                             "decimal": float,
                                             "string": str,
                                             "object": object,
                                             "array": list,
                                             "bool": bool,
                                             "int": int,
                                             "long": int,
                                             "date": np.dtype('<M8[ns]'),
                                             "timestamp": np.dtype('<M8[ns]')
                                             }

        for k, v in mongo_types.items():
            # a field may declare several admissible bson types;
            # reduce the list to one representative type
            if isinstance(v, list):
                if ("date" in v) or ("timestamp" in v):
                    v = "date"
                elif "string" in v:
                    v = "string"
                elif ("double" in v) or ("decimal" in v):
                    v = "double"
                elif ("null" in v) and (len(v) == 2) and ("int" not in v):
                    # pick the single non-null type.
                    # BUG FIX: the original compared the builtin ``type``
                    # to "null" (always True), so "null" itself could be
                    # picked and the field silently dropped below.
                    v = [t for t in v if t != "null"][0]
                else:
                    err = "Type {0}: {1} not convertible".format(k, v)
                    self._log.error(err)
                    raise Exception(err)

            if v in bson_to_python_types_except_dates:
                python_types[k] = bson_to_python_types_except_dates[v]

        return python_types

    def get_patterns(self) -> dict:
        '''
        Return a dictionary mapping field names to their regex pattern.
        '''
        return self._parse(field_info="pattern")

    def get_default_values(self) -> dict:
        '''
        Return a dictionary mapping field names to their default value.
        '''
        return self._parse(field_info="default")

    def get_allowed_values(self) -> dict:
        '''
        Return a dictionary mapping field names to their enum of
        allowed values.
        '''
        return self._parse(field_info="enum")

    def get_maximum_value(self) -> dict:
        '''
        Return a dictionary mapping field names to their maximum value.
        '''
        return self._parse(field_info="maximum")

    def get_minimum_value(self) -> dict:
        '''
        Return a dictionary mapping field names to their minimum value.
        '''
        return self._parse(field_info="minimum")

    def get_max_items(self) -> dict:
        '''
        Return a dictionary mapping array field names to their maximum
        number of items.
        '''
        return self._parse(field_info="maxItems")

    def get_min_items(self) -> dict:
        '''
        Return a dictionary mapping array field names to their minimum
        number of items.
        '''
        return self._parse(field_info="minItems")

    def get_field_descriptions(self) -> dict:
        '''
        Return a dictionary mapping field names to their description.
        '''
        return self._parse(field_info="description")

    def _parse(self,
               field_info: str = None,
               required_only: bool = False):
        '''
        Parse all loaded schemas and merge the per-schema results.

        :param field_info: schema key whose value is collected per
            field; when None a plain list of field names is returned
        :param required_only: when True, restrict to required fields
        :return: list of field names, or dict of field name -> info
        '''
        result = self._parse_one(schema=self.schemas[0],
                                 field_info=field_info,
                                 required_only=required_only)

        # merge the results of the remaining schemas into the first one
        for schema in self.schemas[1:]:
            next_result = self._parse_one(schema=schema,
                                          field_info=field_info,
                                          required_only=required_only)

            if isinstance(result, list):
                result.extend(next_result)
            else:
                result.update(next_result)

        return result

    def _parse_one(self,
                   schema: dict,
                   field_info: str = None,
                   required_only: bool = False,
                   super_field_name: str = None,
                   already_parsed: Union[list, dict] = None
                   ) -> Union[list, dict]:
        '''
        Recursive function that returns a list of (nested) field names or
        a dictionary of (nested) field names with field characteristics.

        :param schema: if None => entire self.schema, or a sub-schema
            of self.schema
        :param field_info: optional, if provided a dictionary of field
            names with field characteristics is returned (for examples
            bsonType of each field), else a list of fields is returned
        :param required_only: when True, only returns fields marked as
            required in the mongo schema
        :param super_field_name: needed for recursion
            Example: the field 'article' has
            subfields 'id' and 'supplier'.
            If we parse the sub-document corresponding to article, then
            super_field_name is 'article' and we might get an output like
            {'article.id': string, 'article.supplier': string}
        :param already_parsed: needed for recursion, accumulator for
            the results collected so far
        '''
        schema = deepcopy(schema)

        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dict"

        if field_info is None:
            # parse a list of fields
            if already_parsed is None:
                already_parsed = []
            else:
                assert(isinstance(already_parsed, list)),\
                    "Parameter 'already_parsed' must be of type list"
        else:
            # parse a dictionary of field names with field characteristics
            if already_parsed is None:
                already_parsed = {}
            else:
                assert(isinstance(already_parsed, dict)),\
                    "Parameter 'already_parsed' must be of type dict"

        # If schema is nested, then
        # either it is of bsonType object
        # and the field information is stored under the key 'properties'
        # or it is of bsonType array
        # and the field information is stored in sub-schemas
        # under the key 'items'

        # if schema is of bsonType object
        if "properties" in schema.keys():
            # BUG FIX: default to an empty list so the membership test
            # below cannot raise a NameError when required_only is True
            # but the schema has no 'required' key
            required_subfields = schema.get("required", [])

            for sub_field_name in schema["properties"].keys():
                sub_schema = schema["properties"][sub_field_name]

                # only process fields that are required
                if required_only and\
                        (sub_field_name not in required_subfields):
                    continue

                if super_field_name is not None:
                    field_name = '.'.join([super_field_name,
                                           sub_field_name])
                else:
                    field_name = sub_field_name

                # if the given sub-field is nested, parse the
                # sub-schema corresponding to this sub-field
                self._parse_one(
                        schema=sub_schema,
                        super_field_name=field_name,
                        field_info=field_info,
                        already_parsed=already_parsed,
                        required_only=required_only)

        # if schema is of bsonType array
        elif "items" in schema.keys():
            # one schema for all items
            if isinstance(schema["items"], dict):
                sub_schema = schema["items"]

                self._parse_one(schema=sub_schema,
                                super_field_name=super_field_name,
                                field_info=field_info,
                                already_parsed=already_parsed,
                                required_only=required_only)

            # list of separate schemas for each item
            elif isinstance(schema["items"], list):
                for sub_schema in schema["items"]:
                    self._parse_one(schema=sub_schema,
                                    super_field_name=super_field_name,
                                    field_info=field_info,
                                    already_parsed=already_parsed,
                                    required_only=required_only)
            else:
                raise Exception(('Schema is not composed correctly: '
                                 'items must be a dictionary or a list'))
        else:
            # If neither properties nor items is in schema keys
            # we reached the last level of nestedness,
            # field information is stored in the schema keys.
            field_name = super_field_name

            if field_info is None:
                already_parsed.append(field_name)
            else:
                if field_info in schema.keys():
                    already_parsed[field_name] = schema[field_info]
                else:
                    # the schema does not define this characteristic
                    # for the field; omit it from the result
                    pass

        return already_parsed
if __name__ == "__main__":
    # Only for testing: exercise every public accessor against a
    # sample schema file if it is present next to the script.
    schema_path = os.path.join(".", "mongo_schema", "schema_wheelsets.json")

    if os.path.isfile(schema_path):
        parse_obj = ParseJsonSchema(schema_paths=schema_path)

        fields = parse_obj.get_fields()
        # BUG FIX: variable was misspelled 'required_fileds'
        required_fields = parse_obj.get_required_fields()
        patterns = parse_obj.get_patterns()
        mongo_types = parse_obj.get_mongo_types()
        python_types_except_dates = parse_obj.get_python_types()
        datetime_fields = parse_obj.get_datetime_fields()
        allowed_values = parse_obj.get_allowed_values()
        descriptions = parse_obj.get_field_descriptions()
|