# ParseJsonSchema.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:41:48 2019
@author: tanya
"""
import json
import os
import sys
from copy import deepcopy
from pathlib import Path
from typing import Union

import jsonref
import numpy as np

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseDbSchema import ParseDbSchema
  16. class ParseJsonSchema(ParseDbSchema):
  17. '''
  18. Class for retrieving column properties from mongodb jsonSchema
  19. '''
  20. def __init__(self, schema_paths: [list, str], log_file: str = None):
  21. '''
  22. '''
  23. from cdplib.log import Log
  24. super().__init__(schema_paths=schema_paths, log_file=log_file)
  25. self._log = Log(name="ParseJsonSchema", log_file=log_file)
  26. # load schemas to dictionaries if they are valid json files
  27. assert(isinstance(schema_paths, (list, str))),\
  28. "Schema paths must be either str or lists"
  29. if isinstance(schema_paths, str):
  30. schema_paths = [schema_paths]
  31. self._schema_paths = schema_paths
  32. self.schemas = []
  33. for schema_path in schema_paths:
  34. try:
  35. with open(schema_path, "r") as f:
  36. schema = json.load(f)
  37. definitions_flag = self._analyze_schema(schema)
  38. if definitions_flag:
  39. schema = self._clean_desciptions_tags_from_single_quotes(schema)
  40. schema = self._dereference_schema(schema)
  41. # Need to do it again since sub schema could also contain
  42. # single quotes
  43. schema = self._clean_desciptions_tags_from_single_quotes(schema)
  44. self.schemas.append(schema)
  45. else:
  46. self.schemas.append(schema)
  47. except Exception as e:
  48. err = ("Could not load json schema:{1} , "
  49. "Obtained error {0}".format(e,schema_path))
  50. self._log.error(err)
  51. raise Exception(err)
  52. @property
  53. def _collection_names(self) -> list:
  54. '''
  55. '''
  56. # Don't use strip() instaed of replace since schema_c.strip(schema_)
  57. # will discard the c as well which is not a appropriate output
  58. return [os.path.basename(p).replace("schema_","").split(".")[0] for p in self._schema_paths]
  59. def get_fields(self) -> list:
  60. '''
  61. '''
  62. return self._parse()
  63. def get_fields_restricted_to_collection(self, collection_name: str) -> list:
  64. '''
  65. '''
  66. schemas = [self.schemas[self._collection_names.index(collection_name)]]
  67. return self._parse(schemas=schemas)
  68. def get_required_fields(self) -> list:
  69. '''
  70. '''
  71. return self._parse(required_only=True)
  72. def get_mongo_types(self) -> dict:
  73. '''
  74. '''
  75. return self._parse(field_info="bsonType")
  76. def get_datetime_fields(self):
  77. '''
  78. '''
  79. mongo_types = self.get_mongo_types()
  80. return [k for k, v in mongo_types.items()
  81. if v in ["date", "timestamp", "Date", "Timestamp"]]
  82. def get_python_types(self) -> dict:
  83. '''
  84. '''
  85. mongo_types = self.get_mongo_types()
  86. python_types = {}
  87. bson_to_python_types = {"double": float,
  88. "decimal": float,
  89. "string": str,
  90. "object": object,
  91. "array": list,
  92. "bool": bool,
  93. "int": int,
  94. "long": int,
  95. "date": np.dtype('<M8[ns]'),
  96. "timestamp": np.dtype('<M8[ns]')
  97. }
  98. for k, v in mongo_types.items():
  99. if isinstance(v, list):
  100. if ("date" in v) or ("timestamp" in v):
  101. v = "date"
  102. elif "string" in v:
  103. v = "string"
  104. elif ("double" in v) or ("decimal" in v):
  105. v = "double"
  106. elif ("null" in v) and (len(v) == 2) and ("int" not in v):
  107. v = [t for t in v if type != "null"][0]
  108. else:
  109. err = "Type {0}: {1} not convertibale".format(k, v)
  110. self._log.error(err)
  111. raise Exception(err)
  112. if v in bson_to_python_types:
  113. python_types[k] = bson_to_python_types[v]
  114. return python_types
  115. def get_patterns(self) -> dict:
  116. '''
  117. '''
  118. return self._parse(field_info="pattern")
  119. def get_default_values(self) -> dict:
  120. '''
  121. '''
  122. return self._parse(field_info="default")
  123. def get_allowed_values(self) -> dict:
  124. '''
  125. '''
  126. return self._parse(field_info="enum")
  127. def get_maximum_value(self) -> dict:
  128. '''
  129. '''
  130. return self._parse(field_info="maximum")
  131. def get_minimum_value(self) -> dict:
  132. '''
  133. '''
  134. return self._parse(field_info="minimum")
  135. def get_max_items(self) -> dict:
  136. '''
  137. '''
  138. return self._parse(field_info="maxItems")
  139. def get_min_items(self) -> dict:
  140. '''
  141. '''
  142. return self._parse(field_info="minItems")
  143. def get_field_descriptions(self) -> dict:
  144. '''
  145. '''
  146. return self._parse(field_info="description")
  147. def _parse(self,
  148. field_info: str = None,
  149. required_only: bool = False,
  150. schemas: list = None):
  151. '''
  152. '''
  153. if schemas is None:
  154. schemas = self.schemas
  155. result = self._parse_one(schema=schemas[0],
  156. field_info=field_info,
  157. required_only=required_only)
  158. for schema in schemas[1:]:
  159. next_result = self._parse_one(schema=schema,
  160. field_info=field_info,
  161. required_only=required_only)
  162. if isinstance(result, list):
  163. result.extend(next_result)
  164. else:
  165. result.update(next_result)
  166. return result
  167. def _parse_one(self,
  168. schema: dict,
  169. field_info: str = None,
  170. required_only: bool = False,
  171. super_field_name: str = None,
  172. already_parsed: (list, dict) = None) -> (list, dict):
  173. '''
  174. Recursive function that returns a list of (nested) field names or
  175. a dictionary of (nested) field names with field characteristics.
  176. :param schema: if None => entire self.schema, or a sub-schema
  177. of self.schema
  178. :param field_info: optional, if provided a dictionary of field
  179. names with field characteristics is returned (for examples
  180. bsonType of each field), else a list of fields is returned
  181. :param required_only: when True, only returns fields marked as
  182. required in the mongo schema
  183. :param super_field_name: needed for recursion
  184. Example: the field 'article' has
  185. subfields 'id' and 'supplier'.
  186. If we parse the sub-document corresponding to article, then
  187. super_field_name is'article' and we might get an output like
  188. {'article.id': string, 'article.supplier': string}
  189. :param alread_parsed: needed for recursion
  190. '''
  191. schema = deepcopy(schema)
  192. assert(isinstance(schema, dict)),\
  193. "Parameter 'schema' must be a dict"
  194. if field_info is None:
  195. # parse a list of fields
  196. if already_parsed is None:
  197. already_parsed = []
  198. else:
  199. assert(isinstance(already_parsed, list)),\
  200. "Parameter 'already_parsed' must be of type list"
  201. else:
  202. # parse a dictionary of field names with field characteristics
  203. if already_parsed is None:
  204. already_parsed = {}
  205. else:
  206. assert(isinstance(already_parsed, dict)),\
  207. "Parameter 'already_parsed' must be of type dict"
  208. # If schema is nested, then
  209. # either it is of bsonType object
  210. # and the field information is stored under the key 'properties'
  211. # or it is of bsonType array
  212. # and the field information is stored in sub-schemas
  213. # under the key 'items'
  214. # if schema is of bsonType object
  215. if "properties" in schema.keys():
  216. if "required" in schema.keys():
  217. required_subfields = schema["required"]
  218. else:
  219. required_subfields = []
  220. for sub_field_name in schema["properties"].keys():
  221. sub_schema = schema["properties"][sub_field_name]
  222. # only process fields that are required
  223. if required_only and\
  224. (sub_field_name not in required_subfields):
  225. pass
  226. else:
  227. if super_field_name is not None:
  228. field_name = '.'.join([super_field_name,
  229. sub_field_name])
  230. else:
  231. field_name = sub_field_name
  232. # if the given sub-field is nested, parse the
  233. # sub-schema corresponding to this sub-field
  234. self._parse_one(
  235. schema=sub_schema,
  236. super_field_name=field_name,
  237. field_info=field_info,
  238. already_parsed=already_parsed,
  239. required_only=required_only)
  240. # if schema is of bsonType array
  241. elif "items" in schema.keys():
  242. # one schema for all items
  243. if isinstance(schema["items"], dict):
  244. sub_schema = schema["items"]
  245. self._parse_one(schema=sub_schema,
  246. super_field_name=super_field_name,
  247. field_info=field_info,
  248. already_parsed=already_parsed,
  249. required_only=required_only)
  250. # list of separate schemas for each item
  251. elif isinstance(schema["items"], list):
  252. for sub_schema in schema["items"]:
  253. self._parse_one(schema=sub_schema,
  254. super_field_name=super_field_name,
  255. field_info=field_info,
  256. already_parsed=already_parsed,
  257. required_only=required_only)
  258. else:
  259. raise Exception(('Schema is not composed correctly: '
  260. 'items must be a dictionary or a list'))
  261. else:
  262. # If neither properties nor items is in schema keys
  263. # we reached the last level of nestedness,
  264. # field information is stored in the schema keys.
  265. field_name = super_field_name
  266. if field_info is None:
  267. already_parsed.append(field_name)
  268. else:
  269. if field_info in schema.keys():
  270. already_parsed[field_name] = schema[field_info]
  271. else:
  272. pass
  273. return already_parsed
  274. def load_and_parse_schema_for_mongodb(self, schema_path: str) -> dict:
  275. '''
  276. We need to deference json before import to Mongo DB pymongo can't deal with references
  277. :param str schema_path: path to the schema file.
  278. '''
  279. assert(isinstance(schema_path, str)),\
  280. "Parameter 'schema_path must be a string type"
  281. with open(schema_path) as json_file:
  282. schema = json.load(json_file)
  283. definitions_flag = self._analyze_schema(schema)
  284. if definitions_flag:
  285. schema = self._clean_desciptions_tags_from_single_quotes(schema)
  286. schema = self._dereference_schema(schema)
  287. # Need to do it again since sub schema could also contain
  288. # single quotes
  289. schema = self._clean_desciptions_tags_from_single_quotes(schema)
  290. schema = self._format_schema_for_mongo(schema)
  291. else:
  292. schema = self._format_schema_for_mongo(schema)
  293. return schema
  294. def _analyze_schema (self, schema: dict, definitions_flag: bool = False) -> dict:
  295. for key in list(schema):
  296. if key == '$ref':
  297. definitions_flag = True
  298. return definitions_flag
  299. if type(schema[key]) == dict:
  300. definitions_flag = self._analyze_schema(schema[key], definitions_flag)
  301. return definitions_flag
  302. def _clean_desciptions_tags_from_single_quotes(self, schema: dict) -> dict:
  303. '''
  304. :param dict schema: dictonary containing schema
  305. '''
  306. for key in list(schema):
  307. if key == 'description':
  308. cleaned_description = self._remove_single_quotes_from_description_tag(schema[key])
  309. schema[key] = cleaned_description
  310. if type(schema[key]) == dict:
  311. self._clean_desciptions_tags_from_single_quotes(schema[key])
  312. return schema
  313. def _format_schema_for_mongo(self, schema: dict) -> dict:
  314. '''
  315. We use in the schema tags whih are not supported by mongo an threfore
  316. must be taken care of before setting the schema for mongo.
  317. :param str schema_path: path to the schema file.
  318. '''
  319. for key in list(schema):
  320. if type(schema[key]) == dict:
  321. self._format_schema_for_mongo(schema[key])
  322. if key == 'default' or key == 'default_values':
  323. self._remove_defaults(schema)
  324. if key == 'examples':
  325. self._remove_examples(schema)
  326. return schema
  327. def _dereference_schema(self, schema: dict) -> dict:
  328. '''
  329. :param dict schema: dictionary containing a schema which uses references.
  330. '''
  331. assert(isinstance(schema, dict)),\
  332. "Parameter 'schema' must be a dictionary type"
  333. base_dir_url = Path(os.path.join(os.getcwd(), "mongo_schema")).as_uri() + '/'
  334. # json.load(f) convert double quotes into singel quotes. jsonref expects
  335. # the json in string format with double quotes.
  336. schema = str(schema).replace("'", "\"")
  337. schema = jsonref.loads(schema, base_uri=base_dir_url)
  338. schema = deepcopy(schema)
  339. return schema
  340. def _remove_defaults(self, schema: dict) -> dict:
  341. '''
  342. :param dict schema: dictionary containing a schema which uses 'default' tags.
  343. '''
  344. assert(isinstance(schema, dict)),\
  345. "Parameter 'schema' must be a dictionary type"
  346. if 'default' in schema:
  347. del schema['default']
  348. if 'default_values' in schema:
  349. del schema['default_values']
  350. def _remove_examples(self, schema: dict) -> dict:
  351. '''
  352. :param dict schema: dictionary containing a schema with 'examples' tags.
  353. '''
  354. assert(isinstance(schema, dict)),\
  355. "Parameter 'schema' must be a dictionary type"
  356. if 'examples' in schema:
  357. del schema['examples']
  358. assert(isinstance(schema, dict)),\
  359. "Parameter 'schema' must be a dictionary type"
  360. def _remove_single_quotes_from_description_tag(self, description: str) -> str:
  361. '''
  362. :param dict schema: dictionary containing a schema with 'examples' tags.
  363. '''
  364. assert(isinstance(description, str)),\
  365. "Parameter 'description' must be a string type"
  366. description = description.replace("'", "")
  367. return description
  368. if __name__ == "__main__":
  369. # Only for testing
  370. schema_path = os.path.join(".", "mongo_schema", "schema_components.json")
  371. if os.path.isfile(schema_path):
  372. parse_obj = ParseJsonSchema(schema_paths=schema_path)
  373. fields = parse_obj.get_fields()
  374. required_fileds = parse_obj.get_required_fields()
  375. patterns = parse_obj.get_patterns()
  376. mongo_types = parse_obj.get_mongo_types()
  377. python_types_except_dates = parse_obj.get_python_types()
  378. datetime_fields = parse_obj.get_datetime_fields()
  379. allowed_values = parse_obj.get_allowed_values()
  380. descriptions = parse_obj.get_field_descriptions()