import atexit
import json
import logging
import os
from typing import Any, Dict, List, Tuple, TypeAlias
from dotenv import load_dotenv
from pymongo.collection import Collection
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from academic_metrics.configs import (
configure_logging,
DEBUG,
)
CollectionData: TypeAlias = List[Dict[str, Any]]
"""Type alias representing a collection of documents from MongoDB.
Each document is represented as a dictionary with string keys and arbitrary values.
Type:
List[Dict[str, Any]]: A list of dictionaries where each dictionary represents a MongoDB document.
"""
DatabaseSnapshot: TypeAlias = Tuple[CollectionData, CollectionData, CollectionData]
"""Type alias representing a snapshot of all collections in the database.
Contains data from articles, categories, and faculty collections in that order.
Type:
Tuple[CollectionData, CollectionData, CollectionData]: A tuple containing:
- article_data (CollectionData): Documents from the articles collection
- category_data (CollectionData): Documents from the categories collection
- faculty_data (CollectionData): Documents from the faculty collection
"""
class DatabaseWrapper:
    """A wrapper class for MongoDB operations.

    Attributes:
        logger (logging.Logger): Logger for logging messages.
        mongo_uri (str): MongoDB URI the client was created with.
        client (MongoClient): MongoDB client.
        db (Database): MongoDB database.
        article_collection (Collection): MongoDB collection for article data.
        category_collection (Collection): MongoDB collection for category data.
        faculty_collection (Collection): MongoDB collection for faculty data.

    Methods:
        _test_connection: Test the connection to the MongoDB server.
        get_dois: Get all DOIs from the article collection.
        get_all_data: Get all data from the article, category, and faculty collections.
        insert_categories: Insert multiple categories into the collection.
        update_category: Update an existing category.
        insert_articles: Insert multiple articles into the collection.
        insert_faculty: Insert multiple faculty entries into the collection.
        update_faculty: Update an existing faculty member.
        process: Process data and insert it into the appropriate collection.
        run_all_process: Run the process method for all collections.
        fix_counts: Recompute the faculty and department counts of every category.
        clear_collection: Clear all three collections.
        close_connection: Close the connection to the MongoDB server.
    """

    # List-valued category fields merged as de-duplicated unions
    # ("doi_list" is handled separately because it also drives the average).
    _CATEGORY_LIST_FIELDS = ("themes", "faculty", "departments", "titles")

    # List-valued faculty fields merged as de-duplicated unions
    # ("dois" is handled separately because it gates the whole update).
    _FACULTY_LIST_FIELDS = (
        "department_affiliations",
        "titles",
        "categories",
        "top_level_categories",
        "mid_level_categories",
        "low_level_categories",
        "category_urls",
        "top_category_urls",
        "mid_category_urls",
        "low_category_urls",
        "themes",
        "journals",
    )

    def __init__(self, *, db_name: str, mongo_uri: str):
        """Initialize the DatabaseWrapper with a database name and MongoDB URI.

        Connects to the server, binds the ``article_data``, ``category_data``
        and ``faculty_data`` collections, pings the deployment, and registers
        ``close_connection`` to run at interpreter exit.

        Args:
            db_name (str): Name of the database.
            mongo_uri (str): MongoDB URI.

        Raises:
            ValueError: If ``mongo_uri`` is empty or None.
        """
        self.logger = configure_logging(
            module_name=__name__,
            log_file_name="database_setup",
            log_level=DEBUG,
        )

        # Fail fast: returning here would leave an instance without a client
        # that only breaks later with an unrelated AttributeError.
        if not mongo_uri:
            self.logger.error("Url error")
            raise ValueError("mongo_uri must be a non-empty MongoDB URI")

        self.mongo_uri = mongo_uri
        self.client = MongoClient(self.mongo_uri, server_api=ServerApi("1"))
        self.db = self.client[db_name]
        self.article_collection: Collection = self.db["article_data"]
        self.category_collection: Collection = self.db["category_data"]
        self.faculty_collection: Collection = self.db["faculty_data"]
        self._test_connection()
        atexit.register(self.close_connection)

    def _test_connection(self):
        """Test the connection to the MongoDB server.

        Sends a ``ping`` command. A failure is logged, not raised, so
        construction still succeeds when the server is unreachable.
        """
        try:
            self.client.admin.command("ping")
            self.logger.info(
                "Pinged your deployment. You successfully connected to MongoDB!"
            )
        except Exception as e:
            self.logger.error(f"Connection error: {e}")

    @staticmethod
    def _union(existing: Any, new: Any) -> List[Any]:
        """Return the de-duplicated union of two lists, treating None as empty.

        Elements must be hashable; the order of the result is unspecified.
        """
        return list(set(existing or []).union(new or []))

    def _upsert_merged(
        self, collection: Any, item: Dict[str, Any], merge: Any, label: str
    ):
        """Insert ``item``, or merge it into the stored document with the same ``_id``.

        Args:
            collection: Collection to read from and write to.
            item (Dict[str, Any]): Incoming document; must contain ``_id``.
            merge: Callable ``(existing, new) -> merged`` applied when a
                document with the same ``_id`` is already stored.
            label (str): Entity name used in the log messages.
        """
        doc_id = item["_id"]
        existing_data = collection.find_one({"_id": doc_id})
        if existing_data:
            new_item = merge(existing_data, item)
            collection.update_one({"_id": doc_id}, {"$set": new_item})
            self.logger.info(f"Updated {label}: {doc_id}")
        else:
            collection.insert_one(item)
            self.logger.info(f"Inserted new {label}: {doc_id}")

    def get_dois(self) -> List[str]:
        """Get all DOIs from the article collection.

        The DOI of an article is read from the ``_id`` of its document.

        Returns:
            doi_list (List[str]): List of DOIs.
        """
        # Project to "_id" only; the rest of each article is not needed here.
        articles = self.article_collection.find({}, {"_id": 1})
        doi_list = [article["_id"] for article in articles]
        self.logger.info(f"Retrieved DOIs: {doi_list}")
        return doi_list

    def get_all_data(self) -> "DatabaseSnapshot":
        """Get all data from the article, category, and faculty collections.

        Returns:
            Tuple[CollectionData, CollectionData, CollectionData]: A tuple containing:
                - articles (CollectionData): Documents from the articles collection
                - categories (CollectionData): Documents from the categories collection
                - faculty (CollectionData): Documents from the faculty collection
        """
        articles = list(self.article_collection.find({}))
        categories = list(self.category_collection.find({}))
        faculty = list(self.faculty_collection.find({}))
        self.logger.info("Retrieved all data from collections.")
        return (articles, categories, faculty)

    def insert_categories(self, category_data: List[Dict[str, Any]]):
        """Insert multiple categories into the collection.

        If a category already exists, it is merged with the new data via
        ``update_category``. Empty or None input is logged and ignored.

        Args:
            category_data (List[Dict[str, Any]]): List of category data.
        """
        if not category_data:
            self.logger.error("Category data is empty or None")
            return
        for item in category_data:
            self._upsert_merged(
                self.category_collection, item, self.update_category, "category"
            )

    def update_category(
        self, existing_data: Dict[str, Any], new_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Update existing category data with new data, handling None values and logging state.

        The merge happens only when the two DOI lists share no entry. Any
        overlap is taken to mean the new data was already applied, and
        ``existing_data`` is returned untouched. When merging, list fields
        become de-duplicated unions, the citation average is re-weighted by
        the number of DOIs on each side, and the counts are recomputed.

        Args:
            existing_data (Dict[str, Any]): Existing category data (mutated in place).
            new_data (Dict[str, Any]): New category data.

        Returns:
            existing_data (Dict[str, Any]): Updated category data.
        """
        existing_dois = existing_data.get("doi_list") or []
        new_dois = new_data.get("doi_list") or []
        self.logger.debug(f"DOIs - Existing: {existing_dois}, New: {new_dois}")

        if set(existing_dois).intersection(new_dois):
            self.logger.info(
                f"DOI intersection found for category {existing_data.get('_id')} - skipping update"
            )
            return existing_data

        self.logger.info(
            f"No DOI intersection found for category {existing_data.get('_id')} - updating data"
        )

        # Weighted mean of the two averages. Skipped when neither side has a
        # DOI, since there is nothing to weight by (avoids division by zero).
        total_dois = len(existing_dois) + len(new_dois)
        if total_dois:
            scaled_averages = len(existing_dois) * (
                existing_data.get("citation_average") or 0
            ) + len(new_dois) * (new_data.get("citation_average") or 0)
            new_average = scaled_averages / total_dois
            existing_data["citation_average"] = new_average
            self.logger.debug(f"Updated citation average to: {new_average}")

        existing_data["doi_list"] = self._union(existing_dois, new_dois)
        for field in self._CATEGORY_LIST_FIELDS:
            existing_data[field] = self._union(
                existing_data.get(field), new_data.get(field)
            )

        # Counts are derived from the merged lists; tc_count is additive.
        existing_data["faculty_count"] = len(existing_data["faculty"])
        existing_data["department_count"] = len(existing_data["departments"])
        existing_data["article_count"] = len(existing_data["titles"])
        existing_data["tc_count"] = (existing_data.get("tc_count") or 0) + (
            new_data.get("tc_count") or 0
        )
        self.logger.debug(
            f"Updated counts - Faculty: {existing_data['faculty_count']}, "
            f"Departments: {existing_data['department_count']}, "
            f"Articles: {existing_data['article_count']}, "
            f"TC: {existing_data['tc_count']}"
        )
        return existing_data

    def insert_articles(self, article_data: List[Dict[str, Any]]):
        """Insert multiple articles into the collection.

        An article whose ``_id`` is already stored is skipped, not merged.
        Any other insertion failure is logged and the remaining articles are
        still processed. Empty or None input is logged and ignored.

        Args:
            article_data (List[Dict[str, Any]]): List of article data.
        """
        # Imported locally; pymongo is already a dependency of this module.
        from pymongo.errors import DuplicateKeyError

        if not article_data:
            self.logger.error("Article data is empty or None")
            return
        for item in article_data:
            try:
                self.article_collection.insert_one(item)
            except DuplicateKeyError as e:
                self.logger.info(f"Duplicate content not adding {e}")
            except Exception:
                # Keep going, but do not report real failures as duplicates.
                self.logger.exception(
                    f"Failed to insert article: {item.get('_id')}"
                )
            else:
                self.logger.info(f"Inserted new articles: {item['_id']}")

    def insert_faculty(self, faculty_data: List[Dict[str, Any]]):
        """Insert multiple faculty entries into the collection.

        If a faculty member already exists, it is merged with the new data
        via ``update_faculty``. Empty or None input is logged and ignored.

        Args:
            faculty_data (List[Dict[str, Any]]): List of faculty data.
        """
        if not faculty_data:
            self.logger.error("Faculty data is empty or None")
            return
        for item in faculty_data:
            self._upsert_merged(
                self.faculty_collection, item, self.update_faculty, "faculty"
            )

    def update_faculty(
        self, existing_data: Dict[str, Any], new_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Update existing faculty data with new data, handling None values and logging state.

        The merge happens only when the two DOI lists share no entry;
        otherwise ``existing_data`` is returned untouched. When merging,
        ``total_citations`` is summed and every list field becomes a
        de-duplicated union.

        Args:
            existing_data (Dict[str, Any]): Existing faculty data (mutated in place).
            new_data (Dict[str, Any]): New faculty data.

        Returns:
            existing_data (Dict[str, Any]): Updated faculty data.
        """
        existing_dois = existing_data.get("dois") or []
        new_dois = new_data.get("dois") or []
        self.logger.debug(f"DOIs - Existing: {existing_dois}, New: {new_dois}")

        if set(existing_dois).intersection(new_dois):
            self.logger.info(
                f"DOI intersection found for faculty {existing_data.get('_id')} - skipping update"
            )
            return existing_data

        self.logger.info(
            f"No DOI intersection found for faculty {existing_data.get('_id')} - updating data"
        )
        existing_data["total_citations"] = (
            existing_data.get("total_citations") or 0
        ) + (new_data.get("total_citations") or 0)

        existing_data["dois"] = self._union(existing_dois, new_dois)
        for field in self._FACULTY_LIST_FIELDS:
            existing_data[field] = self._union(
                existing_data.get(field), new_data.get(field)
            )

        self.logger.debug(
            f"Updated counts - Citations: {existing_data['total_citations']}, "
            f"DOIs: {len(existing_data['dois'])}"
        )
        return existing_data

    def process(self, data: List[Dict[str, Any]], collection: str):
        """Process data and insert it into the appropriate collection.

        Args:
            data (List[Dict[str, Any]]): Data to be inserted.
            collection (str): Name of the collection to insert the data into;
                one of ``"article_data"``, ``"category_data"`` or
                ``"faculty_data"``. Any other name is logged and ignored.
        """
        handlers = {
            "article_data": self.insert_articles,
            "category_data": self.insert_categories,
            "faculty_data": self.insert_faculty,
        }
        handler = handlers.get(collection)
        if handler is None:
            self.logger.error(f"Unknown collection: {collection!r} - no data inserted")
            return
        handler(data)

    def run_all_process(
        self,
        category_data: List[Dict[str, Any]],
        article_data: List[Dict[str, Any]],
        faculty_data: List[Dict[str, Any]],
    ):
        """Process all data and insert it into the appropriate collections.

        Categories are processed first, then articles, then faculty.

        Args:
            category_data (List[Dict[str, Any]]): Category data.
            article_data (List[Dict[str, Any]]): Article data.
            faculty_data (List[Dict[str, Any]]): Faculty data.
        """
        self.process(category_data, "category_data")
        self.process(article_data, "article_data")
        self.process(faculty_data, "faculty_data")

    def fix_counts(self):
        """Recompute ``faculty_count`` and ``department_count`` for every category.

        Each count is set to the length of the category's ``faculty`` /
        ``departments`` list; a missing or None list counts as zero. Only the
        two count fields are written back.
        """
        # Materialize first so no cursor stays open while documents are updated.
        categories = list(
            self.category_collection.find({}, {"faculty": 1, "departments": 1})
        )
        for data in categories:
            counts = {
                "faculty_count": len(data.get("faculty") or []),
                "department_count": len(data.get("departments") or []),
            }
            self.category_collection.update_one(
                {"_id": data["_id"]}, {"$set": counts}
            )

    def clear_collection(self):
        """Clear all three collections (categories, articles, and faculty)."""
        self.category_collection.delete_many({})
        self.article_collection.delete_many({})
        self.faculty_collection.delete_many({})
        self.logger.info("Cleared the entire collection")

    def close_connection(self):
        """Close the connection to the MongoDB server."""
        self.client.close()
        self.logger.info("Connection closed")
if __name__ == "__main__":
    # Maintenance entry point: recompute the stored category counts.
    # Load environment variables
    load_dotenv()
    mongo_uri = os.getenv("MONGODB_URI")
    # os.getenv returns None when the variable is unset; stop with a clear
    # message instead of failing later on a wrapper that has no client.
    if not mongo_uri:
        raise SystemExit("MONGODB_URI is not set; cannot connect to MongoDB")

    # Retained example of a full reload from the processed JSON files:
    # # Handle article data
    # with open(
    #     "../../data/core/output_files/test_processed_article_stats_obj_data.json", "r"
    # ) as f:
    #     article_data = json.load(f)
    # # Handle category data
    # with open(
    #     "../../data/core/output_files/test_processed_category_data.json", "r"
    # ) as f:
    #     category_data = json.load(f)
    # # Handle faculty data
    # with open(
    #     "../../data/core/output_files/test_processed_global_faculty_stats_data.json",
    #     "r",
    # ) as f:
    #     faculty_data = json.load(f)

    database = DatabaseWrapper(db_name="Site_Data", mongo_uri=mongo_uri)

    # database.clear_collection()
    # database.process(article_data, "article_data")
    # database.process(category_data, "category_data")
    # database.process(faculty_data, "faculty_data")
    database.fix_counts()