#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:41:48 2019

@author: tanya
"""

import os
import sys
from copy import deepcopy
from typing import Union

import numpy as np

sys.path.append(os.getcwd())

from libraries.db_migration.ParseDbSchema import ParseDbSchema


class ParseJsonSchema(ParseDbSchema):
    '''
    Class for retrieving column properties from mongodb jsonSchema
    '''
    def __init__(self, schema_paths: Union[list, str], log_file: str = None):
        '''
        Loads the given json schema file(s) into dictionaries.

        :param schema_paths: path (or list of paths) to json schema files
        :param log_file: optional path to a log file

        :raises Exception: if a schema file cannot be read or parsed
        '''
        import json
        from libraries.log import Log

        super().__init__(schema_paths=schema_paths, log_file=log_file)

        self._log = Log(name="ParseJsonSchema", log_file=log_file)

        assert(isinstance(schema_paths, (list, str))),\
            "Schema paths must be either str or lists"

        if isinstance(schema_paths, str):
            schema_paths = [schema_paths]

        # load schemas to dictionaries if they are valid json files
        self.schemas = []

        for schema_path in schema_paths:
            try:
                with open(schema_path, "r") as f:
                    self.schemas.append(json.load(f))
            except Exception as e:
                err = ("Could not load json schema, "
                       "Obtained error {}".format(e))
                self._log.error(err)
                raise Exception(err)

    def get_fields(self) -> list:
        '''
        :return: list of all (nested) field names, dot-separated
        '''
        return self._parse()

    def get_required_fields(self) -> list:
        '''
        :return: list of field names marked as required in the schema
        '''
        return self._parse(required_only=True)

    def get_mongo_types(self) -> dict:
        '''
        :return: dict mapping field name to its bsonType
        '''
        return self._parse(field_info="bsonType")

    def get_datetime_fields(self) -> list:
        '''
        :return: list of field names whose bsonType is a date/timestamp
        '''
        mongo_types = self.get_mongo_types()

        return [k for k, v in mongo_types.items()
                if v in ["date", "timestamp", "Date", "Timestamp"]]

    def get_python_types(self) -> dict:
        '''
        :return: dict mapping field name to the python (or numpy) type
            corresponding to its bsonType

        :raises Exception: if a field has a bsonType with no known
            python equivalent
        '''
        mongo_types = self.get_mongo_types()
        python_types = {}

        bson_to_python_types = {"double": float,
                                "decimal": float,
                                "string": str,
                                "object": object,
                                "array": list,
                                "bool": bool,
                                "int": int,
                                "long": int,
                                # pandas-compatible datetime64[ns] dtype
                                "date": np.dtype('<M8[ns]'),
                                "timestamp": np.dtype('<M8[ns]')}

        # NOTE(review): the original body of this loop was corrupted in the
        # source (text between np.dtype('< and the next -> was lost);
        # reconstructed as a plain lookup that fails loudly on unknown types.
        for field_name, mongo_type in mongo_types.items():
            if mongo_type in bson_to_python_types:
                python_types[field_name] = bson_to_python_types[mongo_type]
            else:
                err = ("Type {0} of field {1} is not supported"
                       .format(mongo_type, field_name))
                self._log.error(err)
                raise Exception(err)

        return python_types

    def get_patterns(self) -> dict:
        '''
        :return: dict mapping field name to its regex pattern constraint
        '''
        return self._parse(field_info="pattern")

    def get_default_values(self) -> dict:
        '''
        :return: dict mapping field name to its default value
        '''
        return self._parse(field_info="default")

    def get_allowed_values(self) -> dict:
        '''
        :return: dict mapping field name to its enum of allowed values
        '''
        return self._parse(field_info="enum")

    def get_maximum_value(self) -> dict:
        '''
        :return: dict mapping field name to its maximum allowed value
        '''
        return self._parse(field_info="maximum")

    def get_minimum_value(self) -> dict:
        '''
        :return: dict mapping field name to its minimum allowed value
        '''
        return self._parse(field_info="minimum")

    def get_max_items(self) -> dict:
        '''
        :return: dict mapping array field name to its maximal length
        '''
        return self._parse(field_info="maxItems")

    def get_min_items(self) -> dict:
        '''
        :return: dict mapping array field name to its minimal length
        '''
        return self._parse(field_info="minItems")

    def get_field_descriptions(self) -> dict:
        '''
        :return: dict mapping field name to its description text
        '''
        return self._parse(field_info="description")

    def _parse(self,
               field_info: str = None,
               required_only: bool = False) -> Union[list, dict]:
        '''
        Parses all loaded schemas and merges the per-schema results.

        :param field_info: see _parse_one
        :param required_only: see _parse_one
        :return: list of field names (field_info is None) or dict of
            field names with the requested characteristic
        '''
        # BUGFIX: accumulate over all schemas instead of indexing
        # self.schemas[0], which raised IndexError for an empty schema list
        result = [] if field_info is None else {}

        for schema in self.schemas:
            next_result = self._parse_one(schema=schema,
                                          field_info=field_info,
                                          required_only=required_only)

            if isinstance(result, list):
                result.extend(next_result)
            else:
                result.update(next_result)

        return result

    def _parse_one(self,
                   schema: dict,
                   field_info: str = None,
                   required_only: bool = False,
                   super_field_name: str = None,
                   already_parsed: Union[list, dict] = None
                   ) -> Union[list, dict]:
        '''
        Recursive function that returns a list of (nested) field names or
        a dictionary of (nested) field names with field characteristics.

        :param schema: if None => entire self.schema, or a sub-schema
            of self.schema

        :param field_info: optional, if provided a dictionary of field
            names with field characteristics is returned (for example
            bsonType of each field), else a list of fields is returned

        :param required_only: when True, only returns fields marked as
            required in the mongo schema

        :param super_field_name: needed for recursion
            Example: the field 'article' has subfields 'id' and 'supplier'.
            If we parse the sub-document corresponding to article, then
            super_field_name is 'article' and we might get an output like
            {'article.id': string, 'article.supplier': string}

        :param already_parsed: needed for recursion, the accumulator that
            collects results across recursive calls
        '''
        schema = deepcopy(schema)

        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dict"

        if field_info is None:
            # parse a list of fields
            if already_parsed is None:
                already_parsed = []
            else:
                assert(isinstance(already_parsed, list)),\
                    "Parameter 'already_parsed' must be of type list"
        else:
            # parse a dictionary of field names with field characteristics
            if already_parsed is None:
                already_parsed = {}
            else:
                assert(isinstance(already_parsed, dict)),\
                    "Parameter 'already_parsed' must be of type dict"

        # If schema is nested, then
        # either it is of bsonType object
        # and the field information is stored under the key 'properties'
        # or it is of bsonType array
        # and the field information is stored in sub-schemas
        # under the key 'items'

        # if schema is of bsonType object
        if "properties" in schema.keys():
            # BUGFIX: default to an empty list so that a nesting level
            # without a 'required' key does not raise NameError when
            # required_only is True
            required_subfields = schema.get("required", [])

            for sub_field_name in schema["properties"].keys():

                sub_schema = schema["properties"][sub_field_name]

                # only process fields that are required
                if required_only and\
                        (sub_field_name not in required_subfields):
                    continue

                if super_field_name is not None:
                    field_name = '.'.join([super_field_name,
                                           sub_field_name])
                else:
                    field_name = sub_field_name

                # if the given sub-field is nested, parse the
                # sub-schema corresponding to this sub-field
                self._parse_one(
                        schema=sub_schema,
                        super_field_name=field_name,
                        field_info=field_info,
                        already_parsed=already_parsed,
                        required_only=required_only)

        # if schema is of bsonType array
        elif "items" in schema.keys():
            # one schema for all items
            if isinstance(schema["items"], dict):

                self._parse_one(
                        schema=schema["items"],
                        super_field_name=super_field_name,
                        field_info=field_info,
                        already_parsed=already_parsed,
                        required_only=required_only)

            # list of separate schemas for each item
            elif isinstance(schema["items"], list):

                for sub_schema in schema["items"]:
                    self._parse_one(
                            schema=sub_schema,
                            super_field_name=super_field_name,
                            field_info=field_info,
                            already_parsed=already_parsed,
                            required_only=required_only)
            else:
                raise Exception(('Schema is not composed correctly: '
                                 'items must be a dictionary or a list'))
        else:
            # If neither properties nor items is in schema keys
            # we reached the last level of nestedness,
            # field information is stored in the schema keys.
            field_name = super_field_name

            if field_info is None:
                already_parsed.append(field_name)
            elif field_info in schema.keys():
                already_parsed[field_name] = schema[field_info]

        return already_parsed


if __name__ == "__main__":

    # Only for testing

    schema_path = os.path.join(".", "mongo_schema", "schema_wheelsets.json")

    if os.path.isfile(schema_path):

        parse_obj = ParseJsonSchema(schema_paths=schema_path)

        fields = parse_obj.get_fields()

        required_fileds = parse_obj.get_required_fields()

        patterns = parse_obj.get_patterns()

        mongo_types = parse_obj.get_mongo_types()

        python_types_except_dates = parse_obj.get_python_types()

        datetime_fields = parse_obj.get_datetime_fields()

        allowed_values = parse_obj.get_allowed_values()

        descriptions = parse_obj.get_field_descriptions()