ParseJsonSchema.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:41:48 2019

@author: tanya
"""
import os
import sys
from copy import deepcopy
import numpy as np
import json
import jsonref
from pathlib import Path

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseDbSchema import ParseDbSchema


class ParseJsonSchema(ParseDbSchema):
    '''
    Class for retrieving column properties from mongodb jsonSchema
    '''
    def __init__(self, schema_paths: [list, str], log_file: str = None):
        '''
        :param schema_paths: path or list of paths to json schema files
        :param log_file: optional path to the log file
        '''
        from cdplib.log import Log

        super().__init__(schema_paths=schema_paths, log_file=log_file)

        self._log = Log(name="ParseJsonSchema", log_file=log_file)

        # load schemas to dictionaries if they are valid json files
        assert(isinstance(schema_paths, (list, str))),\
            "Schema paths must be either str or lists"

        if isinstance(schema_paths, str):
            schema_paths = [schema_paths]

        self._schema_paths = schema_paths

        self.schemas = []

        for schema_path in schema_paths:
            try:
                with open(schema_path, "r") as f:
                    schema = json.load(f)

                ref_flag = self._analyze_schema(schema)

                if ref_flag:
                    schema = self._format_schema_for_mongo(schema)
                    schema = self._dereference_schema(schema)
                    schema = self._format_schema_for_mongo(schema)
                    self.schemas.append(schema)
                else:
                    schema = self._format_schema_for_mongo(schema)
                    self.schemas.append(schema)

            except Exception as e:
                err = ("Could not load json schema {0}, "
                       "obtained error: {1}".format(schema_path, e))
                self._log.error(err)
                raise Exception(err)

    @property
    def _collection_names(self) -> list:
        '''
        Collection names derived from the schema file names.
        '''
        # Don't use strip() instead of replace(): "schema_c".strip("schema_")
        # would discard the trailing "c" as well, which is not the
        # appropriate output.
        return [os.path.basename(p).replace("schema_", "").split(".")[0]
                for p in self._schema_paths]

    def get_fields(self) -> list:
        '''
        Return a list of all (nested) field names defined in the schemas.
        '''
        return self._parse()

    def get_fields_restricted_to_collection(self, collection_name: str) -> list:
        '''
        Return the field names defined in the schema of the given collection.
        '''
        schemas = [self.schemas[self._collection_names.index(collection_name)]]
        return self._parse(schemas=schemas)

    def get_required_fields(self) -> list:
        '''
        Return a list of the fields marked as required in the schemas.
        '''
        return self._parse(required_only=True)

    def get_mongo_types(self) -> dict:
        '''
        Return a dictionary mapping field names to their bsonType.
        '''
        return self._parse(field_info="bsonType")

    def get_datetime_fields(self):
        '''
        Return the field names whose bsonType is a date or timestamp.
        '''
        mongo_types = self.get_mongo_types()

        return [k for k, v in mongo_types.items()
                if v in ["date", "timestamp", "Date", "Timestamp"]]

    def get_python_types(self) -> dict:
        '''
        Return a dictionary mapping field names to equivalent python types.
        '''
        mongo_types = self.get_mongo_types()
        python_types = {}

        bson_to_python_types = {"double": float,
                                "decimal": float,
                                "string": str,
                                "object": object,
                                "array": list,
                                "bool": bool,
                                "int": int,
                                "long": int,
                                "date": np.dtype('<M8[ns]'),
                                "timestamp": np.dtype('<M8[ns]')}

        for k, v in mongo_types.items():

            # a field may allow several bsonTypes; collapse the list
            # to a single representative type
            if isinstance(v, list):
                if ("date" in v) or ("timestamp" in v):
                    v = "date"
                elif "string" in v:
                    v = "string"
                elif ("double" in v) or ("decimal" in v):
                    v = "double"
                elif ("null" in v) and (len(v) == 2) and ("int" not in v):
                    v = [t for t in v if t != "null"][0]
                else:
                    err = "Type {0}: {1} not convertible".format(k, v)
                    self._log.error(err)
                    raise Exception(err)

            if v in bson_to_python_types:
                python_types[k] = bson_to_python_types[v]

        return python_types

    def get_patterns(self) -> dict:
        '''
        Return a dictionary mapping field names to their regex 'pattern'.
        '''
        return self._parse(field_info="pattern")

    def get_default_values(self) -> dict:
        '''
        Return a dictionary mapping field names to their 'default' value.
        '''
        return self._parse(field_info="default")

    def get_allowed_values(self) -> dict:
        '''
        Return a dictionary mapping field names to their 'enum' values.
        '''
        return self._parse(field_info="enum")

    def get_maximum_value(self) -> dict:
        '''
        Return a dictionary mapping field names to their 'maximum' value.
        '''
        return self._parse(field_info="maximum")

    def get_minimum_value(self) -> dict:
        '''
        Return a dictionary mapping field names to their 'minimum' value.
        '''
        return self._parse(field_info="minimum")

    def get_max_items(self) -> dict:
        '''
        Return a dictionary mapping field names to their 'maxItems' restriction.
        '''
        return self._parse(field_info="maxItems")

    def get_min_items(self) -> dict:
        '''
        Return a dictionary mapping field names to their 'minItems' restriction.
        '''
        return self._parse(field_info="minItems")

    def get_field_descriptions(self) -> dict:
        '''
        Return a dictionary mapping field names to their 'description' text.
        '''
        return self._parse(field_info="description")

    def _parse(self,
               field_info: str = None,
               required_only: bool = False,
               schemas: list = None):
        '''
        Parse all schemas (or the ones passed explicitly) and merge
        the per-schema results.
        '''
        if schemas is None:
            schemas = self.schemas

        result = self._parse_one(schema=schemas[0],
                                 field_info=field_info,
                                 required_only=required_only)

        for schema in schemas[1:]:

            next_result = self._parse_one(schema=schema,
                                          field_info=field_info,
                                          required_only=required_only)

            if isinstance(result, list):
                result.extend(next_result)
            else:
                result.update(next_result)

        return result

    def _parse_one(self,
                   schema: dict,
                   field_info: str = None,
                   required_only: bool = False,
                   super_field_name: str = None,
                   already_parsed: (list, dict) = None) -> (list, dict):
        '''
        Recursive function that returns a list of (nested) field names or
        a dictionary of (nested) field names with field characteristics.

        :param schema: the entire schema or a sub-schema of it

        :param field_info: optional, if provided a dictionary of field
            names with field characteristics is returned (for example
            the bsonType of each field), else a list of fields is returned

        :param required_only: when True, only returns fields marked as
            required in the mongo schema

        :param super_field_name: needed for recursion.
            Example: the field 'article' has subfields 'id' and 'supplier'.
            If we parse the sub-document corresponding to article, then
            super_field_name is 'article' and we might get an output like
            {'article.id': 'string', 'article.supplier': 'string'}

        :param already_parsed: needed for recursion
        '''
        schema = deepcopy(schema)

        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dict"

        if field_info is None:
            # parse a list of fields
            if already_parsed is None:
                already_parsed = []
            else:
                assert(isinstance(already_parsed, list)),\
                    "Parameter 'already_parsed' must be of type list"
        else:
            # parse a dictionary of field names with field characteristics
            if already_parsed is None:
                already_parsed = {}
            else:
                assert(isinstance(already_parsed, dict)),\
                    "Parameter 'already_parsed' must be of type dict"

        # If the schema is nested, then either it is of bsonType object
        # and the field information is stored under the key 'properties',
        # or it is of bsonType array and the field information is stored
        # in sub-schemas under the key 'items'.

        # if schema is of bsonType object
        if "properties" in schema.keys():
            if "required" in schema.keys():
                required_subfields = schema["required"]
            else:
                required_subfields = []

            for sub_field_name in schema["properties"].keys():

                sub_schema = schema["properties"][sub_field_name]

                # only process fields that are required
                if required_only and\
                        (sub_field_name not in required_subfields):
                    pass
                else:
                    if super_field_name is not None:
                        field_name = '.'.join([super_field_name,
                                               sub_field_name])
                    else:
                        field_name = sub_field_name

                    # if the given sub-field is nested, parse the
                    # sub-schema corresponding to this sub-field
                    self._parse_one(schema=sub_schema,
                                    super_field_name=field_name,
                                    field_info=field_info,
                                    already_parsed=already_parsed,
                                    required_only=required_only)

        # if schema is of bsonType array
        elif "items" in schema.keys():

            # one schema for all items
            if isinstance(schema["items"], dict):

                sub_schema = schema["items"]

                self._parse_one(schema=sub_schema,
                                super_field_name=super_field_name,
                                field_info=field_info,
                                already_parsed=already_parsed,
                                required_only=required_only)

            # list of separate schemas for each item
            elif isinstance(schema["items"], list):

                for sub_schema in schema["items"]:
                    self._parse_one(schema=sub_schema,
                                    super_field_name=super_field_name,
                                    field_info=field_info,
                                    already_parsed=already_parsed,
                                    required_only=required_only)

            else:
                raise Exception(('Schema is not composed correctly: '
                                 'items must be a dictionary or a list'))
        else:
            # If neither 'properties' nor 'items' is among the schema keys,
            # we have reached the last level of nestedness and the field
            # information is stored directly in the schema keys.
            field_name = super_field_name

            if field_info is None:
                already_parsed.append(field_name)
            else:
                if field_info in schema.keys():
                    already_parsed[field_name] = schema[field_info]
                else:
                    pass

        return already_parsed

    def read_schema_and_parse_for_mongodb(self, schema_path: str) -> dict:
        '''
        Load a schema file and dereference it before importing it into
        MongoDB, since pymongo cannot deal with $ref references.

        :param str schema_path: path to the schema file.
        '''
        assert(isinstance(schema_path, str)),\
            "Parameter 'schema_path' must be a string type"

        with open(schema_path) as json_file:
            schema = json.load(json_file)

        definitions_flag = self._analyze_schema(schema)

        if definitions_flag:
            schema = self._format_schema_for_mongo(schema)
            schema = self._dereference_schema(schema)
            schema = self._format_schema_for_mongo(schema)

        return schema
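
    # Usage sketch (an assumption for illustration, not part of this module):
    # the dictionary returned by read_schema_and_parse_for_mongodb is
    # typically attached to a collection as a validator, e.g. with pymongo
    # something along the lines of
    #     db.command("collMod", collection_name,
    #                validator={"$jsonSchema": schema})
    # or db.create_collection(collection_name,
    #                         validator={"$jsonSchema": schema}).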
    def _analyze_schema(self, schema: dict,
                        definitions_flag: bool = False) -> bool:
        '''
        Recursively check whether the schema contains a '$ref' key.
        '''
        for key in list(schema):

            if key == '$ref':
                definitions_flag = True
                return definitions_flag

            if type(schema[key]) == dict:
                definitions_flag = self._analyze_schema(schema[key],
                                                        definitions_flag)

        return definitions_flag

    def _format_schema_for_mongo(self, schema: dict) -> dict:
        '''
        The schemas use tags which are not supported by mongo and therefore
        must be taken care of before setting the schema for mongo.

        :param dict schema: dictionary containing the schema.
        '''
        for key in list(schema):

            if key == 'description':
                cleaned_description = \
                    self._remove_single_quotes_from_description_tag(schema[key])
                schema[key] = cleaned_description

            if type(schema[key]) == dict:
                self._format_schema_for_mongo(schema[key])

            if key == 'examples':
                self._remove_examples(schema)

            if key == 'default' or key == 'default_values':
                self._remove_defaults(schema)

        return schema

    def _dereference_schema(self, schema: dict) -> dict:
        '''
        :param dict schema: dictionary containing a schema which uses references.
        '''
        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dictionary type"

        base_dir_url = Path(os.path.join(os.getcwd(),
                                         "mongo_schema")).as_uri() + '/'

        # str(schema) serializes the dictionary with single quotes, but
        # jsonref expects a json string with double quotes.
        schema = str(schema).replace("'", "\"")
        schema = jsonref.loads(schema, base_uri=base_dir_url)
        schema = deepcopy(schema)
        # schema.pop('definitions', None)

        return schema

    def _remove_defaults(self, schema: dict) -> None:
        '''
        :param dict schema: dictionary containing a schema which uses 'default' tags.
        '''
        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dictionary type"

        if 'default' in schema:
            del schema['default']
        if 'default_values' in schema:
            del schema['default_values']

    def _remove_examples(self, schema: dict) -> None:
        '''
        :param dict schema: dictionary containing a schema with 'examples' tags.
        '''
        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dictionary type"

        if 'examples' in schema:
            del schema['examples']

    def _remove_single_quotes_from_description_tag(self, description: str) -> str:
        '''
        :param str description: description text possibly containing single quotes.
        '''
        assert(isinstance(description, str)),\
            "Parameter 'description' must be a string type"

        description = description.replace("'", "")

        return description


if __name__ == "__main__":

    # Only for testing

    schema_path = os.path.join(".", "mongo_schema", "schema_components.json")

    if os.path.isfile(schema_path):

        parse_obj = ParseJsonSchema(schema_paths=schema_path)

        fields = parse_obj.get_fields()

        required_fields = parse_obj.get_required_fields()

        patterns = parse_obj.get_patterns()

        mongo_types = parse_obj.get_mongo_types()

        python_types_except_dates = parse_obj.get_python_types()

        datetime_fields = parse_obj.get_datetime_fields()

        allowed_values = parse_obj.get_allowed_values()

        descriptions = parse_obj.get_field_descriptions()
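
        # Hedged demo extension (an illustration, not part of the original
        # test block): dereference the same schema file so it could be
        # passed to MongoDB as a validator.
        dereferenced_schema = \
            parse_obj.read_schema_and_parse_for_mongodb(schema_path)

        print("collections:", parse_obj._collection_names)
        print("parsed {} fields, {} required".format(len(fields),
                                                     len(required_fields)))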