runners package

Submodules

runners.pipeline module

class academic_metrics.runners.pipeline.SaveOfflineKwargs[source]#

Bases: TypedDict

offline: bool#
run_crossref_before_file_load: bool#
make_files: bool#
extend: bool#
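
The four fields above can be pictured as a plain TypedDict. The following is a minimal sketch that re-declares the type locally so it runs without the package installed; `DEFAULTS` and `offline_run` are illustrative names, not part of the library.

```python
from typing import TypedDict


class SaveOfflineKwargs(TypedDict):
    """Local re-declaration mirroring the documented fields (illustrative only)."""

    offline: bool
    run_crossref_before_file_load: bool
    make_files: bool
    extend: bool


# Mirrors the documented SAVE_OFFLINE_KWARGS default: every flag is off.
DEFAULTS: SaveOfflineKwargs = {
    "offline": False,
    "run_crossref_before_file_load": False,
    "make_files": False,
    "extend": False,
}

# Copy the defaults and flip only the flags needed, leaving the shared default untouched.
offline_run: SaveOfflineKwargs = {**DEFAULTS, "offline": True, "make_files": True}
```

Building a new dictionary instead of mutating the default avoids one run's settings leaking into the next.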
class academic_metrics.runners.pipeline.PipelineRunner(ai_api_key, crossref_affiliation, data_from_month, data_to_month, data_from_year, data_to_year, mongodb_uri, db_name='Site_Data', debug=False, pre_classification_model='gpt-4o-mini', classification_model='gpt-4o-mini', theme_model='gpt-4o-mini')[source]#

Bases: object

Orchestrates the academic metrics data processing pipeline.

This class manages the end-to-end process of collecting, processing, and storing academic publication data. It handles data collection from Crossref, classification of publications, generation of statistics, and storage in MongoDB.

SAVE_OFFLINE_KWARGS#

Default configuration for offline processing.

Type:

SaveOfflineKwargs

logger#

Pipeline-wide logger instance.

Type:

logging.Logger

ai_api_key#

API key for AI services.

Type:

str

db_name#

Name of the MongoDB database.

Type:

str

mongodb_uri#

URI for MongoDB connection.

Type:

str

db#

Database interface instance.

Type:

DatabaseWrapper

scraper#

Web scraping utility instance.

Type:

Scraper

crossref_wrapper#

Crossref API interface instance.

Type:

CrossrefWrapper

taxonomy#

Publication taxonomy utility.

Type:

Taxonomy

warning_manager#

Warning logging utility.

Type:

WarningManager

strategy_factory#

Strategy pattern factory.

Type:

StrategyFactory

utilities#

General utility functions.

Type:

Utilities

classification_orchestrator#

Publication classifier.

Type:

ClassificationOrchestrator

dataclass_factory#

Data class creation utility.

Type:

DataClassFactory

category_processor#

Category statistics processor.

Type:

CategoryProcessor

faculty_postprocessor#

Faculty data processor.

Type:

FacultyPostprocessor

department_postprocessor#

Department data processor.

Type:

DepartmentPostprocessor

debug#

Debug mode flag.

Type:

bool

run_pipeline()[source]#

Executes the main data processing pipeline.

_create_taxonomy()[source]#

Creates a new Taxonomy instance.

_create_classifier_factory()[source]#

Creates a new ClassifierFactory instance.

_create_warning_manager()[source]#

Creates a new WarningManager instance.

_create_strategy_factory()[source]#

Creates a new StrategyFactory instance.

_create_utilities_instance()[source]#

Creates a new Utilities instance.

_create_classification_orchestrator()[source]#

Creates a new ClassificationOrchestrator.

_create_orchestrator()[source]#

Creates a new CategoryDataOrchestrator.

_get_acf_func()[source]#

Returns the abstract classifier factory function.

_validate_api_key()[source]#

Validates the provided API key.

_make_files()[source]#

Creates split files from input files.

_load_files()[source]#

Loads and returns data from split files.

_create_dataclass_factory()[source]#

Creates a new DataClassFactory instance.

_create_crossref_wrapper()[source]#

Creates a new CrossrefWrapper instance.

_create_category_processor()[source]#

Creates a new CategoryProcessor instance.

_create_faculty_postprocessor()[source]#

Creates a new FacultyPostprocessor instance.

_create_scraper()[source]#

Creates a new Scraper instance.

_create_db()[source]#

Creates a new DatabaseWrapper instance.

_encode_affiliation()[source]#

URL encodes an affiliation string.

SAVE_OFFLINE_KWARGS: SaveOfflineKwargs = {'extend': False, 'make_files': False, 'offline': False, 'run_crossref_before_file_load': False}#
__init__(ai_api_key, crossref_affiliation, data_from_month, data_to_month, data_from_year, data_to_year, mongodb_uri, db_name='Site_Data', debug=False, pre_classification_model='gpt-4o-mini', classification_model='gpt-4o-mini', theme_model='gpt-4o-mini')[source]#

Initialize the PipelineRunner with necessary configurations and dependencies.

Parameters:
  • ai_api_key (str) – API key for AI services (e.g., OpenAI).

  • crossref_affiliation (str) – Institution name to search for in Crossref.

  • data_from_year (int) – Start year for publication data collection.

  • data_to_year (int) – End year for publication data collection.

  • mongodb_uri (str) – Connection URL for MongoDB instance.

  • db_name (str, optional) – Name of the MongoDB database. Defaults to “Site_Data”.

  • debug (bool, optional) – Enable debug mode for additional logging and controls. Defaults to False.

Raises:

Exception – If logger setup fails or required dependencies cannot be initialized.
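
As a rough illustration of how the constructor arguments fit together, the sketch below gathers them into a dictionary first. `build_runner_kwargs` and `run_from_environment` are hypothetical helpers, the date values are placeholders, and passing months as integers is an assumption, since this page does not state their type. The environment variable names follow the defaults shown for `main()`.

```python
import os
from typing import Any, Dict, Mapping


def build_runner_kwargs(env: Mapping[str, str]) -> Dict[str, Any]:
    """Hypothetical helper: collect PipelineRunner arguments from environment values."""
    return {
        "ai_api_key": env["OPENAI_API_KEY"],
        "crossref_affiliation": "Salisbury University",
        # Assumption: months are integers 1-12; the page does not document their type.
        "data_from_month": 1,
        "data_to_month": 12,
        "data_from_year": 2024,
        "data_to_year": 2024,
        "mongodb_uri": env["MONGODB_URI"],
        "db_name": "Site_Data",  # documented default
        "debug": False,  # documented default
    }


def run_from_environment() -> None:
    """Hypothetical entry point; requires the academic_metrics package to be installed."""
    # Imported lazily so build_runner_kwargs can be used without the package.
    from academic_metrics.runners.pipeline import PipelineRunner

    runner = PipelineRunner(**build_runner_kwargs(os.environ))
    runner.run_pipeline()
```

Keeping the argument assembly separate from construction makes it possible to inspect or test the configuration without touching Crossref, the AI service, or MongoDB.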

run_pipeline(save_offline_kwargs={'extend': False, 'make_files': False, 'offline': False, 'run_crossref_before_file_load': False}, test_filtering=False, save_to_db=True)[source]#

Execute the main data processing pipeline.

This method orchestrates the entire pipeline process:

  1. Retrieves existing DOIs from the database.

  2. Collects new publication data from Crossref.

  3. Filters out duplicate articles.

  4. Runs AI classification on publications.

  5. Processes and generates category statistics.

  6. Saves processed data to MongoDB.

Parameters:

save_offline_kwargs (SaveOfflineKwargs, optional) – Configuration for offline processing. Defaults to SAVE_OFFLINE_KWARGS.

  • offline: Whether to run in offline mode.

  • run_crossref_before_file_load: Whether to run Crossref before loading files.

  • make_files: Whether to generate new split files.

  • extend: Whether to extend existing data.

Raises:

Exception – If there are errors in data processing or database operations.
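
The duplicate-filtering step described for this method can be sketched in isolation. This is not the library's implementation; `filter_new_articles` is a hypothetical function, and the `"DOI"` record key is an assumption about the shape of the collected data.

```python
from typing import Any, Dict, Iterable, List, Set


def filter_new_articles(
    articles: Iterable[Dict[str, Any]], existing_dois: Set[str]
) -> List[Dict[str, Any]]:
    """Keep only articles whose DOI is not already stored (illustrative stand-in)."""
    seen = set(existing_dois)  # copy so the caller's set is not modified
    fresh: List[Dict[str, Any]] = []
    for article in articles:
        doi = article.get("DOI")  # assumption: each record carries a "DOI" key
        if doi is None or doi in seen:
            continue  # skip records without a DOI and records already known
        seen.add(doi)  # also drops repeats within the new batch
        fresh.append(article)
    return fresh


stored = {"10.1000/a"}
batch = [{"DOI": "10.1000/a"}, {"DOI": "10.1000/b"}, {"DOI": "10.1000/b"}]
new_articles = filter_new_articles(batch, stored)
```

Filtering before classification matters because classification is the step that calls the AI service, so every duplicate removed here is a request that never has to be made.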

test_run()[source]#
_create_taxonomy()[source]#

Create a new Taxonomy instance for publication classification.

Returns:

A new instance of the Taxonomy utility class.

Return type:

Taxonomy

_create_classifier_factory()[source]#

Create a new ClassifierFactory for generating publication classifiers.

Returns:

A factory instance configured with taxonomy and AI API key.

Return type:

ClassifierFactory

_create_warning_manager()[source]#

Create a new WarningManager for handling pipeline warnings.

Returns:

A new instance of the warning management utility.

Return type:

WarningManager

_create_strategy_factory()[source]#

Create a new StrategyFactory for generating processing strategies.

Returns:

A new instance of the strategy factory.

Return type:

StrategyFactory

_create_utilities_instance()[source]#

Create a new Utilities instance with required dependencies.

Returns:

A utility instance configured with strategy factory and warning manager.

Return type:

Utilities

_create_classification_orchestrator()[source]#

Create a new ClassificationOrchestrator for managing publication classification.

Returns:

An orchestrator instance configured with classifier factory and utilities.

Return type:

ClassificationOrchestrator

_create_orchestrator(data, extend)[source]#

Create a new CategoryDataOrchestrator for managing category data processing.

Parameters:
  • data (List[Dict[str, Any]]) – List of publication data to process.

  • extend (bool) – Whether to extend existing data.

Returns:

An orchestrator instance configured with all necessary processors and utilities.

Return type:

CategoryDataOrchestrator

_get_acf_func()[source]#

Get the abstract classifier factory function.

Returns:

A function that creates an AbstractClassifier given a dictionary of DOIs and abstracts.

Return type:

Callable[[Dict[str, str]], ClassifierFactory]

_validate_api_key(validator, api_key)[source]#

Validate the provided API key.

Parameters:
  • validator (APIKeyValidator) – Validator instance to check the API key.

  • api_key (str) – API key to validate.

Raises:

ValueError – If the API key is invalid.

Return type:

None

_make_files()[source]#

Create split files from input files for offline processing.

Raises:

Exception – If input directory contains no files to process.

Return type:

None

_load_files()[source]#

Load all split files into a list of dictionaries.

Returns:

List of loaded data from split files.

Return type:

List[Dict[str, Any]]

Notes

Warnings are logged for any files that fail to load.
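
The load-and-warn behaviour noted above might look like the following sketch. It assumes the split files are JSON documents in a single directory, which this page does not confirm; `load_split_files` and the logger name are hypothetical.

```python
import json
import logging
import tempfile
from pathlib import Path
from typing import Any, Dict, List

logger = logging.getLogger("pipeline_sketch")


def load_split_files(directory: Path) -> List[Dict[str, Any]]:
    """Load every *.json file in a directory, logging a warning for each failure."""
    records: List[Dict[str, Any]] = []
    for path in sorted(directory.glob("*.json")):
        try:
            with path.open(encoding="utf-8") as handle:
                records.append(json.load(handle))
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError subclasses ValueError, so malformed files land here.
            logger.warning("Skipping %s: %s", path.name, exc)
    return records


# Small demonstration: one readable file and one malformed file.
with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    (folder / "good.json").write_text('{"title": "example"}', encoding="utf-8")
    (folder / "bad.json").write_text("not json", encoding="utf-8")
    loaded = load_split_files(folder)
```

A single unreadable file only produces a warning here, so the remaining files still reach the rest of the pipeline.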

_create_dataclass_factory()[source]#

Create a new DataClassFactory for generating data classes.

Returns:

A new instance of the data class factory.

Return type:

DataClassFactory

_create_crossref_wrapper(**kwargs)[source]#

Create a new CrossrefWrapper for interacting with the Crossref API.

Parameters:

**kwargs – Keyword arguments for CrossrefWrapper configuration.

Returns:

A configured CrossrefWrapper instance.

Return type:

CrossrefWrapper

_create_category_processor()[source]#

Create a new CategoryProcessor for processing publication categories.

Returns:

A processor instance configured with utilities and factories.

Return type:

CategoryProcessor

_create_minhash_util()[source]#

Create a new MinHashUtility instance for minhash operations.

Returns:

A new instance of the minhash utility.

Return type:

MinHashUtility

_create_faculty_postprocessor(minhash_util)[source]#

Create a new FacultyPostprocessor for processing faculty data.

Returns:

A new instance of the faculty post-processor.

Return type:

FacultyPostprocessor

_create_department_postprocessor(minhash_util)[source]#

Create a new DepartmentPostprocessor for processing department data.

Returns:

A new instance of the department post-processor.

Return type:

DepartmentPostprocessor

_create_scraper()[source]#

Create a new Scraper instance for web scraping.

Returns:

A scraper instance configured with the AI API key.

Return type:

Scraper

_create_db()[source]#

Create a new DatabaseWrapper for database operations.

Returns:

A database wrapper configured with connection details.

Return type:

DatabaseWrapper

static _encode_affiliation(affiliation)[source]#

URL encode an affiliation string if it’s not already encoded.

Checks if the string is already properly URL-encoded by:

  1. Decoding it with unquote().

  2. Re-encoding it with quote().

  3. Comparing the result to the original; if they match, it was already encoded.

Parameters:

affiliation (str) – Institution name to encode (e.g. “Salisbury University” or “Salisbury%20University”)

Returns:

URL-encoded string (e.g. “Salisbury%20University”)

Return type:

str
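
The decode, re-encode, and compare check described above can be sketched with the standard library's `urllib.parse`. This is a minimal stand-alone version written from the description, not the library's own code; `encode_affiliation` is an illustrative name.

```python
from urllib.parse import quote, unquote


def encode_affiliation(affiliation: str) -> str:
    """URL-encode a string unless a decode/re-encode round trip leaves it unchanged."""
    # If decoding then re-encoding reproduces the input, it was already encoded.
    if quote(unquote(affiliation)) == affiliation:
        return affiliation
    return quote(affiliation)


plain = encode_affiliation("Salisbury University")
already_encoded = encode_affiliation("Salisbury%20University")
```

Both calls produce `Salisbury%20University`, so an already-encoded value is not encoded a second time into `Salisbury%2520University`.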

academic_metrics.runners.pipeline.get_excel_report(db)[source]#

Save all data from database to Excel files.

Parameters:

db (DatabaseWrapper) – The database wrapper to get data from.

academic_metrics.runners.pipeline.main(openai_api_key_env_var_name='OPENAI_API_KEY', mongodb_uri_env_var_name='MONGODB_URI')[source]#
academic_metrics.runners.pipeline.command_line_runner(openai_api_key_env_var_name='OPENAI_API_KEY', mongodb_uri_env_var_name='MONGODB_URI')[source]#

Module contents