#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 31 11:41:48 2019

@author: tanya
"""
import os
import sys
from copy import deepcopy
from typing import Union

import numpy as np

sys.path.append(os.getcwd())

from libraries.db_migration.ParseDbSchema import ParseDbSchema


class ParseJsonSchema(ParseDbSchema):
    '''
    Class for retrieving column properties from mongodb jsonSchema
    '''
    def __init__(self, schema_paths: Union[list, str], log_file: str = None):
        '''
        Loads the json schema file(s) given in schema_paths.
        '''
        import json
        from libraries.log import Log

        super().__init__(schema_paths=schema_paths, log_file=log_file)

        self._log = Log(name="ParseJsonSchema", log_file=log_file)

        # load schemas to dictionaries if they are valid json files
        assert isinstance(schema_paths, (list, str)),\
            "Schema paths must be either a str or a list"

        if isinstance(schema_paths, str):
            schema_paths = [schema_paths]

        self.schemas = []

        for schema_path in schema_paths:
            try:
                with open(schema_path, "r") as f:
                    self.schemas.append(json.load(f))
            except Exception as e:
                err = ("Could not load json schema, "
                       "obtained error {}".format(e))
                self._log.error(err)
                raise Exception(err)

    def get_fields(self) -> list:
        '''
        Returns a list of (nested) field names defined in the schemas.
        '''
        return self._parse()

    def get_required_fields(self) -> list:
        '''
        Returns a list of fields marked as required in the schemas.
        '''
        return self._parse(required_only=True)

    def get_mongo_types(self) -> dict:
        '''
        Returns a dictionary mapping field names to their bsonType.
        '''
        return self._parse(field_info="bsonType")

    def get_datetime_fields(self) -> list:
        '''
        Returns the fields whose bsonType is a date or a timestamp.
        '''
        mongo_types = self.get_mongo_types()

        return [k for k, v in mongo_types.items()
                if v in ["date", "timestamp", "Date", "Timestamp"]]

    def get_python_types(self) -> dict:
        '''
        Returns a dictionary mapping field names to the python types
        corresponding to their bsonType.
        '''
        mongo_types = self.get_mongo_types()
        python_types = {}

        bson_to_python_types_except_dates = {"double": float,
                                             "decimal": float,
                                             "string": str,
                                             "object": object,
                                             "array": list,
                                             "bool": bool,
                                             "int": int,
                                             "long": int,
                                             "date": np.dtype('<M8[ns]'),
                                             "timestamp": np.dtype('<M8[ns]')
                                             }

        for k, v in mongo_types.items():
            if isinstance(v, list):
                if ("date" in v) or ("timestamp" in v):
                    v = "date"
                elif "string" in v:
                    v = "string"
                elif ("double" in v) or ("decimal" in v):
                    v = "double"
                elif ("null" in v) and (len(v) == 2) and ("int" not in v):
                    v = [t for t in v if t != "null"][0]
                else:
                    err = "Type {0}: {1} not convertible".format(k, v)
                    self._log.error(err)
                    raise Exception(err)

            if v in bson_to_python_types_except_dates:
                python_types[k] = bson_to_python_types_except_dates[v]

        return python_types
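
    # Illustrative example of the list handling above (hypothetical
    # bsonType declarations, not from a real schema): a field declared
    # with bsonType ['string', 'null'] maps to str, and one declared
    # with ['date', 'null'] maps to np.dtype('<M8[ns]').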

    def get_patterns(self) -> dict:
        '''
        Returns a dictionary mapping field names to their regex pattern.
        '''
        return self._parse(field_info="pattern")

    def get_default_values(self) -> dict:
        '''
        Returns a dictionary mapping field names to their default value.
        '''
        return self._parse(field_info="default")

    def get_allowed_values(self) -> dict:
        '''
        Returns a dictionary mapping field names to their enum values.
        '''
        return self._parse(field_info="enum")

    def get_maximum_value(self) -> dict:
        '''
        Returns a dictionary mapping field names to their maximum value.
        '''
        return self._parse(field_info="maximum")

    def get_minimum_value(self) -> dict:
        '''
        Returns a dictionary mapping field names to their minimum value.
        '''
        return self._parse(field_info="minimum")

    def get_max_items(self) -> dict:
        '''
        Returns a dictionary mapping array fields to their maxItems.
        '''
        return self._parse(field_info="maxItems")

    def get_min_items(self) -> dict:
        '''
        Returns a dictionary mapping array fields to their minItems.
        '''
        return self._parse(field_info="minItems")

    def get_field_descriptions(self) -> dict:
        '''
        Returns a dictionary mapping field names to their description.
        '''
        return self._parse(field_info="description")

    def _parse(self,
               field_info: str = None,
               required_only: bool = False) -> Union[list, dict]:
        '''
        Parses all loaded schemas and merges the results.
        '''
        result = self._parse_one(schema=self.schemas[0],
                                 field_info=field_info,
                                 required_only=required_only)

        for schema in self.schemas[1:]:
            next_result = self._parse_one(schema=schema,
                                          field_info=field_info,
                                          required_only=required_only)

            if isinstance(result, list):
                result.extend(next_result)
            else:
                result.update(next_result)

        return result
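
    # Illustrative example of the merge above: if two schema files are
    # loaded and parsing each with field_info='bsonType' yields
    # {'a': 'string'} and {'b': 'int'}, _parse returns the merged dict
    # {'a': 'string', 'b': 'int'}; without field_info the per-schema
    # field lists are concatenated instead.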

    def _parse_one(self,
                   schema: dict,
                   field_info: str = None,
                   required_only: bool = False,
                   super_field_name: str = None,
                   already_parsed: Union[list, dict] = None
                   ) -> Union[list, dict]:
        '''
        Recursive function that returns a list of (nested) field names or
        a dictionary of (nested) field names with field characteristics.

        :param schema: the entire schema or a sub-schema of it
        :param field_info: optional; if provided, a dictionary of field
         names with field characteristics is returned (for example the
         bsonType of each field), else a list of fields is returned
        :param required_only: when True, only returns fields marked as
         required in the mongo schema
        :param super_field_name: needed for recursion.
         Example: the field 'article' has subfields 'id' and 'supplier'.
         If we parse the sub-document corresponding to 'article', then
         super_field_name is 'article' and we might get an output like
         {'article.id': 'string', 'article.supplier': 'string'}
        :param already_parsed: needed for recursion
        '''
        schema = deepcopy(schema)

        assert isinstance(schema, dict),\
            "Parameter 'schema' must be a dict"

        if field_info is None:
            # parse a list of fields
            if already_parsed is None:
                already_parsed = []
            else:
                assert isinstance(already_parsed, list),\
                    "Parameter 'already_parsed' must be of type list"
        else:
            # parse a dictionary of field names with field characteristics
            if already_parsed is None:
                already_parsed = {}
            else:
                assert isinstance(already_parsed, dict),\
                    "Parameter 'already_parsed' must be of type dict"

        # If the schema is nested, it is either of bsonType object,
        # with the field information stored under the key 'properties',
        # or of bsonType array, with the field information stored in
        # sub-schemas under the key 'items'.

        # if schema is of bsonType object
        if "properties" in schema.keys():
            required_subfields = schema.get("required", [])

            for sub_field_name in schema["properties"].keys():
                sub_schema = schema["properties"][sub_field_name]

                # skip fields that are not required
                if required_only and\
                        (sub_field_name not in required_subfields):
                    continue

                if super_field_name is not None:
                    field_name = '.'.join([super_field_name,
                                           sub_field_name])
                else:
                    field_name = sub_field_name

                # if the given sub-field is nested, parse the
                # sub-schema corresponding to this sub-field
                self._parse_one(
                        schema=sub_schema,
                        super_field_name=field_name,
                        field_info=field_info,
                        already_parsed=already_parsed,
                        required_only=required_only)

        # if schema is of bsonType array
        elif "items" in schema.keys():
            # one schema for all items
            if isinstance(schema["items"], dict):
                sub_schema = schema["items"]

                self._parse_one(schema=sub_schema,
                                super_field_name=super_field_name,
                                field_info=field_info,
                                already_parsed=already_parsed,
                                required_only=required_only)

            # list of separate schemas for each item
            elif isinstance(schema["items"], list):
                for sub_schema in schema["items"]:
                    self._parse_one(schema=sub_schema,
                                    super_field_name=super_field_name,
                                    field_info=field_info,
                                    already_parsed=already_parsed,
                                    required_only=required_only)

            else:
                raise Exception(('Schema is not composed correctly: '
                                 'items must be a dictionary or a list'))

        else:
            # If neither 'properties' nor 'items' is among the schema keys,
            # we have reached the last level of nesting; the field
            # information is stored in the schema keys.
            field_name = super_field_name

            if field_info is None:
                already_parsed.append(field_name)
            else:
                if field_info in schema.keys():
                    already_parsed[field_name] = schema[field_info]

        return already_parsed
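
    # Illustrative example (hypothetical field names, not taken from a
    # real schema): given the sub-schema
    #   {"bsonType": "object",
    #    "properties": {"id": {"bsonType": "string"},
    #                   "supplier": {"bsonType": "string"}}},
    # a call to _parse_one with super_field_name='article' and
    # field_info='bsonType' returns
    #   {'article.id': 'string', 'article.supplier': 'string'}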


if __name__ == "__main__":
    # Only for testing

    schema_path = os.path.join(".", "mongo_schema", "schema_wheelsets.json")

    if os.path.isfile(schema_path):
        parse_obj = ParseJsonSchema(schema_paths=schema_path)

        fields = parse_obj.get_fields()
        required_fields = parse_obj.get_required_fields()
        patterns = parse_obj.get_patterns()
        mongo_types = parse_obj.get_mongo_types()
        python_types_except_dates = parse_obj.get_python_types()
        datetime_fields = parse_obj.get_datetime_fields()
        allowed_values = parse_obj.get_allowed_values()
        descriptions = parse_obj.get_field_descriptions()
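
        # A minimal schema file that would exercise the calls above might
        # look as follows (illustrative only; this is not the content of
        # the real schema_wheelsets.json):
        # {
        #   "bsonType": "object",
        #   "required": ["wheelset_id"],
        #   "properties": {
        #     "wheelset_id": {"bsonType": "string", "pattern": "^[0-9]+$"},
        #     "measured_at": {"bsonType": "date"},
        #     "article": {"bsonType": "object",
        #                 "properties": {"id": {"bsonType": "string"}}}
        #   }
        # }
        # For this file, get_fields() would return
        # ['wheelset_id', 'measured_at', 'article.id'] and
        # get_required_fields() would return ['wheelset_id'].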