#!/usr/bin/env python  # Shebang: allows script to be run directly using the system's default Python interpreter
# coding: utf-8        # Encoding declaration: ensures the file supports UTF-8 characters (e.g., symbols, international text)

Project Review Scheduler — Annotated Source Documentation

This document provides a structured, module-by-module annotation of the scheduler.py file. It explains each functional section, its purpose, and key implementation details.

Imports and Global Configuration

Main Implementation Module

This module implements the core functionality for the Project Review Scheduler system, including due date calculation, reviewer assignment, notification, and reporting.


import sys
import os

from pathlib import Path

# Add your project directory to sys.path
project_dir = Path.home() / "Documents" / "ProjectReviewScheduler" / "src"
sys.path.append(str(project_dir))

import csv
import smtplib
from datetime import datetime, timedelta
from email.message import EmailMessage
import pandas as pd
import matplotlib.pyplot as plt
from io import StringIO

from faker import Faker
import random
fake = Faker()

from utils import safe_parse_date
from dateutil.relativedelta import relativedelta

# convert a list of CLI arguments into a structured dictionary 
def parse_args(args):
    parsed = {}

    if not args:
        parsed["command"] = "help"
        return parsed

    parsed["command"] = args[0]

    for i in range(1, len(args)):
        if args[i].startswith("--"):
            key = args[i][2:].replace("-", "_")  # allow both --csv-file and --csv_file
            if i + 1 < len(args) and not args[i + 1].startswith("--"):
                parsed[key] = args[i + 1]
            else:
                parsed[key] = True

    return parsed
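
For illustration, a hypothetical invocation would be parsed like this (option names and values are made up): flags followed by a value become key/value pairs, bare flags become True, and dashes are normalized to underscores.

example = parse_args(["generate_reports", "--type", "all", "--dry-run"])
# -> {'command': 'generate_reports', 'type': 'all', 'dry_run': True}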

Data Access Functions — Purpose

These functions allow your script to:

  • Read, write, update, and backup CSV files for:

    • Projects

    • Users (Reviewers)

    • Reviews (assignments)

  • Ensure data persistence and file safety without using a database.

They encapsulate file I/O logic, so other modules (like reviewer assignment or reporting) don’t have to deal with raw file operations directly.

import os
import csv
from datetime import datetime

def read_csv(file_path):
    """
    Read data from a CSV file. If missing, create a new one with default schema.

    Args:
        file_path (str): Full or relative path to the CSV file.

    Returns:
        list: List of rows as dictionaries.
    """
    if not os.path.exists(file_path):
        print(f" {file_path} not found. Creating with headers...")
        create_csv_if_missing(file_path)
        return []

    with open(file_path, mode='r', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        return list(reader)
        
def write_csv(file_path, data, fieldnames=None):
    """
    Write data to a CSV file. Backs up old file if it exists.

    Args:
        file_path (str): Path to the CSV file
        data (list): List of dictionaries to write
        fieldnames (list): Optional list of field names. Uses keys from first row if omitted.
    """
    if not data:
        print(f" No data to write to {file_path}")
        return

    if fieldnames is None:
        fieldnames = data[0].keys()

    # Backup existing file
    if os.path.exists(file_path):
        backup_file(file_path)

    with open(file_path, mode='w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

def backup_file(file_path):
    """
    Create a backup copy of a file with a timestamp.

    Args:
        file_path (str): Path to the file to backup
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = f"{file_path}.{timestamp}.bak"
    with open(file_path, 'r') as src, open(backup_path, 'w') as dst:
        dst.write(src.read())
    print(f" Backed up {file_path} → {backup_path}")

def create_csv_if_missing(file_path, schema=None):
    """
    Create a CSV file with headers based on filename patterns if it doesn't exist.

    Args:
        file_path (str): Path to the file
        schema (dict, optional): Currently unused; headers are inferred from the file name.

    Returns:
        bool: True if file was created, False if it already existed
    """
    # Check if the file already exists
    
    if os.path.exists(file_path):
        return False

    # Derive a default header based on file name patterns
    filename = os.path.basename(file_path)
    headers = []

    # Assign appropriate headers based on file naming convention
    if "Projects" in filename:
        headers = ["Project_ID", "Project_Name", "Start_Date", "Last_Review_Date",
                   "Review_Frequency_Years", "Department", "Status", "Next_Review_Date"]
    elif "Users" in filename:
        headers = ["User_ID", "Name", "Email", "Department", "Current_Load"]
    elif "Reviews" in filename:
        headers = ["Review_ID", "Project_ID", "Reviewer_ID", "Scheduled_Date",
                   "Status", "Completion_Date"]
    else:
        headers = ["ID"]  # fallback

    # Write the headers to a new CSV file

    with open(file_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(headers)

    print(f" Created new CSV: {file_path} with headers: {headers}")
    return True


Convenience Accessors

These utility functions provide streamlined access to common data queries, such as retrieving all records or filtering by criteria (e.g., status or ID). They wrap the generic read_csv() function and apply custom logic when needed.

def get_all_projects(file_path='Projects.csv'):
    # Return a list of all projects from the Projects CSV file.
    return read_csv(file_path)

def get_all_users(file_path='Users.csv'):
    # Return a list of all users from the Users CSV file.
    return read_csv(file_path)

def get_all_reviews(file_path='Reviews.csv'):
    # Return a list of all reviews from the Reviews CSV file.
    return read_csv(file_path)

def get_projects_by_status(status, file_path='Projects.csv'):
    # Return projects that match a specific status.
    return [p for p in read_csv(file_path) if p.get('Status') == status]

def get_reviews_by_project(project_id, file_path='Reviews.csv'):
    # Return all reviews linked to a specific project ID. 
    return [r for r in read_csv(file_path) if r.get('Project_ID') == project_id]

def get_reviewer(reviewer_id, file_path='Users.csv'):
    """
    Retrieve a specific reviewer by their user ID.
    
    Returns:
        dict: Reviewer record if found, otherwise None.
    """
    for user in read_csv(file_path):
        if user.get('User_ID') == reviewer_id:
            return user
    return None
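
A brief usage sketch (the IDs and status values are illustrative, and the default CSV files are assumed to exist in the working directory):

overdue_projects = get_projects_by_status("Overdue")   # projects currently flagged Overdue
reviews_for_p001 = get_reviews_by_project("P001")      # reviews linked to a hypothetical project P001
reviewer = get_reviewer("U001")                        # single reviewer record, or None if not found
if reviewer:
    print(reviewer.get("Name"), reviewer.get("Email"))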

Data Update Functions

The data update functions are responsible for modifying existing entries in the CSV files. These are higher-level helpers built on top of read_csv() and write_csv().

Purpose of Data Update Functions

They allow your system to:

  • Locate specific records (by ID)

  • Update fields (e.g., workload, status)

  • Write the updated data back to disk

This supports dynamic changes during operations like reviewer assignment, status updates, or schedule recalculation — without manually editing CSVs.

def update_project(project, file_path='Projects.csv'):
    """
    Update a project entry by matching Project_ID.

    Args:
        project (dict): Updated project data.
    """
    projects = read_csv(file_path)
    for i, p in enumerate(projects):
        if p.get('Project_ID') == project.get('Project_ID'):
            projects[i] = project
            break
    write_csv(file_path, projects)

def update_user(user, file_path='Users.csv'):
    """
    Update a user entry by matching User_ID.

    Args:
        user (dict): Updated user data.
    """
    users = read_csv(file_path)
    for i, u in enumerate(users):
        if u.get('User_ID') == user.get('User_ID'):
            users[i] = user
            break
    write_csv(file_path, users)

def add_review(review, file_path='Reviews.csv'):
    """
    Add a new review record to the Reviews file.

    Args:
        review (dict): Review data to append.
    """
    reviews = read_csv(file_path)
    reviews.append(review)
    write_csv(file_path, reviews)

def update_review(review, file_path='Reviews.csv'):
    """
    Update an existing review by matching Review_ID.

    Args:
        review (dict): Updated review data.
    """
    reviews = read_csv(file_path)
    for i, r in enumerate(reviews):
        if r.get('Review_ID') == review.get('Review_ID'):
            reviews[i] = review
            break
    write_csv(file_path, reviews)
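
As an illustration of how these helpers fit together, marking a review complete might look like the sketch below (the Review_ID is hypothetical):

# Hypothetical example: mark an existing review as completed
for r in get_all_reviews():
    if r.get("Review_ID") == "R20250101120000":  # illustrative ID
        r["Status"] = "Completed"
        r["Completion_Date"] = datetime.now().strftime("%Y-%m-%d")
        update_review(r)
        break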

Data Validation Functions

The Data Validation Functions are designed to ensure the accuracy, consistency, and integrity of your CSV data files before they’re used in review calculations, assignments, or reporting.

Purpose of Data Validation Functions

These functions answer the questions:

  • Are the required columns present in the CSV files?

  • Do values match expected data types (e.g., dates, numbers)?

  • Are there missing fields, invalid formats, or broken references?

This helps catch data issues before they cause logic errors or incorrect outputs in later stages of the workflow.

def validate_csv_data(file_path, schema=None):
    """
    Validate the data in a CSV file against a schema.
    
    Args:
        file_path (str): Path to the CSV file
        schema (dict, optional): Dictionary defining the validation rules
        
    Returns:
        dict: Validation result with 'valid' flag and list of 'errors'
    """
    # Create file if it doesn't exist
    if not os.path.exists(file_path):
        create_csv_if_missing(file_path, schema)
        return {'valid': True, 'errors': []}
    
    # Determine schema based on filename if not provided
    if schema is None:
        filename = os.path.basename(file_path)
        if 'Projects' in filename:
            schema = {
                'Project_ID': {'type': 'string', 'required': True},
                'Project_Name': {'type': 'string', 'required': True},
                'Start_Date': {'type': 'date', 'required': True},
                'Last_Review_Date': {'type': 'date', 'required': True},
                'Review_Frequency_Years': {'type': 'decimal', 'required': True},
                'Department': {'type': 'string', 'required': True}
            }
        elif 'Users' in filename:
            schema = {
                'User_ID': {'type': 'string', 'required': True},
                'Name': {'type': 'string', 'required': True},
                'Email': {'type': 'string', 'required': True},
                'Department': {'type': 'string', 'required': True}
            }
        elif 'Reviews' in filename:
            schema = {
                'Review_ID': {'type': 'string', 'required': True},
                'Project_ID': {'type': 'string', 'required': True},
                'Reviewer_ID': {'type': 'string', 'required': True},
                'Scheduled_Date': {'type': 'date', 'required': True},
                'Status': {'type': 'string', 'required': True}
            }
    
    data = read_csv(file_path)
    errors = []
    
    # Check each row against the schema
    for i, row in enumerate(data, start=2):  # Start from 2 to account for header row
        for field, rules in schema.items():
            # Check required fields
            if rules.get('required', False) and (field not in row or not row[field]):
                errors.append({
                    'row': i,
                    'field': field,
                    'message': f"Required field '{field}' is missing or empty in row {i}"
                })
                continue
            
            # Skip validation if field is not present
            if field not in row or not row[field]:
                continue
            
            # Validate data types
            value = row[field]
            if rules.get('type') == 'date':
                try:
                    datetime.strptime(value, '%Y-%m-%d')
                except ValueError:
                    errors.append({
                        'row': i,
                        'field': field,
                        'message': f"Field '{field}' with value '{value}' is not a valid date (YYYY-MM-DD) in row {i}"
                    })
            elif rules.get('type') == 'decimal' or rules.get('type') == 'integer':
                try:
                    float(value)
                    if rules.get('type') == 'integer' and '.' in value:
                        errors.append({
                            'row': i,
                            'field': field,
                            'message': f"Field '{field}' with value '{value}' should be an integer in row {i}"
                        })
                except ValueError:
                    errors.append({
                        'row': i,
                        'field': field,
                        'message': f"Field '{field}' with value '{value}' is not a valid number in row {i}"
                    })
    
    return {
        'valid': len(errors) == 0,
        'errors': errors
    }

def validate_referential_integrity():
    """
    Validate referential integrity between CSV files.
    
    Returns:
        dict: Validation result with 'valid' flag and list of 'errors'
    """
    projects = get_all_projects()
    users = get_all_users()
    reviews = get_all_reviews()
    
    errors = []
    
    # Create sets of IDs for faster lookup
    project_ids = {p.get('Project_ID') for p in projects}
    user_ids = {u.get('User_ID') for u in users}
    
    # Check Reviews reference valid Projects and Users
    for i, review in enumerate(reviews, start=2):  # Start from 2 to account for header row
        project_id = review.get('Project_ID')
        if project_id not in project_ids:
            errors.append({
                'row': i,
                'field': 'Project_ID',
                'message': f"Review references Project_ID: {project_id} which does not exist"
            })
        
        reviewer_id = review.get('Reviewer_ID')
        if reviewer_id not in user_ids:
            errors.append({
                'row': i,
                'field': 'Reviewer_ID',
                'message': f"Review references Reviewer_ID: {reviewer_id} which does not exist"
            })
    
    return {
        'valid': len(errors) == 0,
        'errors': errors
    }
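
A minimal sketch of how the two validators might be combined before running the downstream steps (default file names assumed):

# Validate each file against its inferred schema, then check cross-file references
for path in ["Projects.csv", "Users.csv", "Reviews.csv"]:
    result = validate_csv_data(path)
    if not result["valid"]:
        for err in result["errors"]:
            print(f"{path} row {err['row']}: {err['message']}")

integrity = validate_referential_integrity()
if not integrity["valid"]:
    print(f"Found {len(integrity['errors'])} referential integrity problems")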

Due Date Calculator

The Due Date Calculator module is responsible for automatically determining when each project is next due for review, and assigning a status like “Overdue”, “Due Soon”, or “Up to Date”.

Purpose of the Due Date Calculator

It helps answer two core questions:

  • When should each project be reviewed again?

  • Which projects need immediate attention?

This allows your system to:

  • Proactively flag overdue work

  • Prioritize assignments

  • Drive automated reviewer scheduling and notifications

def calculate_due_date(project_data, current_date=None):
    """
    Calculate the next review date based on the project's last review date and review frequency.
    
    Args:
        project_data (dict): Dictionary containing project information
        current_date (datetime or str, optional): Current date for comparison. Defaults to today.
    
    Returns:
        dict: Updated project data with Next_Review_Date and Status fields
    """
    if current_date is None:
        current_date = datetime.now()
    elif isinstance(current_date, str):
        current_date = datetime.strptime(current_date, '%Y-%m-%d')
    
    # Parse dates
    last_review = datetime.strptime(project_data['Last_Review_Date'], '%Y-%m-%d')
    frequency_years = float(project_data['Review_Frequency_Years'])
    
    # Calculate next review date by adding the review frequency (converted to whole months)
    next_review = last_review + relativedelta(months=int(frequency_years * 12))
    
    # Set status based on next review date
    days_until_review = (next_review - current_date).days
    
    if days_until_review < 0:
        status = 'Overdue'
    elif days_until_review <= 30:
        status = 'Due Soon'
    else:
        status = 'Up to Date'
    
    # Update project data
    result = project_data.copy()
    result['Next_Review_Date'] = next_review.strftime('%Y-%m-%d')
    result['Status'] = status
    
    return result
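
A worked example with a hypothetical project record: a project last reviewed on 2024-04-22 with a two-year frequency is next due on 2026-04-22, and its status depends on how far that date lies from current_date.

# Only the fields used by calculate_due_date are shown; values are illustrative
sample_project = {
    "Project_ID": "P999",
    "Last_Review_Date": "2024-04-22",
    "Review_Frequency_Years": "2",
}
result = calculate_due_date(sample_project, current_date="2026-05-01")
# result["Next_Review_Date"] == "2026-04-22"; relative to 2026-05-01 the review is 9 days late,
# so result["Status"] == "Overdue"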

def calculate_all_reviews(projects_file='Projects.csv', current_date=None):
    """
    Calculate review dates for all projects and update the Projects CSV file.
    
    Args:
        projects_file (str): Path to the Projects CSV file
        current_date (datetime or str, optional): Current date for comparison. Defaults to today.
        
    Returns:
        dict: Summary of calculation results
    """
    if current_date is None:
        current_date = datetime.now()
    elif isinstance(current_date, str):
        current_date = datetime.strptime(current_date, '%Y-%m-%d')
    
    # Read projects data
    projects = read_csv(projects_file)
    
    # Calculate due dates and update status
    for project in projects:
        updated_project = calculate_due_date(project, current_date)
        project.update(updated_project)
    
    # Write updated data back to CSV
    write_csv(projects_file, projects)
    
    # Generate summary
    status_counts = {
        'overdue': len([p for p in projects if p['Status'] == 'Overdue']),
        'due_soon': len([p for p in projects if p['Status'] == 'Due Soon']),
        'up_to_date': len([p for p in projects if p['Status'] == 'Up to Date']),
        'total_projects': len(projects)
    }
    
    return status_counts

Reviewer Assignment

The Reviewer Assignment module is responsible for intelligently assigning reviewers to projects that are due for review — with the goal of balancing workloads, respecting department alignment, and preventing review overload.

Purpose of the Reviewer Assignment Module

It answers the question:

  • “Which reviewer should be assigned to each project that needs a review — and how do we do this fairly?”

# Enhanced reviewer assignment function
# -------------------------------------
# This function selects a reviewer for a project while keeping workloads balanced.
# Reviewers are sorted by current load, and the least-loaded reviewer from a department
# other than the project's is preferred; if no cross-department reviewer exists, it falls
# back to the least-loaded reviewer overall. The function then increments the reviewer's
# load and records the assignment in Reviews.csv.


def assign_reviewer(project, reviewers=None, users_file='Users.csv', reviews_file='Reviews.csv', verbose=False):
    """
    Enhanced version with detailed logging for debugging.
    
    Args:
        project (dict): Project dictionary
        reviewers (list, optional): List of reviewers
        users_file (str): Path to users file
        reviews_file (str): Path to reviews file
        verbose (bool): Whether to print debug information
    
    Returns:
        dict: Review assignment result or None
    """
    if verbose:
        print(f" Assigning reviewer for project {project.get('Project_ID')} ({project.get('Department')})")
    
    # Get reviewers if not provided
    if reviewers is None:
        reviewers = get_all_users(users_file)
        if verbose:
            print(f"   Loaded {len(reviewers)} reviewers from {users_file}")
    
    if not reviewers:
        if verbose:
            print(f"   No reviewers available")
        return None
    
    # Process reviewer workloads
    working_reviewers = []
    for reviewer in reviewers:
        reviewer_copy = reviewer.copy()
        try:
            reviewer_copy['Current_Load'] = int(reviewer_copy.get('Current_Load', 0))
        except ValueError:
            reviewer_copy['Current_Load'] = 0
        working_reviewers.append(reviewer_copy)
    
    if verbose:
        print(f"   Reviewer workloads:")
        for r in working_reviewers:
            print(f"      {r['Name']} ({r['Department']}): {r['Current_Load']} reviews")
    
    # Sort by workload
    reviewers_sorted = sorted(working_reviewers, key=lambda r: r['Current_Load'])
    
    # Department-based assignment logic
    project_dept = project.get('Department', '')
    other_dept_reviewers = [r for r in reviewers_sorted if r.get('Department') != project_dept]
    
    if other_dept_reviewers:
        assigned_reviewer = other_dept_reviewers[0]
        assignment_type = "cross-department"
    else:
        if not reviewers_sorted:
            if verbose:
                print(f"   No reviewers available after sorting")
            return None
        assigned_reviewer = reviewers_sorted[0]
        assignment_type = "same-department"
    
    if verbose:
        print(f"   Selected {assigned_reviewer['Name']} ({assigned_reviewer['Department']}) - {assignment_type}")
        print(f"   Updating workload from {assigned_reviewer['Current_Load']} to {assigned_reviewer['Current_Load'] + 1}")
    
    # Update workload
    assigned_reviewer['Current_Load'] += 1
    
    try:
        update_user(assigned_reviewer, users_file)
        if verbose:
            print(f"   Updated user file: {users_file}")
    except Exception as e:
        if verbose:
            print(f"   Failed to update user: {e}")
        return None
    
    # Create review
    from datetime import datetime, timedelta
    review = {
        'Review_ID': f"R{datetime.now().strftime('%Y%m%d%H%M%S')}",
        'Project_ID': project['Project_ID'],
        'Reviewer_ID': assigned_reviewer['User_ID'],
        'Scheduled_Date': (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d'),
        'Status': 'Scheduled',
        'Completion_Date': ''
    }
    
    try:
        add_review(review, reviews_file)
        if verbose:
            print(f"   Added review to: {reviews_file}")
            print(f"   Assignment complete: {review['Review_ID']}")
    except Exception as e:
        if verbose:
            print(f"   Failed to add review: {e}")
        return None
    
    return review
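
A usage sketch (the project dictionary is hypothetical, and Users.csv/Reviews.csv are assumed to exist with at least one reviewer record):

project = {"Project_ID": "P010", "Department": "Finance", "Status": "Overdue"}
review = assign_reviewer(project, verbose=True)
if review:
    print("Created", review["Review_ID"], "for reviewer", review["Reviewer_ID"])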


def assign_reviewer_balanced(project, reviewers=None, users_file='Users.csv', reviews_file='Reviews.csv', verbose=False):
    """
    Assign a reviewer with cross-department balancing to prevent overload.

    Strategy:
    - Prefer reviewers from the project's department whose load is at or below the department average
    - Else, select the least-loaded reviewer from any department
    """
    if reviewers is None:
        reviewers = get_all_users(users_file)

    if not reviewers:
        return None  # No reviewers available

    # Convert workload to integer
    for reviewer in reviewers:
        try:
            reviewer["Current_Load"] = int(reviewer.get("Current_Load", 0))
        except ValueError:
            reviewer["Current_Load"] = 0

    # Calculate department averages
    from collections import defaultdict
    dept_loads = defaultdict(list)
    for reviewer in reviewers:
        dept = reviewer["Department"]
        dept_loads[dept].append(reviewer["Current_Load"])

    dept_avg_load = {
        dept: sum(loads) / len(loads) for dept, loads in dept_loads.items()
    }

    project_dept = project.get("Department")
    avg_load = dept_avg_load.get(project_dept, 0)

    # Filter same-department reviewers whose load is at or below the department average
    eligible_same_dept = [
        r for r in reviewers
        if r["Department"] == project_dept and r["Current_Load"] <= avg_load
    ]

    # Sort by load and pick lowest-loaded eligible reviewer
    if eligible_same_dept:
        eligible_same_dept.sort(key=lambda r: r["Current_Load"])
        assigned_reviewer = eligible_same_dept[0]
        assignment_reason = "same department under average"
    else:
        # Fallback: pick lowest loaded reviewer overall
        reviewers.sort(key=lambda r: r["Current_Load"])
        assigned_reviewer = reviewers[0]
        assignment_reason = "fallback to lowest overall"

    # Update reviewer workload
    assigned_reviewer["Current_Load"] += 1
    update_user(assigned_reviewer, users_file)

    # Create and save review record
    from datetime import datetime, timedelta
    review = {
        "Review_ID": f"R{datetime.now().strftime('%Y%m%d%H%M%S')}",
        "Project_ID": project["Project_ID"],
        "Reviewer_ID": assigned_reviewer["User_ID"],
        "Scheduled_Date": (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d'),
        "Status": "Scheduled",
        "Completion_Date": ""
    }
    add_review(review, reviews_file)

    if verbose:
        print(f" Assigned {assigned_reviewer['Name']} ({assigned_reviewer['Department']}) using {assignment_reason}")

    return review

def assign_all_reviewers(projects_file='Projects.csv', users_file='Users.csv', reviews_file='Reviews.csv'):
    """
    Assign reviewers to all projects needing review (Overdue or Due Soon).
    
    Args:
        projects_file (str): Path to the Projects CSV file
        users_file (str): Path to the Users CSV file  
        reviews_file (str): Path to the Reviews CSV file
        
    Returns:
        dict: Summary of assignment results including counts and assignment details
    """
    # Read data from specified files
    projects = read_csv(projects_file)
    users = read_csv(users_file)
    existing_reviews = read_csv(reviews_file)
    
    # Filter projects that need review (Overdue or Due Soon status)
    projects_needing_review = [p for p in projects 
                               if p.get('Status') in ['Overdue', 'Due Soon']]
    
    # Exclude projects that already have active reviews to avoid double-assignment
    active_project_ids = {r['Project_ID'] for r in existing_reviews 
                          if r.get('Status') in ['Scheduled', 'In Progress']}
    
    projects_to_assign = [p for p in projects_needing_review 
                          if p['Project_ID'] not in active_project_ids]
    
    # Track assignment results
    new_assignments = []
    failed_assignments = []
    
    # Assign reviewers to each project needing assignment
    for project in projects_to_assign:
        try:
            review = assign_reviewer_balanced(project, users, users_file, reviews_file, verbose=True)
            if review:  # Assignment successful
                new_assignments.append(review)
            else:  # Assignment failed (no available reviewers)
                failed_assignments.append({
                    'project_id': project['Project_ID'],
                    'reason': 'No available reviewers'
                })
        except Exception as e:
            # Handle any errors during assignment
            failed_assignments.append({
                'project_id': project['Project_ID'],
                'reason': f'Assignment error: {str(e)}'
            })
    
    # Return comprehensive assignment summary
    return {
        'total_assigned': len(new_assignments),
        'total_needing_review': len(projects_needing_review),
        'already_assigned': len(projects_needing_review) - len(projects_to_assign),
        'failed_assignments': len(failed_assignments),
        'assignments': new_assignments,
        'failures': failed_assignments
    }
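
A typical call and summary printout might look like this (default file paths assumed):

summary = assign_all_reviewers()
print(f"Assigned: {summary['total_assigned']}, "
      f"already covered: {summary['already_assigned']}, "
      f"failed: {summary['failed_assignments']}")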


Notification System

Purpose of the Notification System

The goal is to ensure reviewers are informed about their tasks without manual follow-up. It helps answer:

  • “Which reviewers need to know about overdue or upcoming reviews — and how can we alert them efficiently?”

def send_notifications(status_filter=None, smtp_server='localhost', smtp_port=587, 
                      smtp_user=None, smtp_password=None, sender_email='scheduler@example.com'):
    """
    Send email notifications to reviewers for their assigned reviews.
    
    Args:
        status_filter (str, optional): Filter projects by status ('Overdue', 'Due Soon')
        smtp_server (str): SMTP server address
        smtp_port (int): SMTP server port
        smtp_user (str, optional): SMTP username
        smtp_password (str, optional): SMTP password
        sender_email (str): Sender email address
        
    Returns:
        dict: Summary of notification results
    """
    # Read necessary data
    projects = get_all_projects()
    
    # Filter projects if needed
    if status_filter:
        projects = [p for p in projects if p.get('Status') == status_filter]
    
    sent_count = 0
    failed_count = 0
    notification_log = []
    
    # Process each project
    for project in projects:
        # Get associated reviews
        reviews = get_reviews_by_project(project['Project_ID'])
        
        # Filter reviews that are scheduled or in progress
        active_reviews = [r for r in reviews if r['Status'] in ['Scheduled', 'In Progress']]
        
        for review in active_reviews:
            # Get reviewer information
            reviewer = get_reviewer(review['Reviewer_ID'])
            
            if not reviewer or not reviewer.get('Email'):
                notification_log.append({
                    'project_id': project['Project_ID'],
                    'review_id': review['Review_ID'],
                    'status': 'Failed',
                    'reason': 'Missing reviewer or email'
                })
                failed_count += 1
                continue
            
            try:
                # Prepare email content
                msg = EmailMessage()
                
                # Set subject with urgency prefix if overdue
                subject = project['Project_Name'] + ' - Review Required'
                if project.get('Status') == 'Overdue':
                    subject = '[URGENT] ' + subject
                
                msg['Subject'] = subject
                msg['From'] = sender_email
                msg['To'] = reviewer['Email']
                
                # Create email body
                body = f"""
                Dear {reviewer['Name']},
                
                You have been assigned to review the following project:
                
                Project: {project['Project_Name']}
                Project ID: {project['Project_ID']}
                Scheduled Date: {review['Scheduled_Date']}
                Status: {project.get('Status', '')}
                
                """
                
                if project.get('Status') == 'Overdue':
                    body += "This review is OVERDUE and requires immediate attention.\n"
                elif project.get('Status') == 'Due Soon':
                    body += "This review is due soon. Please prioritize accordingly.\n"
                
                body += """
                Please complete your review by the scheduled date.
                
                Thank you,
                Project Review Scheduler
                """
                
                msg.set_content(body)
                
                # In a production environment, we would send the email here
                # For testing/development, we'll simulate this
                if smtp_server == 'localhost':
                    # Simulate email sending for testing
                    print(f"Email would be sent to {reviewer['Email']}")
                else:
                    # Actually send the email
                    with smtplib.SMTP(smtp_server, smtp_port) as server:
                        server.starttls()
                        if smtp_user and smtp_password:
                            server.login(smtp_user, smtp_password)
                        server.send_message(msg)
                
                notification_log.append({
                    'project_id': project['Project_ID'],
                    'review_id': review['Review_ID'],
                    'reviewer_email': reviewer['Email'],
                    'status': 'Sent',
                    'subject': subject
                })
                sent_count += 1
                
            except Exception as e:
                notification_log.append({
                    'project_id': project['Project_ID'],
                    'review_id': review['Review_ID'],
                    'status': 'Failed',
                    'reason': str(e)
                })
                failed_count += 1
    
    return {
        'sent': sent_count,
        'failed': failed_count,
        'total': sent_count + failed_count,
        'log': notification_log
    }
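
Usage sketch: with the default smtp_server='localhost' the function only simulates sending, which makes a dry run safe; the SMTP host and credentials below are placeholders.

# Dry run: messages are printed rather than sent
dry_run = send_notifications(status_filter="Overdue")
print(f"Sent {dry_run['sent']} notifications, {dry_run['failed']} failed")

# Real send (placeholder server and credentials; supply your own)
# send_notifications(status_filter="Overdue",
#                    smtp_server="smtp.example.com", smtp_port=587,
#                    smtp_user="scheduler", smtp_password="********")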

Reporting Module

The Reporting Module is responsible for generating structured outputs (CSV and visualizations) that summarize the current state of project reviews and reviewer workloads. These reports provide insight, accountability, and support decision-making.

Purpose of the Reporting Module

It answers practical questions like:

  • “What’s the review schedule for this month?”

  • “Which reviewers are overloaded?”

  • “Which projects are overdue?”

These reports can be shared with stakeholders or used for operational planning.

def generate_monthly_schedule(month, year, output_file=None):
    """
    Generate a monthly schedule of upcoming reviews.
    
    Args:
        month (str): Month to generate schedule for (format: MM)
        year (str): Year to generate schedule for (format: YYYY)
        output_file (str, optional): Path to output file. If None, uses default naming.
        
    Returns:
        dict: Report generation results
    """
    # Read data once up front (avoids re-reading the CSV files inside the loop)
    reviews = get_all_reviews()
    projects = get_all_projects()
    
    # Build full review data with project and reviewer info
    enriched_reviews = []
    for review in reviews:
        # Only include scheduled and in-progress reviews
        if review['Status'] not in ['Scheduled', 'In Progress']:
            continue
        
        # Check if the review is in the specified month/year
        review_date = datetime.strptime(review['Scheduled_Date'], '%Y-%m-%d')
        if review_date.strftime('%m') != month or review_date.strftime('%Y') != year:
            continue
        
        # Get project and reviewer info
        project = next((p for p in projects if p['Project_ID'] == review['Project_ID']), {})
        reviewer = get_reviewer(review['Reviewer_ID'])
        
        # Create enriched review record
        enriched_review = {
            'Review_ID': review['Review_ID'],
            'Project_ID': review['Project_ID'],
            'Project_Name': project.get('Project_Name', 'Unknown'),
            'Reviewer_ID': review['Reviewer_ID'],
            'Reviewer_Name': reviewer.get('Name', 'Unknown') if reviewer else 'Unknown',
            'Scheduled_Date': review['Scheduled_Date'],
            'Status': review['Status']
        }
        
        enriched_reviews.append(enriched_review)
    
    # Sort by scheduled date
    enriched_reviews.sort(key=lambda r: r['Scheduled_Date'])
    
    # Define output file name if not specified
    if output_file is None:
        output_file = f"monthly_schedule_{month}-{year}.csv"
    
    # Define fields for the report
    fieldnames = [
        'Project_ID', 'Project_Name', 'Reviewer_Name', 'Scheduled_Date', 'Status'
    ]
    
    # Write report to CSV
    if enriched_reviews:
        write_csv(output_file, enriched_reviews, fieldnames)
    
    return {
        'file': output_file,
        'month': month,
        'year': year,
        'review_count': len(enriched_reviews)
    }

def generate_workload_report(output_file=None):
    """
    Generate a report showing workload distribution among reviewers.
    
    Args:
        output_file (str, optional): Path to output file. If None, uses default naming.
        
    Returns:
        dict: Report generation results
    """
    # Read data
    users = get_all_users()
    reviews = get_all_reviews()
    
    # Count active reviews per reviewer
    workloads = {}
    for user in users:
        user_id = user['User_ID']
        active_reviews = [r for r in reviews 
                          if r['Reviewer_ID'] == user_id and 
                          r['Status'] in ['Scheduled', 'In Progress']]
        workloads[user_id] = {
            'User_ID': user_id,
            'Name': user['Name'],
            'Department': user.get('Department', 'Unknown'),
            'Current_Load': len(active_reviews),
            'Active_Reviews': [r['Review_ID'] for r in active_reviews]
        }
    
    # Sort by current load (descending)
    sorted_workloads = sorted(workloads.values(), key=lambda w: w['Current_Load'], reverse=True)
    
    # Define output file name if not specified
    if output_file is None:
        output_file = f"workload_report_{datetime.now().strftime('%Y%m%d')}.csv"
    
    # Define fields for the report
    fieldnames = ['User_ID', 'Name', 'Department', 'Current_Load', 'Active_Reviews']
    
    # Write report to CSV
    write_csv(output_file, sorted_workloads, fieldnames)
    
    # Generate visualization
    plt.figure(figsize=(10, 6))
    plt.bar([w['Name'] for w in sorted_workloads], [w['Current_Load'] for w in sorted_workloads])
    plt.xlabel('Reviewer')
    plt.ylabel('Active Reviews')
    plt.title('Reviewer Workload Distribution')
    plt.xticks(rotation=45)
    plt.tight_layout()
    
    # Save chart
    chart_file = output_file.replace('.csv', '.png')
    plt.savefig(chart_file)
    plt.close()
    
    return {
        'file': output_file,
        'chart': chart_file,
        'reviewer_count': len(sorted_workloads),
        'total_reviews': sum(w['Current_Load'] for w in sorted_workloads)
    }

def generate_overdue_alerts(output_file=None):
    """
    Generate alerts for overdue reviews.
    
    Args:
        output_file (str, optional): Path to output file. If None, uses default naming.
        
    Returns:
        dict: Report generation results
    """
    # Read data
    projects = get_all_projects()
    
    # Filter overdue projects
    overdue_projects = [p for p in projects if p.get('Status') == 'Overdue']
    
    # Sort by next review date (oldest first)
    overdue_projects.sort(key=lambda p: p.get('Next_Review_Date', ''))
    
    # Enrich with reviewer information
    enriched_overdue = []
    for project in overdue_projects:
        reviews = get_reviews_by_project(project['Project_ID'])
        active_reviews = [r for r in reviews if r['Status'] in ['Scheduled', 'In Progress']]
        
        if active_reviews:
            review = active_reviews[0]  # Take the first active review
            reviewer = get_reviewer(review['Reviewer_ID'])
            reviewer_name = reviewer.get('Name', 'Unknown') if reviewer else 'Unknown'
            next_review = datetime.strptime(project['Next_Review_Date'], '%Y-%m-%d')
            days_overdue = (datetime.now() - next_review).days

        else:
            reviewer_name = 'No reviewer assigned'
            days_overdue = 0
        
        enriched_project = {
            'Project_ID': project['Project_ID'],
            'Project_Name': project['Project_Name'],
            'Department': project.get('Department', 'Unknown'),
            'Next_Review_Date': project.get('Next_Review_Date', ''),
            'Days_Overdue': days_overdue,
            'Reviewer_Name': reviewer_name
        }
        
        enriched_overdue.append(enriched_project)

    # Define output file name if not specified
    if output_file is None:
        output_file = f"overdue_alerts_{datetime.now().strftime('%Y%m%d')}.csv"
    
    # Define fields for the report
    fieldnames = ['Project_ID', 'Project_Name', 'Department', 'Next_Review_Date', 'Days_Overdue', 'Reviewer_Name']
    
    # Write report to CSV
    if enriched_overdue:
        write_csv(output_file, enriched_overdue, fieldnames)
    
    return {
        'file': output_file,
        'overdue_count': len(enriched_overdue),
        'projects': [p['Project_ID'] for p in enriched_overdue]
    }

def generate_all_reports():
    """
    Generate all standard reports.
    
    Returns:
        dict: Summary of all report generation results
    """
    # Current date for filenames and monthly report
    current_date = datetime.now()
    current_month = current_date.strftime('%m')
    current_year = current_date.strftime('%Y')
    
    # Generate reports
    monthly_result = generate_monthly_schedule(current_month, current_year)
    workload_result = generate_workload_report()
    overdue_result = generate_overdue_alerts()

    # Generate visual graphs
    generate_graphs()
    
    return {
        'monthly_schedule': monthly_result,
        'workload_report': workload_result,
        'overdue_alerts': overdue_result,
        'timestamp': current_date.strftime('%Y-%m-%d %H:%M:%S')
    }

def generate_graphs(projects_file='Projects.csv', users_file='Users.csv'):
    """
    Generate and save visual graphs from project and reviewer data.
    Creates:
    - reviewer_workload.png
    - review_frequency.png
    """
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import os

    # Reviewer Workload by Department
    try:
        df_users = pd.read_csv(users_file)
        plt.figure(figsize=(12, 6))

        for dept in df_users["Department"].unique():
            dept_df = df_users[df_users["Department"] == dept]
            plt.bar(dept_df["Name"], dept_df["Current_Load"], label=dept)

        plt.xticks(rotation=45, ha='right')
        plt.xlabel("Reviewer")
        plt.ylabel("Current Load")
        plt.title("Reviewer Workload by Department")
        plt.legend(title="Department")
        plt.tight_layout()
        plt.grid(True)
        workload_path = "reviewer_workload.png"
        plt.savefig(workload_path)
        plt.close()
        print(f" Saved: {workload_path}")

    except Exception as e:
        print(f" Error generating reviewer workload graph: {e}")

    # Project Review Frequency by Department
    try:
        df_projects = pd.read_csv(projects_file)
        plt.figure(figsize=(10, 6))

        sns.countplot(data=df_projects, x="Review_Frequency_Years", hue="Department")

        plt.xlabel("Review Frequency (Years)")
        plt.ylabel("Number of Projects")
        plt.title("Project Review Frequency by Department")
        plt.legend(title="Department", bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        frequency_path = "review_frequency.png"
        plt.savefig(frequency_path)
        plt.close()
        print(f" Saved: {frequency_path}")

    except Exception as e:
        print(f" Error generating review frequency graph: {e}")

Command Line Interface

The Command Line Interface (CLI) module enables users to run key operations (generating reports, assigning reviewers, sending notifications) directly from the terminal, without modifying or executing Python code manually.

Purpose of the CLI Module

It allows non-programmers, automation scripts, or scheduled tasks (e.g., cron jobs) to interact with the scheduler easily and consistently by running commands like:

python scheduler.py generate_reports --type all

This enables automation, batch processing, and reproducibility in a file-based environment.
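
A few more example invocations, plus a sample cron entry for a daily run (the month/year values and the installation path are placeholders):

python scheduler.py calculate_reviews --csv Projects.csv
python scheduler.py send_notifications --status Overdue
python scheduler.py generate_reports --type monthly --month 06 --year 2025

# crontab entry: run the full pipeline every day at 06:00 (placeholder path)
0 6 * * * cd /path/to/ProjectReviewScheduler && python scheduler.py calculate_reviews && python scheduler.py assign_reviewers && python scheduler.py send_notifications --status Overdue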

# Explicit command-line argument parser
# -------------------------------------
# Parses CLI arguments based on the specific command provided.
# Allows custom options for each supported command and provides safer, structured input handling.

def parse_args(args):
    """
    Parse command-line arguments based on the provided command.

    Supported commands:
      - calculate_reviews: accepts --csv
      - assign_reviewers: accepts --projects, --users, --reviews
      - send_notifications: accepts --status, --smtp-server, --smtp-port
      - generate_reports: accepts --type, --month, --year, --output
      - generate_graphs: takes no additional options

    Args:
        args (list): Command-line arguments passed to the script.

    Returns:
        dict: Parsed arguments in the form {'command': ..., 'arg_name': ...}
    """
    if not args or len(args) < 1:
        return {'command': 'help'}
    
    command = args[0]
    parsed = {'command': command}
    
    # Parse command-specific arguments
    if command == 'calculate_reviews':
        for i in range(1, len(args)):
            if args[i] == '--csv' and i + 1 < len(args):
                parsed['csv_file'] = args[i + 1]
    
    elif command == 'assign_reviewers':
        for i in range(1, len(args)):
            if args[i] == '--projects' and i + 1 < len(args):
                parsed['projects_file'] = args[i + 1]
            elif args[i] == '--users' and i + 1 < len(args):
                parsed['users_file'] = args[i + 1]
            elif args[i] == '--reviews' and i + 1 < len(args):
                parsed['reviews_file'] = args[i + 1]
    
    elif command == 'send_notifications':
        for i in range(1, len(args)):
            if args[i] == '--status' and i + 1 < len(args):
                parsed['status'] = args[i + 1]
            elif args[i] == '--smtp-server' and i + 1 < len(args):
                parsed['smtp_server'] = args[i + 1]
            elif args[i] == '--smtp-port' and i + 1 < len(args):
                parsed['smtp_port'] = int(args[i + 1])
    
    elif command == 'generate_reports':
        for i in range(1, len(args)):
            if args[i] == '--type' and i + 1 < len(args):
                parsed['report_type'] = args[i + 1]
            elif args[i] == '--month' and i + 1 < len(args):
                parsed['month'] = args[i + 1]
            elif args[i] == '--year' and i + 1 < len(args):
                parsed['year'] = args[i + 1]
            elif args[i] == '--output' and i + 1 < len(args):
                parsed['output_file'] = args[i + 1]

    elif command == 'generate_graphs':
        pass  # No extra options to parse; execution happens in execute_command
    
    return parsed



def execute_command(args):
    """
    Execute a command based on parsed arguments.

    Args:
        args (list): List of command line arguments

    Returns:
        dict: Command execution results
    """
    parsed_args = parse_args(args)
    print(f"DEBUG: parsed_args = {parsed_args}")
    command = parsed_args.get('command')
    print("DEBUG: command =", command)  

    if command == 'help':
        print("Project Review Scheduler - Command Line Interface\n")
        print("Available commands:")
        print("  calculate_reviews [--csv PROJECTS_FILE]")
        print("      Calculate review due dates for all projects")
        print("  assign_reviewers [--projects PROJECTS_FILE] [--users USERS_FILE] [--reviews REVIEWS_FILE]")
        print("      Assign reviewers to projects needing review")
        print("  send_notifications [--status STATUS] [--smtp-server SERVER] [--smtp-port PORT]")
        print("      Send email notifications to reviewers")
        print("  generate_reports [--type REPORT_TYPE] [--month MONTH] [--year YEAR] [--output OUTPUT_FILE]")
        print("      Generate reports. REPORT_TYPE can be 'monthly', 'workload', 'overdue', or 'all'")
        return {'success': True, 'command': 'help'}

    elif command == 'calculate_reviews':
        csv_file = parsed_args.get('csv_file', 'Projects.csv')

        result = calculate_all_reviews(csv_file)

        print(f"Review calculations completed successfully")
        print(f"Total projects: {result['total_projects']}")
        print(f"Overdue: {result['overdue']}")
        print(f"Due Soon: {result['due_soon']}")
        print(f"Up to Date: {result['up_to_date']}")

        return {'success': True, 'command': 'calculate_reviews', 'result': result}

    elif command == 'assign_reviewers':
        projects_file = parsed_args.get('projects_file', 'Projects.csv')
        users_file = parsed_args.get('users_file', 'Users.csv')
        reviews_file = parsed_args.get('reviews_file', 'Reviews.csv')

        result = assign_all_reviewers(projects_file, users_file, reviews_file)

        print(f"Reviewer assignments completed successfully")
        print(f"Projects needing review: {result['total_needing_review']}")
        print(f"Projects already assigned: {result['already_assigned']}")
        print(f"New assignments created: {result['total_assigned']}")

        return {'success': True, 'command': 'assign_reviewers', 'result': result}

    elif command == 'send_notifications':
        status = parsed_args.get('status')
        smtp_server = parsed_args.get('smtp_server', 'localhost')
        smtp_port = parsed_args.get('smtp_port', 587)

        result = send_notifications(status, smtp_server, smtp_port)

        print(f"Notification process completed")
        print(f"Total notifications: {result['total']}")
        print(f"Successfully sent: {result['sent']}")
        print(f"Failed: {result['failed']}")

        return {'success': True, 'command': 'send_notifications', 'result': result}

    elif command == 'generate_reports':
        report_type = parsed_args.get('report_type', 'all')

        if report_type == 'all':
            result = generate_all_reports()

            print(f"All reports generated successfully")
            print(f"Monthly schedule: {result['monthly_schedule']['file']} ({result['monthly_schedule']['review_count']} reviews)")
            print(f"Workload report: {result['workload_report']['file']} ({result['workload_report']['reviewer_count']} reviewers)")
            print(f"Overdue alerts: {result['overdue_alerts']['file']} ({result['overdue_alerts']['overdue_count']} overdue projects)")

            return {'success': True, 'command': 'generate_reports', 'result': result}

        elif report_type == 'monthly':
            month = parsed_args.get('month', datetime.now().strftime('%m'))
            year = parsed_args.get('year', datetime.now().strftime('%Y'))
            output_file = parsed_args.get('output_file')

            result = generate_monthly_schedule(month, year, output_file)

            print(f"Monthly schedule generated successfully")
            print(f"File: {result['file']}")
            print(f"Reviews: {result['review_count']}")

            return {'success': True, 'command': 'generate_reports', 'report_type': 'monthly', 'result': result}

        elif report_type == 'workload':
            output_file = parsed_args.get('output_file')

            result = generate_workload_report(output_file)

            print(f"Workload report generated successfully")
            print(f"File: {result['file']}")
            print(f"Chart: {result['chart']}")
            print(f"Reviewers: {result['reviewer_count']}")
            print(f"Total active reviews: {result['total_reviews']}")

            return {'success': True, 'command': 'generate_reports', 'report_type': 'workload', 'result': result}

        elif report_type == 'overdue':
            output_file = parsed_args.get('output_file')

            result = generate_overdue_alerts(output_file)

            print(f"Overdue alerts generated successfully")
            print(f"File: {result['file']}")
            print(f"Overdue projects: {result['overdue_count']}")

            return {'success': True, 'command': 'generate_reports', 'report_type': 'overdue', 'result': result}



        else:
            print(f"Error: Unknown report type '{report_type}'")
            return {'success': False, 'command': 'generate_reports', 'error': f"Unknown report type '{report_type}'"}

    elif command == 'generate_graphs':
        generate_graphs()
        print("Graphs generated successfully.")
        return {'success': True, 'command': 'generate_graphs'}

    else:
        print(f"Error: Unknown command '{command}'")
        return {'success': False, 'command': 'unknown', 'error': f"Unknown command '{command}'"}




Main Entry Point

This block defines the entry point of the script when it’s run from the command line.

def main():
    """
    Main entry point for the command line interface.
    Filters out Jupyter and IDE internal arguments.
    """
    import sys

    print("RAW sys.argv =", sys.argv)  

    # Filter out known unwanted args like '-f' or kernel specs
    filtered_args = [
        arg for arg in sys.argv[1:]
        if not arg.endswith(".json") and not arg.startswith("-f") and not arg.startswith('--Application')
    ]

    print("Project Review Scheduler module loaded successfully")
    execute_command(filtered_args)


if __name__ == '__main__':
    main()
RAW sys.argv = ['/Users/cynthiamcginnis/anaconda3/lib/python3.11/site-packages/ipykernel_launcher.py', '-f', '/Users/cynthiamcginnis/Library/Jupyter/runtime/kernel-560678cb-0955-404c-bbdd-9e109e5e1fc2.json']
Project Review Scheduler module loaded successfully
DEBUG: parsed_args = {'command': 'help'}
DEBUG: command = help
Project Review Scheduler - Command Line Interface

Available commands:
  calculate_reviews [--csv PROJECTS_FILE]
      Calculate review due dates for all projects
  assign_reviewers [--projects PROJECTS_FILE] [--users USERS_FILE] [--reviews REVIEWS_FILE]
      Assign reviewers to projects needing review
  send_notifications [--status STATUS] [--smtp-server SERVER] [--smtp-port PORT]
      Send email notifications to reviewers
  generate_reports [--type REPORT_TYPE] [--month MONTH] [--year YEAR] [--output OUTPUT_FILE]
      Generate reports. REPORT_TYPE can be 'monthly', 'workload', 'overdue', or 'all'

Function Demonstration: End-to-End Verification

This section confirms that the key functions in the scheduler script behave as intended when executed in sequence. It utilizes sample project data to simulate realistic system usage and visually verify outputs.

Purpose of validate_referential_integrity()

This function checks whether the records across your CSV files (Projects.csv, Users.csv, and Reviews.csv) are consistent and valid. Specifically, it performs cross-file validation to ensure:

  • Every Reviewer_ID in Reviews.csv exists in Users.csv.

  • Every Project_ID in Reviews.csv exists in Projects.csv.
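
Although the cells below exercise the per-file checks, the cross-file check can be run the same way; a minimal sketch (it uses the default file names, so it assumes the notebook is running in the project directory):

integrity = validate_referential_integrity()
print(integrity["valid"])        # True when every Project_ID / Reviewer_ID in Reviews.csv resolves
for err in integrity["errors"]:  # empty when the files are consistent
    print(err["message"])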

# Step 1: Validate input data


validate_csv_data("/Users/cynthiamcginnis/Documents/ProjectReviewScheduler/Projects.csv", schema=None)


{'valid': True, 'errors': []}

Interpretation:

Projects.csv passes the schema check: all required columns are present and values match the expected formats (dates and numbers).

The same check is repeated below for Users.csv and Reviews.csv. With all three files valid, you are safe to proceed with due date calculations, reviewer assignment, and reporting.

validate_csv_data("/Users/cynthiamcginnis/Documents/ProjectReviewScheduler/Users.csv", schema=None)
{'valid': True, 'errors': []}
validate_csv_data("/Users/cynthiamcginnis/Documents/ProjectReviewScheduler/Reviews.csv", schema=None)
{'valid': True, 'errors': []}

Calculating Review Due Dates with calculate_due_date()

This code snippet reads in the Projects.csv file using pandas and calculates the next review due date for each project.


import pandas as pd

# Load the projects file
df_projects = pd.read_csv("/Users/cynthiamcginnis/Documents/ProjectReviewScheduler/Projects.csv")

# Apply the function to each row
df_projects["Due_Date"] = df_projects.apply(lambda row: calculate_due_date(row.to_dict()), axis=1)

# Display the updated DataFrame
df_projects.head()



Project_ID Project_Name Start_Date Last_Review_Date Review_Frequency_Years Department Status Next_Review_Date Due_Date
0 P001 Generate Enterprise E-Markets 2023-09-16 2024-04-22 2 IT Up to Date 2026-04-22 {‘Project_ID’: ‘P001’, ‘Project_Name’: ’Genera…
1 P002 Syndicate Interactive Users 2024-02-16 2024-07-26 2 HR Up to Date 2026-07-26 {‘Project_ID’: ‘P002’, ‘Project_Name’: ’Syndic…
2 P003 Mesh Efficient E-Markets 2023-11-03 2024-04-20 2 Finance Up to Date 2026-04-20 {‘Project_ID’: ‘P003’, ‘Project_Name’: ’Mesh E…
3 P004 Synthesize Collaborative E-Business 2023-05-29 2024-12-23 2 HR Up to Date 2026-12-23 {‘Project_ID’: ‘P004’, ‘Project_Name’: ’Synthe…
4 P005 Redefine Virtual Synergies 2024-01-13 2025-03-26 1 Finance Up to Date 2026-03-26 {‘Project_ID’: ‘P005’, ‘Project_Name’: ’Redefi…
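
Because calculate_due_date() returns the full updated record, the computed fields can optionally be unpacked into their own columns:

df_projects["Next_Review_Date"] = df_projects["Due_Date"].apply(lambda d: d["Next_Review_Date"])
df_projects["Status"] = df_projects["Due_Date"].apply(lambda d: d["Status"])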

Generate Visual Graphs

The generate_graphs() function creates visual summaries from your project and reviewer data. These graphs help identify trends and workload distributions at a glance, enhancing your ability to make data-driven decisions.

generate_graphs(
    projects_file="/Users/cynthiamcginnis/Documents/ProjectReviewScheduler/Projects.csv",
    users_file="/Users/cynthiamcginnis/Documents/ProjectReviewScheduler/Users.csv"
)
 Saved: reviewer_workload.png
 Saved: review_frequency.png
from IPython.display import Image, display

display(Image(filename="reviewer_workload.png"))


Reviewer Workload by Department

This bar chart displays the current review load assigned to each reviewer, color-coded by department. It helps answer:

  • Who is overloaded or underutilized?

  • Is workload fairly distributed across departments?

Insights:

  • Most reviewers across departments have similar loads (5–6 reviews).

  • This balance suggests that the assignment algorithm is working as intended.

  • The chart makes it easy to spot any individuals or departments with unusual workloads.

display(Image(filename="review_frequency.png"))


Project Review Frequency by Department

This bar graph visualizes how frequently projects are reviewed, broken down by department.

Key Insights:

  • IT and QA departments have the highest number of annual review cycles (1 year).

  • Finance shows a more varied distribution, with projects reviewed both annually and biennially.

This breakdown may inform adjustments to scheduling policies or highlight departments requiring more frequent oversight.