ParseJsonSchema.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Thu Jan 31 11:41:48 2019
  5. @author: tanya
  6. """
  7. import os
  8. import sys
  9. from copy import deepcopy
  10. import numpy as np
  11. sys.path.append(os.getcwd())
  12. from libraries.db_migration.ParseDbSchema import ParseDbSchema
  13. class ParseJsonSchema(ParseDbSchema):
  14. '''
  15. Class for retrieving column properties from mongodb jsonSchema
  16. '''
  17. def __init__(self, schema_paths: [list, str], log_file: str = None):
  18. '''
  19. '''
  20. import json
  21. from libraries.log import Log
  22. super().__init__(schema_paths=schema_paths, log_file=log_file)
  23. self._log = Log(name="ParseJsonSchema", log_file=log_file)
  24. # load schemas to dictionaries if they are valid json files
  25. assert(isinstance(schema_paths, (list, str))),\
  26. "Schema paths must be either str or lists"
  27. if isinstance(schema_paths, str):
  28. schema_paths = [schema_paths]
  29. self._schema_paths = schema_paths
  30. self.schemas = []
  31. for schema_path in schema_paths:
  32. try:
  33. with open(schema_path, "r") as f:
  34. self.schemas.append(json.load(f))
  35. except Exception as e:
  36. err = ("Could not load json schema, "
  37. "Obtained error {}".format(e))
  38. self._log.error(err)
  39. raise Exception(err)
  40. @property
  41. def _collection_names(self) -> list:
  42. '''
  43. '''
  44. # Don't use strip() instaed of replace since schema_c.strip(schema_)
  45. # will discard the c as well which is not a appropriate output
  46. return [os.path.basename(p).replace("schema_","").split(".")[0] for p in self._schema_paths]
  47. def get_fields(self) -> list:
  48. '''
  49. '''
  50. return self._parse()
  51. def get_fields_restricted_to_collection(self, collection_name: str) -> list:
  52. '''
  53. '''
  54. schemas = [self.schemas[self._collection_names.index(collection_name)]]
  55. return self._parse(schemas=schemas)
  56. def get_required_fields(self) -> list:
  57. '''
  58. '''
  59. return self._parse(required_only=True)
  60. def get_mongo_types(self) -> dict:
  61. '''
  62. '''
  63. return self._parse(field_info="bsonType")
  64. def get_datetime_fields(self):
  65. '''
  66. '''
  67. mongo_types = self.get_mongo_types()
  68. return [k for k, v in mongo_types.items()
  69. if v in ["date", "timestamp", "Date", "Timestamp"]]
  70. def get_python_types(self) -> dict:
  71. '''
  72. '''
  73. mongo_types = self.get_mongo_types()
  74. python_types = {}
  75. bson_to_python_types = {"double": float,
  76. "decimal": float,
  77. "string": str,
  78. "object": object,
  79. "array": list,
  80. "bool": bool,
  81. "int": int,
  82. "long": int,
  83. "date": np.dtype('<M8[ns]'),
  84. "timestamp": np.dtype('<M8[ns]')
  85. }
  86. for k, v in mongo_types.items():
  87. if isinstance(v, list):
  88. if ("date" in v) or ("timestamp" in v):
  89. v = "date"
  90. elif "string" in v:
  91. v = "string"
  92. elif ("double" in v) or ("decimal" in v):
  93. v = "double"
  94. elif ("null" in v) and (len(v) == 2) and ("int" not in v):
  95. v = [t for t in v if type != "null"][0]
  96. else:
  97. err = "Type {0}: {1} not convertibale".format(k, v)
  98. self._log.error(err)
  99. raise Exception(err)
  100. if v in bson_to_python_types:
  101. python_types[k] = bson_to_python_types[v]
  102. return python_types
  103. def get_patterns(self) -> dict:
  104. '''
  105. '''
  106. return self._parse(field_info="pattern")
  107. def get_default_values(self) -> dict:
  108. '''
  109. '''
  110. return self._parse(field_info="default")
  111. def get_allowed_values(self) -> dict:
  112. '''
  113. '''
  114. return self._parse(field_info="enum")
  115. def get_maximum_value(self) -> dict:
  116. '''
  117. '''
  118. return self._parse(field_info="maximum")
  119. def get_minimum_value(self) -> dict:
  120. '''
  121. '''
  122. return self._parse(field_info="minimum")
  123. def get_max_items(self) -> dict:
  124. '''
  125. '''
  126. return self._parse(field_info="maxItems")
  127. def get_min_items(self) -> dict:
  128. '''
  129. '''
  130. return self._parse(field_info="minItems")
  131. def get_field_descriptions(self) -> dict:
  132. '''
  133. '''
  134. return self._parse(field_info="description")
  135. def _parse(self,
  136. field_info: str = None,
  137. required_only: bool = False,
  138. schemas: list = None):
  139. '''
  140. '''
  141. if schemas is None:
  142. schemas = self.schemas
  143. result = self._parse_one(schema=schemas[0],
  144. field_info=field_info,
  145. required_only=required_only)
  146. for schema in schemas[1:]:
  147. next_result = self._parse_one(schema=schema,
  148. field_info=field_info,
  149. required_only=required_only)
  150. if isinstance(result, list):
  151. result.extend(next_result)
  152. else:
  153. result.update(next_result)
  154. return result
  155. def _parse_one(self,
  156. schema: dict,
  157. field_info: str = None,
  158. required_only: bool = False,
  159. super_field_name: str = None,
  160. already_parsed: (list, dict) = None) -> (list, dict):
  161. '''
  162. Recursive function that returns a list of (nested) field names or
  163. a dictionary of (nested) field names with field characteristics.
  164. :param schema: if None => entire self.schema, or a sub-schema
  165. of self.schema
  166. :param field_info: optional, if provided a dictionary of field
  167. names with field characteristics is returned (for examples
  168. bsonType of each field), else a list of fields is returned
  169. :param required_only: when True, only returns fields marked as
  170. required in the mongo schema
  171. :param super_field_name: needed for recursion
  172. Example: the field 'article' has
  173. subfields 'id' and 'supplier'.
  174. If we parse the sub-document corresponding to article, then
  175. super_field_name is'article' and we might get an output like
  176. {'article.id': string, 'article.supplier': string}
  177. :param alread_parsed: needed for recursion
  178. '''
  179. schema = deepcopy(schema)
  180. assert(isinstance(schema, dict)),\
  181. "Parameter 'schema' must be a dict"
  182. if field_info is None:
  183. # parse a list of fields
  184. if already_parsed is None:
  185. already_parsed = []
  186. else:
  187. assert(isinstance(already_parsed, list)),\
  188. "Parameter 'already_parsed' must be of type list"
  189. else:
  190. # parse a dictionary of field names with field characteristics
  191. if already_parsed is None:
  192. already_parsed = {}
  193. else:
  194. assert(isinstance(already_parsed, dict)),\
  195. "Parameter 'already_parsed' must be of type dict"
  196. # If schema is nested, then
  197. # either it is of bsonType object
  198. # and the field information is stored under the key 'properties'
  199. # or it is of bsonType array
  200. # and the field information is stored in sub-schemas
  201. # under the key 'items'
  202. # if schema is of bsonType object
  203. if "properties" in schema.keys():
  204. if "required" in schema.keys():
  205. required_subfields = schema["required"]
  206. else:
  207. required_subfields = []
  208. for sub_field_name in schema["properties"].keys():
  209. sub_schema = schema["properties"][sub_field_name]
  210. # only process fields that are required
  211. if required_only and\
  212. (sub_field_name not in required_subfields):
  213. pass
  214. else:
  215. if super_field_name is not None:
  216. field_name = '.'.join([super_field_name,
  217. sub_field_name])
  218. else:
  219. field_name = sub_field_name
  220. # if the given sub-field is nested, parse the
  221. # sub-schema corresponding to this sub-field
  222. self._parse_one(
  223. schema=sub_schema,
  224. super_field_name=field_name,
  225. field_info=field_info,
  226. already_parsed=already_parsed,
  227. required_only=required_only)
  228. # if schema is of bsonType array
  229. elif "items" in schema.keys():
  230. # one schema for all items
  231. if isinstance(schema["items"], dict):
  232. sub_schema = schema["items"]
  233. self._parse_one(schema=sub_schema,
  234. super_field_name=super_field_name,
  235. field_info=field_info,
  236. already_parsed=already_parsed,
  237. required_only=required_only)
  238. # list of separate schemas for each item
  239. elif isinstance(schema["items"], list):
  240. for sub_schema in schema["items"]:
  241. self._parse_one(schema=sub_schema,
  242. super_field_name=super_field_name,
  243. field_info=field_info,
  244. already_parsed=already_parsed,
  245. required_only=required_only)
  246. else:
  247. raise Exception(('Schema is not composed correctly: '
  248. 'items must be a dictionary or a list'))
  249. else:
  250. # If neither properties nor items is in schema keys
  251. # we reached the last level of nestedness,
  252. # field information is stored in the schema keys.
  253. field_name = super_field_name
  254. if field_info is None:
  255. already_parsed.append(field_name)
  256. else:
  257. if field_info in schema.keys():
  258. already_parsed[field_name] = schema[field_info]
  259. else:
  260. pass
  261. return already_parsed
  262. if __name__ == "__main__":
  263. # Only for testing
  264. schema_path = os.path.join(".", "mongo_schema", "schema_wheelsets.json")
  265. if os.path.isfile(schema_path):
  266. parse_obj = ParseJsonSchema(schema_paths=schema_path)
  267. fields = parse_obj.get_fields()
  268. required_fileds = parse_obj.get_required_fields()
  269. patterns = parse_obj.get_patterns()
  270. mongo_types = parse_obj.get_mongo_types()
  271. python_types_except_dates = parse_obj.get_python_types()
  272. datetime_fields = parse_obj.get_datetime_fields()
  273. allowed_values = parse_obj.get_allowed_values()
  274. descriptions = parse_obj.get_field_descriptions()