#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ParseJsonSchema.py

Created on Thu Jan 31 11:41:48 2019

@author: tanya
"""
import json
import os
import sys
from copy import deepcopy
from pathlib import Path
from typing import Optional, Union

import jsonref
import numpy as np

# Make packages located in the current working directory importable
# before the project import below is resolved.
sys.path.append(os.getcwd())

from cdplib.db_migration.ParseDbSchema import ParseDbSchema


  16. class ParseJsonSchema(ParseDbSchema):
  17. '''
  18. Class for retrieving column properties from mongodb jsonSchema
  19. '''
  20. def __init__(self, schema_paths: [list, str], log_file: str = None):
  21. '''
  22. '''
  23. from cdplib.log import Log
  24. super().__init__(schema_paths=schema_paths, log_file=log_file)
  25. self._log = Log(name="ParseJsonSchema", log_file=log_file)
  26. # load schemas to dictionaries if they are valid json files
  27. assert(isinstance(schema_paths, (list, str))),\
  28. "Schema paths must be either str or lists"
  29. if isinstance(schema_paths, str):
  30. schema_paths = [schema_paths]
  31. self._schema_paths = schema_paths
  32. self.schemas = []
  33. for schema_path in schema_paths:
  34. try:
  35. with open(schema_path, "r") as f:
  36. # Load schmea dereferenced and cleaned by default values
  37. self.schemas.append(self._dereference_schema(schema_path))
  38. except Exception as e:
  39. err = ("Could not load json schema {0}, "
  40. "Obtained error {1}".format(schema_path, e))
  41. self._log.error(err)
  42. raise Exception(err)
  43. @property
  44. def _collection_names(self) -> list:
  45. '''
  46. '''
  47. # Don't use strip() instaed of replace since schema_c.strip(schema_)
  48. # will discard the c as well which is not a appropriate output
  49. return [os.path.basename(p).replace("schema_", "").split(".")[0] for p in self._schema_paths]
  50. def get_fields(self) -> list:
  51. '''
  52. '''
  53. return self._parse()
  54. def get_fields_restricted_to_collection(self, collection_name: str) -> list:
  55. '''
  56. '''
  57. schemas = [self.schemas[self._collection_names.index(collection_name)]]
  58. return self._parse(schemas=schemas)
  59. def get_required_fields(self) -> list:
  60. '''
  61. '''
  62. return self._parse(required_only=True)
  63. def get_mongo_types(self) -> dict:
  64. '''
  65. '''
  66. return self._parse(field_info="bsonType")
  67. def get_datetime_fields(self):
  68. '''
  69. '''
  70. mongo_types = self.get_mongo_types()
  71. return [k for k, v in mongo_types.items()
  72. if v in ["date", "timestamp", "Date", "Timestamp"]]
  73. def get_python_types(self) -> dict:
  74. '''
  75. '''
  76. mongo_types = self.get_mongo_types()
  77. python_types = {}
  78. bson_to_python_types = {"double": float,
  79. "decimal": float,
  80. "string": str,
  81. "object": object,
  82. "array": list,
  83. "bool": bool,
  84. "int": int,
  85. "long": int,
  86. "date": np.dtype('<M8[ns]'),
  87. "timestamp": np.dtype('<M8[ns]')
  88. }
  89. for k, v in mongo_types.items():
  90. if isinstance(v, list):
  91. if ("date" in v) or ("timestamp" in v):
  92. v = "date"
  93. elif "string" in v:
  94. v = "string"
  95. elif ("double" in v) or ("decimal" in v):
  96. v = "double"
  97. elif ("null" in v) and (len(v) == 2) and ("int" not in v):
  98. v = [t for t in v if type != "null"][0]
  99. else:
  100. err = "Type {0}: {1} not convertibale".format(k, v)
  101. self._log.error(err)
  102. raise Exception(err)
  103. if v in bson_to_python_types:
  104. python_types[k] = bson_to_python_types[v]
  105. return python_types
  106. def get_patterns(self) -> dict:
  107. '''
  108. '''
  109. return self._parse(field_info="pattern")
  110. def get_default_values(self) -> dict:
  111. '''
  112. '''
  113. return self._parse(field_info="default")
  114. def get_allowed_values(self) -> dict:
  115. '''
  116. '''
  117. return self._parse(field_info="enum")
  118. def get_maximum_value(self) -> dict:
  119. '''
  120. '''
  121. return self._parse(field_info="maximum")
  122. def get_minimum_value(self) -> dict:
  123. '''
  124. '''
  125. return self._parse(field_info="minimum")
  126. def get_max_items(self) -> dict:
  127. '''
  128. '''
  129. return self._parse(field_info="maxItems")
  130. def get_min_items(self) -> dict:
  131. '''
  132. '''
  133. return self._parse(field_info="minItems")
  134. def get_field_descriptions(self) -> dict:
  135. '''
  136. '''
  137. return self._parse(field_info="description")
  138. def _parse(self,
  139. field_info: str = None,
  140. required_only: bool = False,
  141. schemas: list = None):
  142. '''
  143. '''
  144. if schemas is None:
  145. schemas = self.schemas
  146. result = self._parse_one(schema=schemas[0],
  147. field_info=field_info,
  148. required_only=required_only)
  149. for schema in schemas[1:]:
  150. next_result = self._parse_one(schema=schema,
  151. field_info=field_info,
  152. required_only=required_only)
  153. if isinstance(result, list):
  154. result.extend(next_result)
  155. else:
  156. result.update(next_result)
  157. return result
  158. def _parse_one(self,
  159. schema: dict,
  160. field_info: str = None,
  161. required_only: bool = False,
  162. super_field_name: str = None,
  163. already_parsed: (list, dict) = None) -> (list, dict):
  164. '''
  165. Recursive function that returns a list of (nested) field names or
  166. a dictionary of (nested) field names with field characteristics.
  167. :param schema: if None => entire self.schema, or a sub-schema
  168. of self.schema
  169. :param field_info: optional, if provided a dictionary of field
  170. names with field characteristics is returned (for examples
  171. bsonType of each field), else a list of fields is returned
  172. :param required_only: when True, only returns fields marked as
  173. required in the mongo schema
  174. :param super_field_name: needed for recursion
  175. Example: the field 'article' has
  176. subfields 'id' and 'supplier'.
  177. If we parse the sub-document corresponding to article, then
  178. super_field_name is'article' and we might get an output like
  179. {'article.id': string, 'article.supplier': string}
  180. :param alread_parsed: needed for recursion
  181. '''
  182. schema = deepcopy(schema)
  183. assert(isinstance(schema, dict)),\
  184. "Parameter 'schema' must be a dict"
  185. if field_info is None:
  186. # parse a list of fields
  187. if already_parsed is None:
  188. already_parsed = []
  189. else:
  190. assert(isinstance(already_parsed, list)),\
  191. "Parameter 'already_parsed' must be of type list"
  192. else:
  193. # parse a dictionary of field names with field characteristics
  194. if already_parsed is None:
  195. already_parsed = {}
  196. else:
  197. assert(isinstance(already_parsed, dict)),\
  198. "Parameter 'already_parsed' must be of type dict"
  199. # If schema is nested, then
  200. # either it is of bsonType object
  201. # and the field information is stored under the key 'properties'
  202. # or it is of bsonType array
  203. # and the field information is stored in sub-schemas
  204. # under the key 'items'
  205. # if schema is of bsonType object
  206. if "properties" in schema.keys():
  207. if "required" in schema.keys():
  208. required_subfields = schema["required"]
  209. else:
  210. required_subfields = []
  211. for sub_field_name in schema["properties"].keys():
  212. sub_schema = schema["properties"][sub_field_name]
  213. # only process fields that are required
  214. if required_only and\
  215. (sub_field_name not in required_subfields):
  216. pass
  217. else:
  218. if super_field_name is not None:
  219. field_name = '.'.join([super_field_name,
  220. sub_field_name])
  221. else:
  222. field_name = sub_field_name
  223. # if the given sub-field is nested, parse the
  224. # sub-schema corresponding to this sub-field
  225. self._parse_one(
  226. schema=sub_schema,
  227. super_field_name=field_name,
  228. field_info=field_info,
  229. already_parsed=already_parsed,
  230. required_only=required_only)
  231. # if schema is of bsonType array
  232. elif "items" in schema.keys():
  233. # one schema for all items
  234. if isinstance(schema["items"], dict):
  235. sub_schema = schema["items"]
  236. self._parse_one(schema=sub_schema,
  237. super_field_name=super_field_name,
  238. field_info=field_info,
  239. already_parsed=already_parsed,
  240. required_only=required_only)
  241. # list of separate schemas for each item
  242. elif isinstance(schema["items"], list):
  243. for sub_schema in schema["items"]:
  244. self._parse_one(schema=sub_schema,
  245. super_field_name=super_field_name,
  246. field_info=field_info,
  247. already_parsed=already_parsed,
  248. required_only=required_only)
  249. else:
  250. raise Exception(('Schema is not composed correctly: '
  251. 'items must be a dictionary or a list'))
  252. else:
  253. # If neither properties nor items is in schema keys
  254. # we reached the last level of nestedness,
  255. # field information is stored in the schema keys.
  256. field_name = super_field_name
  257. if field_info is None:
  258. already_parsed.append(field_name)
  259. else:
  260. if field_info in schema.keys():
  261. already_parsed[field_name] = schema[field_info]
  262. else:
  263. pass
  264. return already_parsed
  265. def _dereference_schema(self, schema_path: str) -> dict:
  266. '''
  267. :param dict schema: dictionary containing a schema which uses references.
  268. '''
  269. assert(isinstance(schema_path, str)),\
  270. "Parameter 'schema_path' must be a string type"
  271. base_dir_url = Path(os.path.join(os.getcwd(), "mongo_schema")).as_uri() + '/'
  272. schema = jsonref.loads(open(schema_path,"r").read(), base_uri=base_dir_url)
  273. schema = deepcopy(schema)
  274. schema.pop('definitions', None)
  275. return schema
  276. def _remove_defaults(self, schema: dict) -> dict:
  277. '''
  278. :param dict schema: dictionary containing a schema which uses references.
  279. '''
  280. if 'default' in schema:
  281. del schema['default']
  282. if 'default_values' in schema:
  283. del schema['default_values']
  284. return schema
  285. assert(isinstance(schema, dict)),\
  286. "Parameter 'schema' must be a dictionary type"
  287. # Need to parse schmema for importing to mongo db
  288. # Reason:
  289. # We need to drop default values since MongoDB can't handle them
  290. # We need to deference json before import to Mongo DB pymongo can't deal with references
  291. def read_schema_and_parse_for_mongodb(self, schema_path: str) -> dict:
  292. '''
  293. :param str schema_path: path to the schema file.
  294. '''
  295. assert(isinstance(schema_path, str)),\
  296. "Parameter 'schema_path must be a string type"
  297. with open(schema_path) as json_file:
  298. schema = json.load(json_file)
  299. definitions_flag = self._analyze_schema(schema)
  300. if definitions_flag:
  301. schema = self._dereference_schema(schema)
  302. return schema
  303. def _analyze_schema(self, schema: dict, definitions_flag: bool = False) -> dict:
  304. for key in schema:
  305. if key == '$ref':
  306. definitions_flag = True
  307. return definitions_flag
  308. if key == 'default' or key == 'default_values':
  309. return self._remove_defaults(schema)
  310. if type(schema[key]) == dict:
  311. definitions_flag = self._analyze_schema(schema[key], definitions_flag)
  312. return definitions_flag
  313. if __name__ == "__main__":
  314. # Only for testing
  315. schema_path = os.path.join(".", "mongo_schema", "schema_components.json")
  316. if os.path.isfile(schema_path):
  317. parse_obj = ParseJsonSchema(schema_paths=schema_path)
  318. fields = parse_obj.get_fields()
  319. required_fileds = parse_obj.get_required_fields()
  320. patterns = parse_obj.get_patterns()
  321. mongo_types = parse_obj.get_mongo_types()
  322. python_types_except_dates = parse_obj.get_python_types()
  323. datetime_fields = parse_obj.get_datetime_fields()
  324. allowed_values = parse_obj.get_allowed_values()
  325. descriptions = parse_obj.get_field_descriptions()