@@ -51,7 +51,8 @@ class DataFrameToCollection():
     def to_list_of_documents(self, data: pd.DataFrame,
                              grp_fields: list,
                              schema: dict = None,
-                             _final_step: bool = True) -> list:
+                             _final_step: bool = True,
+                             already_reshaped: list = []) -> list:
         '''
         Reshapes a pandas dataframe to a list of documents according
         to a complex (json) mongodb schema
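
# ------------------------------------------------------------------------------
# Usage sketch (not part of the patch): the hunk below adds a fast path for any
# field listed in `already_reshaped`: it is only grouped and flattened, and the
# schema-driven branches are skipped. The pandas pattern behind that fast path is
# groupby(...).apply(<flatten + deduplicate>). `flatten_distinct` below is a
# stand-in assumption for the private helper `_make_flattened_list_of_distinct`;
# the sample data is illustrative only.
import pandas as pd


def flatten_distinct(series: pd.Series) -> list:
    # flatten list-valued cells and keep distinct items in first-seen order
    seen = []
    for cell in series:
        items = cell if isinstance(cell, (list, tuple)) else [cell]
        for item in items:
            if item not in seen:
                seen.append(item)
    return seen


data = pd.DataFrame({
    "product_id": [1, 1, 2],
    "tags": [["red", "blue"], ["blue"], ["green"]],
})

# same shape as the patched code:
# data.groupby(grp_fields, sort=False)[field].apply(self._make_flattened_list_of_distinct)
reshaped = data.groupby(["product_id"], sort=False)["tags"].apply(flatten_distinct)
print(reshaped.to_dict())  # {1: ['red', 'blue'], 2: ['green']}
# ------------------------------------------------------------------------------
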
@@ -84,128 +85,134 @@ class DataFrameToCollection():
             if field not in self._unroll_nested_names(data.columns):
                 continue

-            field_type = schema["properties"][field]["bsonType"]
+            if field in already_reshaped:  # field was reshaped earlier: only group and flatten it
+                print(self._log.green, 'Adding ', field, "to reshaped fields", self._log.reset)
+                reshaped_field = data.groupby(grp_fields, sort=False)[field]\
+                                     .apply(self._make_flattened_list_of_distinct)
+                reshaped_fields.append(reshaped_field)
+            else:
+                field_type = schema["properties"][field]["bsonType"]

-            # if field has a simple type
-            if field_type not in ["array", "object"]:
+                # if field has a simple type
+                if field_type not in ["array", "object"]:

-                grp_fields = [c for c in grp_fields if c in data.columns]
+                    grp_fields = [c for c in grp_fields if c in data.columns]

-                # check that there is only one possible value of this field
-                n_distinct_values = data.groupby(grp_fields, sort=False)[field].nunique().max()
+                    # check that there is only one possible value of this field
+                    n_distinct_values = data.groupby(grp_fields, sort=False)[field].nunique().max()

-                # n_distinct_valus can be 0 if the column only contains NaN values
-                if n_distinct_values > 1:
-                    err = "Field {0} is not unique with respect to {1}"\
-                          .format(field, grp_fields)
+                    # n_distinct_values can be 0 if the column only contains NaN values
+                    if n_distinct_values > 1:
+                        err = "Field {0} is not unique with respect to {1}"\
+                              .format(field, grp_fields)

-                    self._log.error(err)
-                    raise Exception(err)
+                        self._log.error(err)
+                        raise Exception(err)

-                if field not in grp_fields:
-                    reshaped_field = data.groupby(grp_fields, sort=False)[field].first()
-                else:
-                    reshaped_field =\
-                        data[grp_fields].drop_duplicates()\
-                        .set_index(grp_fields, drop=False)[field]
+                    if field not in grp_fields:
+                        reshaped_field = data.groupby(grp_fields, sort=False)[field].first()
+                    else:
+                        reshaped_field =\
+                            data[grp_fields].drop_duplicates()\
+                            .set_index(grp_fields, drop=False)[field]

-                reshaped_fields.append(reshaped_field)
+                    reshaped_fields.append(reshaped_field)

-            # if field is sub-document (dictionary)
-            elif field_type == "object":
+                # if field is sub-document (dictionary)
+                elif field_type == "object":

-                sub_schema = deepcopy(schema["properties"][field])
+                    sub_schema = deepcopy(schema["properties"][field])

-                # rename sub-schema properties to match with data column names
-                sub_schema["properties"] =\
-                    {".".join([field, k]): v for k, v
-                     in sub_schema["properties"].items()}
+                    # rename sub-schema properties to match with data column names
+                    sub_schema["properties"] =\
+                        {".".join([field, k]): v for k, v
+                         in sub_schema["properties"].items()}

-                sub_data = self.to_list_of_documents(
-                        data=data,
-                        schema=sub_schema,
-                        grp_fields=grp_fields,
-                        _final_step=False)
+                    sub_data = self.to_list_of_documents(
+                            data=data,
+                            schema=sub_schema,
+                            grp_fields=grp_fields,
+                            _final_step=False)

-                # Need to be checked since child elements can be empty
-                if sub_data is not None:
+                    # Needs to be checked since child elements can be empty
+                    if sub_data is not None:

-                    reshaped_field = sub_data.apply(self._make_dict, axis=1)
-                    reshaped_field.name = field
+                        reshaped_field = sub_data.apply(self._make_dict, axis=1)
+                        reshaped_field.name = field

-                    reshaped_fields.append(reshaped_field)
+                        reshaped_fields.append(reshaped_field)

-            # if field is a list of dictionaries
-            elif field_type == "array":
+                # if field is a list of dictionaries
+                elif field_type == "array":


-                items_type = schema["properties"][field]["items"]["bsonType"]
+                    items_type = schema["properties"][field]["items"]["bsonType"]

-                if items_type == "object":
-                    array_object = time.time()
-                    sub_schema = deepcopy(schema["properties"][field]["items"])
+                    if items_type == "object":
+                        array_object = time.time()
+                        sub_schema = deepcopy(schema["properties"][field]["items"])

-                    # rename sub-schema properties to match data column names
-                    sub_schema["properties"] =\
-                        {".".join([field, k]): v for k, v in
-                         sub_schema["properties"].items()}
+                        # rename sub-schema properties to match data column names
+                        sub_schema["properties"] =\
+                            {".".join([field, k]): v for k, v in
+                             sub_schema["properties"].items()}

-                    # extend grp fields by sub-fields of field simple types
-                    sub_grp_fields = [f for f in sub_schema["properties"]
-                                      if (sub_schema["properties"][f]["bsonType"] not in ["array", "object"])
-                                      and (f in data.columns)]
+                        # extend grp fields by sub-fields of field simple types
+                        sub_grp_fields = [f for f in sub_schema["properties"]
+                                          if (sub_schema["properties"][f]["bsonType"] not in ["array", "object"])
+                                          and (f in data.columns)]

-                    if len(sub_grp_fields) == 0:
-                        err = ("One of the sub-keys in a list of documents"
-                               " must be of simple type for the field {}"
-                               .format(field))
+                        if len(sub_grp_fields) == 0:
+                            err = ("One of the sub-keys in a list of documents"
+                                   " must be of simple type for the field {}"
+                                   .format(field))

-                        self._log.error(err)
-                        raise Exception(err)
+                            self._log.error(err)
+                            raise Exception(err)

-                    # group and reshape sub-fields with complex types
-                    sub_data = self.to_list_of_documents(
-                            data=data,
-                            schema=sub_schema,
-                            grp_fields=grp_fields + sub_grp_fields,
-                            _final_step=False)
+                        # group and reshape sub-fields with complex types
+                        sub_data = self.to_list_of_documents(
+                                data=data,
+                                schema=sub_schema,
+                                grp_fields=grp_fields + sub_grp_fields,
+                                _final_step=False)

-                    if sub_data is not None:
+                        if sub_data is not None:

-                        # gether the results into a list of dictionaries
-                        sub_data = sub_data.apply(self._make_dict, axis=1)
+                            # gather the results into a list of dictionaries
+                            sub_data = sub_data.apply(self._make_dict, axis=1)

-                        sub_data.name = field
-                        sub_data = sub_data.reset_index(grp_fields)
-                        ######################################################
-                        ######## OPTIMIZATIONS MAY BE POSSIBLE HERE ##########
-                        reshaped_field =\
-                            sub_data.groupby(grp_fields, sort=False)[field]\
-                                    .apply(self._make_list_of_distinct)
-                        ######################################################
-                        reshaped_fields.append(reshaped_field)
+                            sub_data.name = field
+                            sub_data = sub_data.reset_index(grp_fields)
+                            ######################################################
+                            ######## OPTIMIZATIONS MAY BE POSSIBLE HERE ##########
+                            reshaped_field =\
+                                sub_data.groupby(grp_fields, sort=False)[field]\
+                                        .apply(self._make_list_of_distinct)
+                            ######################################################
+                            reshaped_fields.append(reshaped_field)


-                # if field is a list of values with simple type
-                elif items_type == "array":
-                    grp_fields = [c for c in grp_fields if c in data.columns]
+                    # if field is a list of values with simple type
+                    elif items_type == "array":
+                        grp_fields = [c for c in grp_fields if c in data.columns]

-                    if field in data.columns:
+                        if field in data.columns:

-                        reshaped_field = data.groupby(grp_fields, sort=False)[field]\
-                                             .apply(self._make_list_of_distinct)
+                            reshaped_field = data.groupby(grp_fields, sort=False)[field]\
+                                                 .apply(self._make_list_of_distinct)

-                        reshaped_fields.append(reshaped_field)
-                else:
+                            reshaped_fields.append(reshaped_field)
+                    else:

-                    grp_fields = [c for c in grp_fields if c in data.columns]
+                        grp_fields = [c for c in grp_fields if c in data.columns]

-                    if field in data.columns:
+                        if field in data.columns:

-                        reshaped_field = data.groupby(grp_fields, sort=False)[field]\
-                                             .apply(self._make_flattened_list_of_distinct)
+                            reshaped_field = data.groupby(grp_fields, sort=False)[field]\
+                                                 .apply(self._make_flattened_list_of_distinct)

-                        reshaped_fields.append(reshaped_field)
+                            reshaped_fields.append(reshaped_field)

         if len(reshaped_fields) > 0: