|
@@ -288,12 +288,12 @@ class MongodbHandler:
|
|
|
|
|
|
return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
|
|
- def convert_mongo_data_into_dataframe(self, data, index: str = None, collection_name: str = None) -> pd.DataFrame():
|
|
|
|
|
|
+ def convert_mongo_data_into_dataframe(self, data, index: str = None, collection_name: str = None, chunksize: int = 10000) -> pd.DataFrame():
|
|
|
|
|
|
start_time = time.time()
|
|
start_time = time.time()
|
|
'''
|
|
'''
|
|
self._log.info('Converting returned mongo data into a DataFrame')
|
|
self._log.info('Converting returned mongo data into a DataFrame')
|
|
- df = pd.DataFrame.from_records(data)
|
|
|
|
|
|
+
|
|
data = list(data)
|
|
data = list(data)
|
|
try:
|
|
try:
|
|
if len(data)> 0:
|
|
if len(data)> 0:
|
|
@@ -301,8 +301,7 @@ class MongodbHandler:
|
|
self._log.info(('{} rows were fetched from the {} collection').format(len(data), collection_name))
|
|
self._log.info(('{} rows were fetched from the {} collection').format(len(data), collection_name))
|
|
else:
|
|
else:
|
|
self._log.info(('{} rows were fetched from the database').format(len(data)))
|
|
self._log.info(('{} rows were fetched from the database').format(len(data)))
|
|
- #df = pd.DataFrame(data)
|
|
|
|
- #df = pd.DataFrame.from_records(data)
|
|
|
|
|
|
+ df = pd.DataFrame(data)
|
|
if index is not None:
|
|
if index is not None:
|
|
df.set_index(index, inplace=True)
|
|
df.set_index(index, inplace=True)
|
|
|
|
|
|
@@ -318,13 +317,13 @@ class MongodbHandler:
|
|
for index, value in enumerate(data):
|
|
for index, value in enumerate(data):
|
|
|
|
|
|
records.append(value)
|
|
records.append(value)
|
|
- if index % 1000 == 0:
|
|
|
|
|
|
+ if index % chunksize == 0:
|
|
frames.append(pd.DataFrame(records))
|
|
frames.append(pd.DataFrame(records))
|
|
records = []
|
|
records = []
|
|
if records:
|
|
if records:
|
|
frames.append(pd.DataFrame(records))
|
|
frames.append(pd.DataFrame(records))
|
|
return_df = pd.concat(frames)
|
|
return_df = pd.concat(frames)
|
|
- self._log.info(('DataFrame conversion is done, took {} seconds').format(time.time()-start_time))
|
|
|
|
|
|
+ self._log.info(('{} Rows were fetched from {}. DataFrame conversion is done, took {} seconds').format(len(return_df.index), collection_name if collection_name is not None else 'the database', time.time()-start_time))
|
|
|
|
|
|
return return_df
|
|
return return_df
|
|
|
|
|