#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:41:48 2019

@author: tanya
"""

import os
import sys
from copy import deepcopy
import numpy as np
import json
import jsonref
from pathlib import Path

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseDbSchema import ParseDbSchema


class ParseJsonSchema(ParseDbSchema):
    '''
    Class for retrieving column properties from mongodb jsonSchema
    '''

    def __init__(self, schema_paths: (list, str), log_file: str = None):
        '''
        Load the given json schema file(s) into ``self.schemas``.

        Schemas that contain '$ref' references are dereferenced on load
        (pymongo cannot deal with references), which requires cleaning
        single quotes out of description tags first, because the schema
        is round-tripped through a strict-json parser.

        :param schema_paths: path or list of paths to json schema files
        :param log_file: optional path to the log file
        :raises Exception: when a schema file cannot be loaded/parsed
        '''
        from cdplib.log import Log

        super().__init__(schema_paths=schema_paths, log_file=log_file)

        self._log = Log(name="ParseJsonSchema", log_file=log_file)

        assert(isinstance(schema_paths, (list, str))),\
            "Schema paths must be either str or lists"

        if isinstance(schema_paths, str):
            schema_paths = [schema_paths]

        self._schema_paths = schema_paths
        self.schemas = []

        # load schemas to dictionaries if they are valid json files
        for schema_path in schema_paths:
            try:
                with open(schema_path, "r") as f:
                    schema = json.load(f)

                if self._analyze_schema(schema):
                    # schema contains '$ref' references: clean and
                    # dereference before use
                    schema = \
                        self._clean_desciptions_tags_from_single_quotes(schema)
                    schema = self._dereference_schema(schema)
                    # Need to do it again since sub schema could also
                    # contain single quotes
                    schema = \
                        self._clean_desciptions_tags_from_single_quotes(schema)

                self.schemas.append(schema)

            except Exception as e:
                err = ("Could not load json schema:{1} , "
                       "Obtained error {0}".format(e, schema_path))
                self._log.error(err)
                raise Exception(err)

    @property
    def _collection_names(self) -> list:
        '''
        Collection names derived from the schema file names:
        'schema_<name>.json' -> '<name>'.
        '''
        # Don't use strip() instead of replace since schema_c.strip(schema_)
        # will discard the c as well which is not an appropriate output
        return [os.path.basename(p).replace("schema_", "").split(".")[0]
                for p in self._schema_paths]

    def get_fields(self) -> list:
        '''
        Return the (nested, dot-separated) field names of all schemas.
        '''
        return self._parse()

    def get_fields_restricted_to_collection(self,
                                            collection_name: str) -> list:
        '''
        Return the field names of the schema of one collection.

        :param collection_name: one of the names in self._collection_names
        '''
        schemas = \
            [self.schemas[self._collection_names.index(collection_name)]]
        return self._parse(schemas=schemas)

    def get_required_fields(self) -> list:
        '''
        Return only the fields marked as required in the schemas.
        '''
        return self._parse(required_only=True)

    def get_mongo_types(self) -> dict:
        '''
        Return a dict mapping field names to their bsonType.
        '''
        return self._parse(field_info="bsonType")

    def get_datetime_fields(self) -> list:
        '''
        Return the field names whose bsonType is a date/timestamp type.
        '''
        mongo_types = self.get_mongo_types()

        return [k for k, v in mongo_types.items()
                if v in ["date", "timestamp", "Date", "Timestamp"]]

    def get_python_types(self) -> dict:
        '''
        Return a dict mapping field names to python/numpy types
        corresponding to their bsonType.
        '''
        # NOTE(review): the body of this method was partially lost in the
        # source (text after np.dtype(' was cut); the mapping below is a
        # reconstruction -- confirm against the original before relying on it.
        mongo_types = self.get_mongo_types()
        python_types = {}

        bson_to_python_types = {"double": float,
                                "decimal": float,
                                "string": str,
                                "object": object,
                                "array": list,
                                "bool": bool,
                                "int": int,
                                "long": int,
                                # numpy datetime64[ns]
                                "date": np.dtype('<M8[ns]')}

        for field, bson_type in mongo_types.items():
            if bson_type in bson_to_python_types:
                python_types[field] = bson_to_python_types[bson_type]

        return python_types

    def get_patterns(self) -> dict:
        '''
        Return a dict mapping field names to their regex 'pattern' tag.
        '''
        return self._parse(field_info="pattern")

    def get_default_values(self) -> dict:
        '''
        Return a dict mapping field names to their 'default' tag.
        '''
        return self._parse(field_info="default")

    def get_allowed_values(self) -> dict:
        '''
        Return a dict mapping field names to their 'enum' tag.
        '''
        return self._parse(field_info="enum")

    def get_maximum_value(self) -> dict:
        '''
        Return a dict mapping field names to their 'maximum' tag.
        '''
        return self._parse(field_info="maximum")

    def get_minimum_value(self) -> dict:
        '''
        Return a dict mapping field names to their 'minimum' tag.
        '''
        return self._parse(field_info="minimum")

    def get_max_items(self) -> dict:
        '''
        Return a dict mapping array field names to their 'maxItems' tag.
        '''
        return self._parse(field_info="maxItems")

    def get_min_items(self) -> dict:
        '''
        Return a dict mapping array field names to their 'minItems' tag.
        '''
        return self._parse(field_info="minItems")

    def get_field_descriptions(self) -> dict:
        '''
        Return a dict mapping field names to their 'description' tag.
        '''
        return self._parse(field_info="description")

    def _parse(self,
               field_info: str = None,
               required_only: bool = False,
               schemas: list = None):
        '''
        Parse the given schemas and merge the per-schema results.

        :param field_info: see :func:`_parse_one`
        :param required_only: see :func:`_parse_one`
        :param schemas: optional subset of schemas, defaults to all loaded
        :return: list of field names, or dict field name -> field_info value
        '''
        if schemas is None:
            schemas = self.schemas

        result = self._parse_one(schema=schemas[0],
                                 field_info=field_info,
                                 required_only=required_only)

        for schema in schemas[1:]:
            next_result = self._parse_one(schema=schema,
                                          field_info=field_info,
                                          required_only=required_only)

            # result is a list when field_info is None, a dict otherwise
            if isinstance(result, list):
                result.extend(next_result)
            else:
                result.update(next_result)

        return result

    def _parse_one(self,
                   schema: dict,
                   field_info: str = None,
                   required_only: bool = False,
                   super_field_name: str = None,
                   already_parsed: (list, dict) = None) -> (list, dict):
        '''
        Recursive function that returns a list of (nested) field names or
        a dictionary of (nested) field names with field characteristics.

        :param schema: if None => entire self.schema, or a sub-schema
            of self.schema

        :param field_info: optional, if provided a dictionary of field
            names with field characteristics is returned (for example
            bsonType of each field), else a list of fields is returned

        :param required_only: when True, only returns fields marked as
            required in the mongo schema

        :param super_field_name: needed for recursion
            Example: the field 'article' has subfields 'id' and 'supplier'.
            If we parse the sub-document corresponding to article, then
            super_field_name is 'article' and we might get an output like
            {'article.id': string, 'article.supplier': string}

        :param already_parsed: needed for recursion, accumulator that is
            mutated and returned
        '''
        schema = deepcopy(schema)

        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dict"

        if field_info is None:
            # parse a list of fields
            if already_parsed is None:
                already_parsed = []
            else:
                assert(isinstance(already_parsed, list)),\
                    "Parameter 'already_parsed' must be of type list"
        else:
            # parse a dictionary of field names with field characteristics
            if already_parsed is None:
                already_parsed = {}
            else:
                assert(isinstance(already_parsed, dict)),\
                    "Parameter 'already_parsed' must be of type dict"

        # If schema is nested, then either it is of bsonType object and the
        # field information is stored under the key 'properties', or it is
        # of bsonType array and the field information is stored in
        # sub-schemas under the key 'items'.

        if "properties" in schema.keys():
            # schema is of bsonType object
            required_subfields = schema.get("required", [])

            for sub_field_name, sub_schema in schema["properties"].items():

                # only process fields that are required
                if required_only and\
                        (sub_field_name not in required_subfields):
                    continue

                if super_field_name is not None:
                    field_name = '.'.join([super_field_name, sub_field_name])
                else:
                    field_name = sub_field_name

                # if the given sub-field is nested, parse the
                # sub-schema corresponding to this sub-field
                self._parse_one(schema=sub_schema,
                                super_field_name=field_name,
                                field_info=field_info,
                                already_parsed=already_parsed,
                                required_only=required_only)

        elif "items" in schema.keys():
            # schema is of bsonType array
            if isinstance(schema["items"], dict):
                # one schema for all items
                self._parse_one(schema=schema["items"],
                                super_field_name=super_field_name,
                                field_info=field_info,
                                already_parsed=already_parsed,
                                required_only=required_only)

            elif isinstance(schema["items"], list):
                # list of separate schemas for each item
                for sub_schema in schema["items"]:
                    self._parse_one(schema=sub_schema,
                                    super_field_name=super_field_name,
                                    field_info=field_info,
                                    already_parsed=already_parsed,
                                    required_only=required_only)

            else:
                raise Exception(('Schema is not composed correctly: '
                                 'items must be a dictionary or a list'))

        else:
            # If neither properties nor items is in schema keys
            # we reached the last level of nestedness,
            # field information is stored in the schema keys.
            field_name = super_field_name

            if field_info is None:
                already_parsed.append(field_name)
            elif field_info in schema.keys():
                already_parsed[field_name] = schema[field_info]

        return already_parsed

    def load_and_parse_schema_for_mongodb(self, schema_path: str) -> dict:
        '''
        Load one schema file and prepare it for import into Mongo DB.

        We need to deference json before import to Mongo DB
        pymongo can't deal with references

        :param str schema_path: path to the schema file.
        '''
        assert(isinstance(schema_path, str)),\
            "Parameter 'schema_path must be a string type"

        with open(schema_path) as json_file:
            schema = json.load(json_file)

        if self._analyze_schema(schema):
            schema = self._clean_desciptions_tags_from_single_quotes(schema)
            schema = self._dereference_schema(schema)
            # Need to do it again since sub schema could also contain
            # single quotes
            schema = self._clean_desciptions_tags_from_single_quotes(schema)

        # formatting happens in either case (original had it duplicated
        # in both branches of an if/else)
        schema = self._format_schema_for_mongo(schema)

        return schema

    def _analyze_schema(self, schema: dict,
                        definitions_flag: bool = False) -> bool:
        '''
        Recursively check whether the schema contains '$ref' references.

        Fixed: the original returned from inside the key loop right after
        the first nested-dict recursion, so a '$ref' sitting in a later
        sibling key was never seen.

        :param dict schema: schema (or sub-schema) to inspect
        :param definitions_flag: recursion accumulator
        :return: True when a '$ref' key is found anywhere in the schema
        '''
        for key in list(schema):
            if key == '$ref':
                return True

            if isinstance(schema[key], dict):
                definitions_flag = self._analyze_schema(schema[key],
                                                        definitions_flag)
                if definitions_flag:
                    # found a reference somewhere below, no need to go on
                    return True

        return definitions_flag

    def _clean_desciptions_tags_from_single_quotes(self,
                                                   schema: dict) -> dict:
        '''
        Recursively remove single quotes from all 'description' tags
        (in place; the schema is also returned for chaining).

        :param dict schema: dictonary containing schema
        '''
        for key in list(schema):
            if key == 'description':
                schema[key] = \
                    self._remove_single_quotes_from_description_tag(
                        schema[key])

            if isinstance(schema[key], dict):
                self._clean_desciptions_tags_from_single_quotes(schema[key])

        return schema

    def _format_schema_for_mongo(self, schema: dict) -> dict:
        '''
        We use in the schema tags which are not supported by mongo and
        therefore must be taken care of before setting the schema for
        mongo: 'default'/'default_values' and 'examples' tags are removed
        recursively (in place; the schema is also returned for chaining).

        :param dict schema: schema to clean up
        '''
        for key in list(schema):
            if isinstance(schema[key], dict):
                self._format_schema_for_mongo(schema[key])

            if key in ('default', 'default_values'):
                self._remove_defaults(schema)

            if key == 'examples':
                self._remove_examples(schema)

        return schema

    def _dereference_schema(self, schema: dict) -> dict:
        '''
        Resolve all '$ref' references in the schema.

        :param dict schema: dictionary containing a schema which uses
            references.
        :return: a plain (deep-copied) dict with references expanded
        '''
        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dictionary type"

        # referenced sub-schema files are resolved relative to
        # <cwd>/mongo_schema
        base_dir_url = \
            Path(os.path.join(os.getcwd(), "mongo_schema")).as_uri() + '/'

        # jsonref expects a json string; use json.dumps instead of the
        # previous str(schema).replace("'", '"') hack, which produced
        # invalid json for True/False/None values and for any remaining
        # apostrophe inside a string value
        schema = jsonref.loads(json.dumps(schema), base_uri=base_dir_url)

        # deepcopy materializes the lazy jsonref proxies
        schema = deepcopy(schema)

        return schema

    def _remove_defaults(self, schema: dict) -> None:
        '''
        Remove 'default' and 'default_values' tags from the top level of
        the given (sub-)schema, in place.

        :param dict schema: dictionary containing a schema which uses
            'default' tags.
        '''
        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dictionary type"

        schema.pop('default', None)
        schema.pop('default_values', None)

    def _remove_examples(self, schema: dict) -> None:
        '''
        Remove the 'examples' tag from the top level of the given
        (sub-)schema, in place.

        :param dict schema: dictionary containing a schema with
            'examples' tags.
        '''
        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dictionary type"

        schema.pop('examples', None)

    def _remove_single_quotes_from_description_tag(self,
                                                   description: str) -> str:
        '''
        Strip all single quotes out of one description string.

        :param str description: description tag value
        '''
        assert(isinstance(description, str)),\
            "Parameter 'description' must be a string type"

        return description.replace("'", "")


if __name__ == "__main__":

    # Only for testing

    schema_path = os.path.join(".", "mongo_schema", "schema_components.json")

    if os.path.isfile(schema_path):

        parse_obj = ParseJsonSchema(schema_paths=schema_path)

        fields = parse_obj.get_fields()
        required_fileds = parse_obj.get_required_fields()
        patterns = parse_obj.get_patterns()
        mongo_types = parse_obj.get_mongo_types()
        python_types_except_dates = parse_obj.get_python_types()
        datetime_fields = parse_obj.get_datetime_fields()
        allowed_values = parse_obj.get_allowed_values()
        descriptions = parse_obj.get_field_descriptions()