#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:41:48 2019

@author: tanya
"""
import json
import os
import sys
from copy import deepcopy
from pathlib import Path
from typing import Optional, Union

import jsonref
import numpy as np

# The working directory must be on sys.path before the project import below.
sys.path.append(os.getcwd())

from cdplib.db_migration.ParseDbSchema import ParseDbSchema  # noqa: E402
  16. class ParseJsonSchema(ParseDbSchema):
  17. '''
  18. Class for retrieving column properties from mongodb jsonSchema
  19. '''
  20. def __init__(self, schema_paths: [list, str], log_file: str = None):
  21. '''
  22. '''
  23. from cdplib.log import Log
  24. super().__init__(schema_paths=schema_paths, log_file=log_file)
  25. self._log = Log(name="ParseJsonSchema", log_file=log_file)
  26. # load schemas to dictionaries if they are valid json files
  27. assert(isinstance(schema_paths, (list, str))),\
  28. "Schema paths must be either str or lists"
  29. if isinstance(schema_paths, str):
  30. schema_paths = [schema_paths]
  31. self._schema_paths = schema_paths
  32. self.schemas = []
  33. for schema_path in schema_paths:
  34. try:
  35. with open(schema_path, "r") as f:
  36. self.schemas.append(json.load(f))
  37. # Load schmea dereferenced and cleaned by default values
  38. #self.schemas.append(self.read_schema_and_parse_for_mongodb(schema_path))
  39. except Exception as e:
  40. err = ("Could not load json schema, "
  41. "Obtained error {}".format(e))
  42. self._log.error(err)
  43. raise Exception(err)
  44. @property
  45. def _collection_names(self) -> list:
  46. '''
  47. '''
  48. # Don't use strip() instaed of replace since schema_c.strip(schema_)
  49. # will discard the c as well which is not a appropriate output
  50. return [os.path.basename(p).replace("schema_","").split(".")[0] for p in self._schema_paths]
  51. def get_fields(self) -> list:
  52. '''
  53. '''
  54. return self._parse()
  55. def get_fields_restricted_to_collection(self, collection_name: str) -> list:
  56. '''
  57. '''
  58. schemas = [self.schemas[self._collection_names.index(collection_name)]]
  59. return self._parse(schemas=schemas)
  60. def get_required_fields(self) -> list:
  61. '''
  62. '''
  63. return self._parse(required_only=True)
  64. def get_mongo_types(self) -> dict:
  65. '''
  66. '''
  67. return self._parse(field_info="bsonType")
  68. def get_datetime_fields(self):
  69. '''
  70. '''
  71. mongo_types = self.get_mongo_types()
  72. return [k for k, v in mongo_types.items()
  73. if v in ["date", "timestamp", "Date", "Timestamp"]]
  74. def get_python_types(self) -> dict:
  75. '''
  76. '''
  77. mongo_types = self.get_mongo_types()
  78. python_types = {}
  79. bson_to_python_types = {"double": float,
  80. "decimal": float,
  81. "string": str,
  82. "object": object,
  83. "array": list,
  84. "bool": bool,
  85. "int": int,
  86. "long": int,
  87. "date": np.dtype('<M8[ns]'),
  88. "timestamp": np.dtype('<M8[ns]')
  89. }
  90. for k, v in mongo_types.items():
  91. if isinstance(v, list):
  92. if ("date" in v) or ("timestamp" in v):
  93. v = "date"
  94. elif "string" in v:
  95. v = "string"
  96. elif ("double" in v) or ("decimal" in v):
  97. v = "double"
  98. elif ("null" in v) and (len(v) == 2) and ("int" not in v):
  99. v = [t for t in v if type != "null"][0]
  100. else:
  101. err = "Type {0}: {1} not convertibale".format(k, v)
  102. self._log.error(err)
  103. raise Exception(err)
  104. if v in bson_to_python_types:
  105. python_types[k] = bson_to_python_types[v]
  106. return python_types
  107. def get_patterns(self) -> dict:
  108. '''
  109. '''
  110. return self._parse(field_info="pattern")
  111. def get_default_values(self) -> dict:
  112. '''
  113. '''
  114. return self._parse(field_info="default")
  115. def get_allowed_values(self) -> dict:
  116. '''
  117. '''
  118. return self._parse(field_info="enum")
  119. def get_maximum_value(self) -> dict:
  120. '''
  121. '''
  122. return self._parse(field_info="maximum")
  123. def get_minimum_value(self) -> dict:
  124. '''
  125. '''
  126. return self._parse(field_info="minimum")
  127. def get_max_items(self) -> dict:
  128. '''
  129. '''
  130. return self._parse(field_info="maxItems")
  131. def get_min_items(self) -> dict:
  132. '''
  133. '''
  134. return self._parse(field_info="minItems")
  135. def get_field_descriptions(self) -> dict:
  136. '''
  137. '''
  138. return self._parse(field_info="description")
  139. def _parse(self,
  140. field_info: str = None,
  141. required_only: bool = False,
  142. schemas: list = None):
  143. '''
  144. '''
  145. if schemas is None:
  146. schemas = self.schemas
  147. result = self._parse_one(schema=schemas[0],
  148. field_info=field_info,
  149. required_only=required_only)
  150. for schema in schemas[1:]:
  151. next_result = self._parse_one(schema=schema,
  152. field_info=field_info,
  153. required_only=required_only)
  154. if isinstance(result, list):
  155. result.extend(next_result)
  156. else:
  157. result.update(next_result)
  158. return result
  159. def _parse_one(self,
  160. schema: dict,
  161. field_info: str = None,
  162. required_only: bool = False,
  163. super_field_name: str = None,
  164. already_parsed: (list, dict) = None) -> (list, dict):
  165. '''
  166. Recursive function that returns a list of (nested) field names or
  167. a dictionary of (nested) field names with field characteristics.
  168. :param schema: if None => entire self.schema, or a sub-schema
  169. of self.schema
  170. :param field_info: optional, if provided a dictionary of field
  171. names with field characteristics is returned (for examples
  172. bsonType of each field), else a list of fields is returned
  173. :param required_only: when True, only returns fields marked as
  174. required in the mongo schema
  175. :param super_field_name: needed for recursion
  176. Example: the field 'article' has
  177. subfields 'id' and 'supplier'.
  178. If we parse the sub-document corresponding to article, then
  179. super_field_name is'article' and we might get an output like
  180. {'article.id': string, 'article.supplier': string}
  181. :param alread_parsed: needed for recursion
  182. '''
  183. schema = deepcopy(schema)
  184. assert(isinstance(schema, dict)),\
  185. "Parameter 'schema' must be a dict"
  186. if field_info is None:
  187. # parse a list of fields
  188. if already_parsed is None:
  189. already_parsed = []
  190. else:
  191. assert(isinstance(already_parsed, list)),\
  192. "Parameter 'already_parsed' must be of type list"
  193. else:
  194. # parse a dictionary of field names with field characteristics
  195. if already_parsed is None:
  196. already_parsed = {}
  197. else:
  198. assert(isinstance(already_parsed, dict)),\
  199. "Parameter 'already_parsed' must be of type dict"
  200. # If schema is nested, then
  201. # either it is of bsonType object
  202. # and the field information is stored under the key 'properties'
  203. # or it is of bsonType array
  204. # and the field information is stored in sub-schemas
  205. # under the key 'items'
  206. # if schema is of bsonType object
  207. if "properties" in schema.keys():
  208. if "required" in schema.keys():
  209. required_subfields = schema["required"]
  210. else:
  211. required_subfields = []
  212. for sub_field_name in schema["properties"].keys():
  213. sub_schema = schema["properties"][sub_field_name]
  214. # only process fields that are required
  215. if required_only and\
  216. (sub_field_name not in required_subfields):
  217. pass
  218. else:
  219. if super_field_name is not None:
  220. field_name = '.'.join([super_field_name,
  221. sub_field_name])
  222. else:
  223. field_name = sub_field_name
  224. # if the given sub-field is nested, parse the
  225. # sub-schema corresponding to this sub-field
  226. self._parse_one(
  227. schema=sub_schema,
  228. super_field_name=field_name,
  229. field_info=field_info,
  230. already_parsed=already_parsed,
  231. required_only=required_only)
  232. # if schema is of bsonType array
  233. elif "items" in schema.keys():
  234. # one schema for all items
  235. if isinstance(schema["items"], dict):
  236. sub_schema = schema["items"]
  237. self._parse_one(schema=sub_schema,
  238. super_field_name=super_field_name,
  239. field_info=field_info,
  240. already_parsed=already_parsed,
  241. required_only=required_only)
  242. # list of separate schemas for each item
  243. elif isinstance(schema["items"], list):
  244. for sub_schema in schema["items"]:
  245. self._parse_one(schema=sub_schema,
  246. super_field_name=super_field_name,
  247. field_info=field_info,
  248. already_parsed=already_parsed,
  249. required_only=required_only)
  250. else:
  251. raise Exception(('Schema is not composed correctly: '
  252. 'items must be a dictionary or a list'))
  253. else:
  254. # If neither properties nor items is in schema keys
  255. # we reached the last level of nestedness,
  256. # field information is stored in the schema keys.
  257. field_name = super_field_name
  258. if field_info is None:
  259. already_parsed.append(field_name)
  260. else:
  261. if field_info in schema.keys():
  262. already_parsed[field_name] = schema[field_info]
  263. else:
  264. pass
  265. return already_parsed
  266. def _dereference_schema(self, schema: dict) -> dict:
  267. '''
  268. :param dict schema: dictionary containing a schema which uses references.
  269. '''
  270. assert(isinstance(schema, dict)),\
  271. "Parameter 'schema' must be a dictionary type"
  272. base_dir_url = Path(os.path.join(os.getcwd(), "mongo_schema")).as_uri() + '/'
  273. schema = jsonref.loads(str(schema).replace("'", "\""), base_uri=base_dir_url)
  274. schema = deepcopy(schema)
  275. schema.pop('definitions', None)
  276. return schema
  277. def _remove_defaults(self, schema: dict) -> dict:
  278. '''
  279. :param dict schema: dictionary containing a schema which uses references.
  280. '''
  281. if 'default' in schema:
  282. del schema['default']
  283. if 'default_values' in schema:
  284. del schema['default_values']
  285. return schema
  286. assert(isinstance(schema, dict)),\
  287. "Parameter 'schema' must be a dictionary type"
  288. # Need to parse schmema for importing to mongo db
  289. # Reason:
  290. # We need to drop default values since MongoDB can't handle them
  291. # We need to deference json before import to Mongo DB pymongo can't deal with references
  292. def read_schema_and_parse_for_mongodb(self, schema_path: str) -> dict:
  293. '''
  294. :param str schema_path: path to the schema file.
  295. '''
  296. assert(isinstance(schema_path, str)),\
  297. "Parameter 'schema_path must be a string type"
  298. with open(schema_path) as json_file:
  299. schema = json.load(json_file)
  300. definitions_flag = self._analyze_schema(schema)
  301. if definitions_flag:
  302. schema = self._dereference_schema(schema)
  303. return schema
  304. def _analyze_schema(self, schema: dict, definitions_flag: bool = False) -> dict:
  305. for key in schema:
  306. if key == '$ref':
  307. definitions_flag = True
  308. return definitions_flag
  309. if key == 'default' or key == 'default_values':
  310. return self._remove_defaults(schema)
  311. if type(schema[key]) == dict:
  312. definitions_flag = self._analyze_schema(schema[key], definitions_flag)
  313. return definitions_flag
  314. if __name__ == "__main__":
  315. # Only for testing
  316. schema_path = os.path.join(".", "mongo_schema", "schema_components.json")
  317. if os.path.isfile(schema_path):
  318. parse_obj = ParseJsonSchema(schema_paths=schema_path)
  319. fields = parse_obj.get_fields()
  320. required_fileds = parse_obj.get_required_fields()
  321. patterns = parse_obj.get_patterns()
  322. mongo_types = parse_obj.get_mongo_types()
  323. python_types_except_dates = parse_obj.get_python_types()
  324. datetime_fields = parse_obj.get_datetime_fields()
  325. allowed_values = parse_obj.get_allowed_values()
  326. descriptions = parse_obj.get_field_descriptions()