
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:41:48 2019

@author: tanya
"""
import os
import sys
import json
from copy import deepcopy
from pathlib import Path

import numpy as np
import jsonref

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseDbSchema import ParseDbSchema

class ParseJsonSchema(ParseDbSchema):
    '''
    Class for retrieving column properties from a mongodb jsonSchema.
    '''
    def __init__(self, schema_paths, log_file: str = None):
        '''
        :param schema_paths: path or list of paths to json schema files
        :param log_file: optional path to the log file
        '''
        from cdplib.log import Log

        super().__init__(schema_paths=schema_paths, log_file=log_file)

        self._log = Log(name="ParseJsonSchema", log_file=log_file)

        # load schemas to dictionaries if they are valid json files
        assert isinstance(schema_paths, (list, str)),\
            "Schema paths must be either str or list"

        if isinstance(schema_paths, str):
            schema_paths = [schema_paths]

        self._schema_paths = schema_paths

        self.schemas = []

        for schema_path in schema_paths:
            try:
                with open(schema_path, "r") as f:
                    schema = json.load(f)

                if self._analyze_schema(schema):
                    schema = self._clean_desciptions_tags_from_single_quotes(schema)
                    schema = self._dereference_schema(schema)
                    # Clean again, since dereferenced sub-schemas could
                    # also contain single quotes.
                    schema = self._clean_desciptions_tags_from_single_quotes(schema)

                self.schemas.append(schema)

            except Exception as e:
                err = ("Could not load json schema {0}, "
                       "obtained error: {1}".format(schema_path, e))
                self._log.error(err)
                raise Exception(err)

    @property
    def _collection_names(self) -> list:
        '''
        Collection names derived from the schema file names.
        '''
        # Don't use strip() instead of replace(): schema_c.strip("schema_")
        # would discard the trailing c as well, which is not the desired
        # output.
        return [os.path.basename(p).replace("schema_", "").split(".")[0]
                for p in self._schema_paths]
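
    # For illustration (hypothetical file names, not taken from this repo):
    # the paths ["mongo_schema/schema_components.json",
    # "mongo_schema/schema_orders.json"] would yield the collection
    # names ["components", "orders"].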

    def get_fields(self) -> list:
        '''
        Return a list of (nested) field names found in all schemas.
        '''
        return self._parse()

    def get_fields_restricted_to_collection(self,
                                            collection_name: str) -> list:
        '''
        Return the field names of a single collection.
        '''
        schemas = [self.schemas[self._collection_names.index(collection_name)]]
        return self._parse(schemas=schemas)

    def get_required_fields(self) -> list:
        '''
        Return only the fields marked as required in the schemas.
        '''
        return self._parse(required_only=True)

    def get_mongo_types(self) -> dict:
        '''
        Return a dictionary mapping field names to their bsonType.
        '''
        return self._parse(field_info="bsonType")

    def get_datetime_fields(self):
        '''
        Return the fields whose bsonType is a date or timestamp.
        '''
        mongo_types = self.get_mongo_types()

        return [k for k, v in mongo_types.items()
                if v in ["date", "timestamp", "Date", "Timestamp"]]

    def get_python_types(self) -> dict:
        '''
        Return a dictionary mapping field names to python types,
        derived from the bsonType of each field.
        '''
        mongo_types = self.get_mongo_types()
        python_types = {}

        bson_to_python_types = {"double": float,
                                "decimal": float,
                                "string": str,
                                "object": object,
                                "array": list,
                                "bool": bool,
                                "int": int,
                                "long": int,
                                "date": np.dtype('<M8[ns]'),
                                "timestamp": np.dtype('<M8[ns]')}

        for k, v in mongo_types.items():

            if isinstance(v, list):
                if ("date" in v) or ("timestamp" in v):
                    v = "date"
                elif "string" in v:
                    v = "string"
                elif ("double" in v) or ("decimal" in v):
                    v = "double"
                elif ("null" in v) and (len(v) == 2) and ("int" not in v):
                    v = [t for t in v if t != "null"][0]
                else:
                    err = "Type of field {0}: {1} is not convertible".format(k, v)
                    self._log.error(err)
                    raise Exception(err)

            if v in bson_to_python_types:
                python_types[k] = bson_to_python_types[v]

        return python_types
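
    # For illustration (hypothetical field types): a bsonType list such as
    # ["string", "null"] collapses to "string" and maps to str, while
    # ["double", "decimal"] collapses to "double" and maps to float.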

    def get_patterns(self) -> dict:
        '''
        Return a dictionary mapping field names to their regex pattern.
        '''
        return self._parse(field_info="pattern")

    def get_default_values(self) -> dict:
        '''
        Return a dictionary mapping field names to their default value.
        '''
        return self._parse(field_info="default")

    def get_allowed_values(self) -> dict:
        '''
        Return a dictionary mapping field names to their enum of
        allowed values.
        '''
        return self._parse(field_info="enum")

    def get_maximum_value(self) -> dict:
        '''
        Return a dictionary mapping field names to their maximum value.
        '''
        return self._parse(field_info="maximum")

    def get_minimum_value(self) -> dict:
        '''
        Return a dictionary mapping field names to their minimum value.
        '''
        return self._parse(field_info="minimum")

    def get_max_items(self) -> dict:
        '''
        Return a dictionary mapping array field names to their maxItems.
        '''
        return self._parse(field_info="maxItems")

    def get_min_items(self) -> dict:
        '''
        Return a dictionary mapping array field names to their minItems.
        '''
        return self._parse(field_info="minItems")

    def get_field_descriptions(self) -> dict:
        '''
        Return a dictionary mapping field names to their description.
        '''
        return self._parse(field_info="description")

    def _parse(self,
               field_info: str = None,
               required_only: bool = False,
               schemas: list = None):
        '''
        Parse all schemas and merge the results: lists of field names
        are concatenated, dictionaries of field properties are merged.
        '''
        if schemas is None:
            schemas = self.schemas

        result = self._parse_one(schema=schemas[0],
                                 field_info=field_info,
                                 required_only=required_only)

        for schema in schemas[1:]:
            next_result = self._parse_one(schema=schema,
                                          field_info=field_info,
                                          required_only=required_only)

            if isinstance(result, list):
                result.extend(next_result)
            else:
                result.update(next_result)

        return result
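
    # For illustration (hypothetical schemas): parsing two collections whose
    # bsonType dictionaries are {"a": "string"} and {"b": "int"} returns the
    # merged dictionary {"a": "string", "b": "int"}; with field_info=None,
    # the field lists ["a"] and ["b"] are concatenated to ["a", "b"].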

    def _parse_one(self,
                   schema: dict,
                   field_info: str = None,
                   required_only: bool = False,
                   super_field_name: str = None,
                   already_parsed=None):
        '''
        Recursive function that returns a list of (nested) field names or
        a dictionary of (nested) field names with field characteristics.

        :param schema: either the entire self.schema or a sub-schema of it
        :param field_info: optional; if provided, a dictionary of field
            names with field characteristics is returned (for example the
            bsonType of each field), else a list of fields is returned
        :param required_only: when True, only returns fields marked as
            required in the mongo schema
        :param super_field_name: needed for recursion.
            Example: the field 'article' has subfields 'id' and 'supplier'.
            If we parse the sub-document corresponding to article, then
            super_field_name is 'article' and we might get an output like
            {'article.id': 'string', 'article.supplier': 'string'}
        :param already_parsed: needed for recursion
        '''
        schema = deepcopy(schema)

        assert isinstance(schema, dict),\
            "Parameter 'schema' must be a dict"

        if field_info is None:
            # parse a list of fields
            if already_parsed is None:
                already_parsed = []
            else:
                assert isinstance(already_parsed, list),\
                    "Parameter 'already_parsed' must be of type list"
        else:
            # parse a dictionary of field names with field characteristics
            if already_parsed is None:
                already_parsed = {}
            else:
                assert isinstance(already_parsed, dict),\
                    "Parameter 'already_parsed' must be of type dict"

        # If the schema is nested, then either it is of bsonType object
        # and the field information is stored under the key 'properties',
        # or it is of bsonType array and the field information is stored
        # in sub-schemas under the key 'items'.
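
        # For illustration (hypothetical schema, not from the real files):
        # {"bsonType": "object",
        #  "required": ["id"],
        #  "properties": {
        #      "id": {"bsonType": "string"},
        #      "tags": {"bsonType": "array",
        #               "items": {"bsonType": "string"}}}}
        # parses to the field list ["id", "tags"] or, with
        # field_info="bsonType", to {"id": "string", "tags": "string"}.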

        # if schema is of bsonType object
        if "properties" in schema.keys():
            if "required" in schema.keys():
                required_subfields = schema["required"]
            else:
                required_subfields = []

            for sub_field_name in schema["properties"].keys():

                sub_schema = schema["properties"][sub_field_name]

                # only process fields that are required
                if required_only and\
                        (sub_field_name not in required_subfields):
                    continue

                if super_field_name is not None:
                    field_name = '.'.join([super_field_name,
                                           sub_field_name])
                else:
                    field_name = sub_field_name

                # if the given sub-field is nested, parse the
                # sub-schema corresponding to this sub-field
                self._parse_one(schema=sub_schema,
                                super_field_name=field_name,
                                field_info=field_info,
                                already_parsed=already_parsed,
                                required_only=required_only)

        # if schema is of bsonType array
        elif "items" in schema.keys():
            # one schema for all items
            if isinstance(schema["items"], dict):
                sub_schema = schema["items"]

                self._parse_one(schema=sub_schema,
                                super_field_name=super_field_name,
                                field_info=field_info,
                                already_parsed=already_parsed,
                                required_only=required_only)

            # list of separate schemas for each item
            elif isinstance(schema["items"], list):
                for sub_schema in schema["items"]:
                    self._parse_one(schema=sub_schema,
                                    super_field_name=super_field_name,
                                    field_info=field_info,
                                    already_parsed=already_parsed,
                                    required_only=required_only)
            else:
                raise Exception(('Schema is not composed correctly: '
                                 'items must be a dictionary or a list'))

        else:
            # If neither 'properties' nor 'items' is among the schema keys,
            # we have reached the last level of nestedness and the field
            # information is stored directly in the schema keys.
            field_name = super_field_name

            if field_info is None:
                already_parsed.append(field_name)
            else:
                if field_info in schema.keys():
                    already_parsed[field_name] = schema[field_info]

        return already_parsed

    def load_and_parse_schema_for_mongodb(self, schema_path: str) -> dict:
        '''
        Load a schema and dereference it before importing it into MongoDB,
        since pymongo cannot deal with references.

        :param str schema_path: path to the schema file.
        '''
        assert isinstance(schema_path, str),\
            "Parameter 'schema_path' must be a string type"

        with open(schema_path) as json_file:
            schema = json.load(json_file)

        if self._analyze_schema(schema):
            schema = self._clean_desciptions_tags_from_single_quotes(schema)
            schema = self._dereference_schema(schema)
            # Clean again, since dereferenced sub-schemas could
            # also contain single quotes.
            schema = self._clean_desciptions_tags_from_single_quotes(schema)

        schema = self._format_schema_for_mongo(schema)

        return schema
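
    # Typical usage (a sketch; the collection setup depends on your
    # pymongo client and database, which are not part of this module):
    #   schema = parser.load_and_parse_schema_for_mongodb(schema_path)
    #   db.create_collection(name, validator={"$jsonSchema": schema})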

    def _analyze_schema(self, schema: dict,
                        definitions_flag: bool = False) -> bool:
        '''
        Recursively check whether the schema contains '$ref' references.
        '''
        for key in list(schema):
            if key == '$ref':
                definitions_flag = True
                return definitions_flag

            if type(schema[key]) == dict:
                definitions_flag = self._analyze_schema(schema[key],
                                                        definitions_flag)

        return definitions_flag

    def _clean_desciptions_tags_from_single_quotes(self, schema: dict) -> dict:
        '''
        Remove single quotes from all 'description' tags in the schema.

        :param dict schema: dictionary containing the schema
        '''
        for key in list(schema):
            if key == 'description':
                cleaned_description = self._remove_single_quotes_from_description_tag(schema[key])
                schema[key] = cleaned_description

            if type(schema[key]) == dict:
                self._clean_desciptions_tags_from_single_quotes(schema[key])

        return schema

    def _format_schema_for_mongo(self, schema: dict) -> dict:
        '''
        The schema uses tags which are not supported by mongo and must
        therefore be removed before setting the schema for mongo.

        :param dict schema: dictionary containing the schema
        '''
        for key in list(schema):
            if type(schema[key]) == dict:
                self._format_schema_for_mongo(schema[key])

            if key in ('default', 'default_values'):
                self._remove_defaults(schema)

            if key == 'examples':
                self._remove_examples(schema)

        return schema

    def _dereference_schema(self, schema: dict) -> dict:
        '''
        Resolve all '$ref' references in the schema.

        :param dict schema: dictionary containing a schema which uses
            references.
        '''
        assert isinstance(schema, dict),\
            "Parameter 'schema' must be a dictionary type"

        base_dir_url = Path(os.path.join(os.getcwd(),
                                         "mongo_schema")).as_uri() + '/'

        # json.load(f) returns a python dict whose repr uses single quotes,
        # but jsonref expects a json string with double quotes.
        schema = str(schema).replace("'", "\"")
        schema = jsonref.loads(schema, base_uri=base_dir_url)
        schema = deepcopy(schema)

        return schema
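
    # For illustration (hypothetical reference): a schema fragment like
    #   {"address": {"$ref": "definitions.json#/address"}}
    # is expanded in place, so downstream parsing sees the full sub-schema
    # from mongo_schema/definitions.json instead of the reference.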

    def _remove_defaults(self, schema: dict) -> None:
        '''
        Remove the 'default' and 'default_values' tags from the schema.

        :param dict schema: dictionary containing a schema which uses
            'default' tags.
        '''
        assert isinstance(schema, dict),\
            "Parameter 'schema' must be a dictionary type"

        if 'default' in schema:
            del schema['default']
        if 'default_values' in schema:
            del schema['default_values']

    def _remove_examples(self, schema: dict) -> None:
        '''
        Remove the 'examples' tag from the schema.

        :param dict schema: dictionary containing a schema with
            'examples' tags.
        '''
        assert isinstance(schema, dict),\
            "Parameter 'schema' must be a dictionary type"

        if 'examples' in schema:
            del schema['examples']

    def _remove_single_quotes_from_description_tag(self,
                                                   description: str) -> str:
        '''
        :param str description: the text of a 'description' tag
        '''
        assert isinstance(description, str),\
            "Parameter 'description' must be a string type"

        return description.replace("'", "")

if __name__ == "__main__":

    # Only for testing

    schema_path = os.path.join(".", "mongo_schema", "schema_components.json")

    if os.path.isfile(schema_path):

        parse_obj = ParseJsonSchema(schema_paths=schema_path)

        fields = parse_obj.get_fields()
        required_fields = parse_obj.get_required_fields()
        patterns = parse_obj.get_patterns()
        mongo_types = parse_obj.get_mongo_types()
        python_types_except_dates = parse_obj.get_python_types()
        datetime_fields = parse_obj.get_datetime_fields()
        allowed_values = parse_obj.get_allowed_values()
        descriptions = parse_obj.get_field_descriptions()