|
@@ -84,19 +84,37 @@ class MongodbHandler:
|
|
|
|
|
|
self._database_name = database_name
|
|
|
|
|
|
- def set_database(self, database_name):
|
|
|
+ def set_database(self, database_name: str):
|
|
|
+ '''
|
|
|
+ :param str database_name: Name of the database.
|
|
|
+ '''
|
|
|
+ assert(isinstance(database_name, str)),\
|
|
|
+ "Parameter 'database_name' must be a string type"
|
|
|
+
|
|
|
+ if database_name not in self._client.list_database_names():
|
|
|
+ self._log.info(('Database: {} didnt exist, it will be created for you once a collection is created in it').format(database_name))
|
|
|
self._database = self._client[database_name]
|
|
|
|
|
|
+
|
|
|
def drop_database(self):
|
|
|
'''
|
|
|
'''
|
|
|
- self._client.drop_database(self._database_name)
|
|
|
+ try:
|
|
|
+ self._client.drop_database(self._database_name)
|
|
|
+ except Exception as error:
|
|
|
+ self._log.log_and_raise_error(('Couldnt drop the database. Error: {}').format(error))
|
|
|
+
|
|
|
|
|
|
def drop_collection(self, collection_name: str):
|
|
|
'''
|
|
|
'''
|
|
|
- self._database[collection_name].drop()
|
|
|
-
|
|
|
+ try:
|
|
|
+ return_message = self._database.drop_collection(collection_name)
|
|
|
+ if 'errmsg' in return_message:
|
|
|
+ raise Exception(return_message['errmsg'])
|
|
|
+ except Exception as error:
|
|
|
+ self._log.log_and_raise_error(('Couldnt drop the collection {}. Error: {}').format(collection_name, error))
|
|
|
+
|
|
|
def create_index(self, collection_name: str, key: str, direction: (str, int)='text'):
|
|
|
'''
|
|
|
:param str collection_name: name on the collection for which the schema will be set.
|
|
@@ -108,12 +126,15 @@ class MongodbHandler:
|
|
|
|
|
|
assert(isinstance(collection_name, str)),\
|
|
|
"Parameter 'collection_name' must be a string type"
|
|
|
- assert(isinstance(key, (str, int)) & key in allowed_directions),\
|
|
|
- "Parameter 'key' must be a string or integer type and must be one of these values: 1, -1, '2d', 'geoHaystack', '2dsphere', 'hashed', 'text' "
|
|
|
+ assert(isinstance(key, (str, int))),\
|
|
|
+ "Parameter 'key' must be a string or integer type"
|
|
|
+ assert(direction in allowed_directions),\
|
|
|
+ "Parameter 'key' must be one of these values: 1, -1, '2d', 'geoHaystack', '2dsphere', 'hashed', 'text' "
|
|
|
assert(isinstance(direction, str)),\
|
|
|
"Parameter 'direction' must be a string type"
|
|
|
|
|
|
- self._database[collection_name].create_index([key, direction])
|
|
|
+ self._database[collection_name].create_index([(key, direction)], name=key)
|
|
|
+ #collection.create_index([('field_i_want_to_index', pymongo.TEXT)], name='search_index', default_language='english')
|
|
|
|
|
|
def set_collection_schema(self, collection_name: str, schema_path: str,
|
|
|
validation_level: str = 'moderate',validation_action: str = 'error'):
|
|
@@ -135,7 +156,6 @@ class MongodbHandler:
|
|
|
parse_obj = ParseJsonSchema(schema_paths=schema_path)
|
|
|
|
|
|
schema = parse_obj.read_schema_and_parse_for_mongodb(schema_path)
|
|
|
-
|
|
|
command = {
|
|
|
'collMod': collection_name,
|
|
|
'validator': {
|
|
@@ -162,13 +182,13 @@ class MongodbHandler:
|
|
|
if collection_name not in self._database.list_collection_names():
|
|
|
try:
|
|
|
self._log.info(("Collection '{}' has been created").format(collection_name))
|
|
|
- return self._database.create_collection(collection_name)
|
|
|
+ self._database.create_collection(collection_name)
|
|
|
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('An error occured while creating the new collection: {}. \nError: {}').format(collection_name, error))
|
|
|
else:
|
|
|
self._log.info(("Collection '{}' already exists").format(collection_name))
|
|
|
- return self._database[collection_name]
|
|
|
+ self._database[collection_name]
|
|
|
|
|
|
def insert_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
|
|
|
collection_name: str,
|
|
@@ -226,7 +246,7 @@ class MongodbHandler:
|
|
|
self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)
|
|
|
|
|
|
def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
|
|
|
- attribute_value: str = None, comparison_operator: str = '$eq'):
|
|
|
+ attribute_value: str = None, comparison_operator: str = '$eq', index = None):
|
|
|
'''
|
|
|
|
|
|
'''
|
|
@@ -239,25 +259,25 @@ class MongodbHandler:
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute_value, comparison_operator, attribute_value, error))
|
|
|
|
|
|
- return self.convert_mongo_data_into_dataframe(data)
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index)
|
|
|
|
|
|
def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list):
|
|
|
|
|
|
try:
|
|
|
data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
|
|
|
-
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
|
|
|
|
|
|
return self.convert_mongo_data_into_dataframe(data)
|
|
|
|
|
|
- def convert_mongo_data_into_dataframe(self, data) -> pd.DataFrame():
|
|
|
+ def convert_mongo_data_into_dataframe(self, data, index: str = None) -> pd.DataFrame():
|
|
|
|
|
|
data = list(data)
|
|
|
try:
|
|
|
if len(data)> 0:
|
|
|
df = pd.DataFrame(data)
|
|
|
- df.set_index('radsatznummer', inplace=True)
|
|
|
+ if index is not None:
|
|
|
+ df.set_index(index, inplace=True)
|
|
|
return df
|
|
|
else:
|
|
|
self._log.warning(('No data for the query was found').format())
|