# notification_service.py import logging import json import time import uuid import pytz from datetime import datetime, timedelta from enum import Enum from collections import deque from typing import List, Dict, Any, Optional, Union from config import get_timezone # Constants to replace magic values ONE_DAY_SECONDS = 86400 DEFAULT_TARGET_HOUR = 12 SIGNIFICANT_HASHRATE_CHANGE_PERCENT = 25 NOTIFICATION_WINDOW_MINUTES = 5 class NotificationLevel(Enum): INFO = "info" SUCCESS = "success" WARNING = "warning" ERROR = "error" class NotificationCategory(Enum): HASHRATE = "hashrate" BLOCK = "block" WORKER = "worker" EARNINGS = "earnings" SYSTEM = "system" class NotificationService: """Service for managing mining dashboard notifications.""" def __init__(self, state_manager): """Initialize with state manager for persistence.""" self.state_manager = state_manager self.notifications = [] self.daily_stats_time = "00:00:00" # When to post daily stats (midnight) self.last_daily_stats = None self.max_notifications = 100 # Maximum number to store self.last_block_height = None # Track the last seen block height self.last_payout_notification_time = None # Track the last payout notification time self.last_estimated_payout_time = None # Track the last estimated payout time # Load existing notifications from state self._load_notifications() # Load last block height from state self._load_last_block_height() def _get_current_time(self) -> datetime: """Get current datetime with the configured timezone.""" try: tz = pytz.timezone(get_timezone()) return datetime.now(tz) except Exception as e: logging.error(f"[NotificationService] Error getting timezone: {e}") # Fallback to naive datetime if timezone fails return datetime.now() def _parse_timestamp(self, timestamp_str: str) -> datetime: """Parse an ISO format timestamp string into a timezone-aware datetime.""" try: # Parse the naive datetime from the string dt = datetime.fromisoformat(timestamp_str) # If it's already timezone-aware, return it if dt.tzinfo is not None: return dt # Otherwise, make it timezone-aware tz = pytz.timezone(get_timezone()) return tz.localize(dt) except Exception as e: logging.error(f"[NotificationService] Error parsing timestamp: {e}") # Return current time as fallback return self._get_current_time() def _get_redis_value(self, key: str, default: Any = None) -> Any: """Generic method to retrieve values from Redis.""" try: if hasattr(self.state_manager, 'redis_client') and self.state_manager.redis_client: value = self.state_manager.redis_client.get(key) if value: return value.decode('utf-8') return default except Exception as e: logging.error(f"[NotificationService] Error retrieving {key} from Redis: {e}") return default def _set_redis_value(self, key: str, value: Any) -> bool: """Generic method to set values in Redis.""" try: if hasattr(self.state_manager, 'redis_client') and self.state_manager.redis_client: self.state_manager.redis_client.set(key, str(value)) logging.info(f"[NotificationService] Saved {key} to Redis: {value}") return True return False except Exception as e: logging.error(f"[NotificationService] Error saving {key} to Redis: {e}") return False def _load_notifications(self) -> None: """Load notifications with enhanced error handling.""" try: stored_notifications = self.state_manager.get_notifications() if stored_notifications: self.notifications = stored_notifications logging.info(f"[NotificationService] Loaded {len(self.notifications)} notifications from storage") else: self.notifications = [] logging.info("[NotificationService] No notifications found in storage, starting with empty list") except Exception as e: logging.error(f"[NotificationService] Error loading notifications: {e}") self.notifications = [] # Ensure we have a valid list def _load_last_block_height(self) -> None: """Load last block height from persistent storage.""" try: self.last_block_height = self._get_redis_value("last_block_height") if self.last_block_height: logging.info(f"[NotificationService] Loaded last block height from storage: {self.last_block_height}") else: logging.info("[NotificationService] No last block height found, starting with None") except Exception as e: logging.error(f"[NotificationService] Error loading last block height: {e}") def _save_last_block_height(self) -> None: """Save last block height to persistent storage.""" if self.last_block_height: self._set_redis_value("last_block_height", self.last_block_height) def _save_notifications(self) -> None: """Save notifications with improved pruning.""" try: # Sort by timestamp before pruning to ensure we keep the most recent if len(self.notifications) > self.max_notifications: self.notifications.sort(key=lambda n: n.get("timestamp", ""), reverse=True) self.notifications = self.notifications[:self.max_notifications] self.state_manager.save_notifications(self.notifications) logging.info(f"[NotificationService] Saved {len(self.notifications)} notifications") except Exception as e: logging.error(f"[NotificationService] Error saving notifications: {e}") def add_notification(self, message: str, level: NotificationLevel = NotificationLevel.INFO, category: NotificationCategory = NotificationCategory.SYSTEM, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Add a new notification. Args: message (str): Notification message text level (NotificationLevel): Severity level category (NotificationCategory): Classification category data (dict, optional): Additional data for the notification Returns: dict: The created notification """ notification = { "id": str(uuid.uuid4()), "timestamp": self._get_current_time().isoformat(), "message": message, "level": level.value, "category": category.value, "read": False } if data: notification["data"] = data self.notifications.append(notification) self._save_notifications() logging.info(f"[NotificationService] Added notification: {message}") return notification def get_notifications(self, limit: int = 50, offset: int = 0, unread_only: bool = False, category: Optional[str] = None, level: Optional[str] = None) -> List[Dict[str, Any]]: """ Get filtered notifications with optimized filtering. Args: limit (int): Maximum number to return offset (int): Starting offset for pagination unread_only (bool): Only return unread notifications category (str): Filter by category level (str): Filter by level Returns: list: Filtered notifications """ # Apply all filters in a single pass filtered = [ n for n in self.notifications if (not unread_only or not n.get("read", False)) and (not category or n.get("category") == category) and (not level or n.get("level") == level) ] # Sort by timestamp (newest first) filtered = sorted(filtered, key=lambda n: n.get("timestamp", ""), reverse=True) # Apply pagination return filtered[offset:offset + limit] def get_unread_count(self) -> int: """Get count of unread notifications.""" return sum(1 for n in self.notifications if not n.get("read", False)) def mark_as_read(self, notification_id: Optional[str] = None) -> bool: """ Mark notification(s) as read. Args: notification_id (str, optional): ID of specific notification to mark read, or None to mark all as read Returns: bool: True if successful """ if notification_id: # Mark specific notification as read for n in self.notifications: if n.get("id") == notification_id: n["read"] = True logging.info(f"[NotificationService] Marked notification {notification_id} as read") break else: # Mark all as read for n in self.notifications: n["read"] = True logging.info(f"[NotificationService] Marked all {len(self.notifications)} notifications as read") self._save_notifications() return True def delete_notification(self, notification_id: str) -> bool: """ Delete a specific notification. Args: notification_id (str): ID of notification to delete Returns: bool: True if successful """ original_count = len(self.notifications) self.notifications = [n for n in self.notifications if n.get("id") != notification_id] deleted = original_count - len(self.notifications) if deleted > 0: logging.info(f"[NotificationService] Deleted notification {notification_id}") self._save_notifications() return deleted > 0 def clear_notifications(self, category: Optional[str] = None, older_than_days: Optional[int] = None) -> int: """ Clear notifications with optimized filtering. Args: category (str, optional): Only clear specific category older_than_days (int, optional): Only clear notifications older than this Returns: int: Number of notifications cleared """ original_count = len(self.notifications) cutoff_date = None if older_than_days: cutoff_date = self._get_current_time() - timedelta(days=older_than_days) # Apply filters in a single pass self.notifications = [ n for n in self.notifications if (not category or n.get("category") != category) and (not cutoff_date or self._parse_timestamp(n.get("timestamp", self._get_current_time().isoformat())) >= cutoff_date) ] cleared_count = original_count - len(self.notifications) if cleared_count > 0: logging.info(f"[NotificationService] Cleared {cleared_count} notifications") self._save_notifications() return cleared_count def check_and_generate_notifications(self, current_metrics: Dict[str, Any], previous_metrics: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Check metrics and generate notifications for significant events. Args: current_metrics: Current system metrics previous_metrics: Previous system metrics for comparison Returns: list: Newly created notifications """ new_notifications = [] try: # Skip if no metrics if not current_metrics: logging.warning("[NotificationService] No current metrics available, skipping notification checks") return new_notifications # Check for block updates (using persistent storage) last_block_height = current_metrics.get("last_block_height") if last_block_height and last_block_height != "N/A": if self.last_block_height is not None and self.last_block_height != last_block_height: logging.info(f"[NotificationService] Block change detected: {self.last_block_height} -> {last_block_height}") block_notification = self._generate_block_notification(current_metrics) if block_notification: new_notifications.append(block_notification) # Always update the stored last block height when it changes if self.last_block_height != last_block_height: self.last_block_height = last_block_height self._save_last_block_height() # Regular comparison with previous metrics if previous_metrics: # Check for daily stats if self._should_post_daily_stats(): stats_notification = self._generate_daily_stats(current_metrics) if stats_notification: new_notifications.append(stats_notification) # Check for significant hashrate drop hashrate_notification = self._check_hashrate_change(current_metrics, previous_metrics) if hashrate_notification: new_notifications.append(hashrate_notification) # Check for earnings and payout progress earnings_notification = self._check_earnings_progress(current_metrics, previous_metrics) if earnings_notification: new_notifications.append(earnings_notification) return new_notifications except Exception as e: logging.error(f"[NotificationService] Error generating notifications: {e}") error_notification = self.add_notification( f"Error generating notifications: {str(e)}", level=NotificationLevel.ERROR, category=NotificationCategory.SYSTEM ) return [error_notification] def _should_post_daily_stats(self) -> bool: """Check if it's time to post daily stats with improved clarity.""" now = self._get_current_time() # Only proceed if we're in the target hour and within first 5 minutes if now.hour != DEFAULT_TARGET_HOUR or now.minute >= NOTIFICATION_WINDOW_MINUTES: return False # If we have a last_daily_stats timestamp, check if it's a different day if self.last_daily_stats and now.date() <= self.last_daily_stats.date(): return False # All conditions met, update timestamp and return True logging.info(f"[NotificationService] Posting daily stats at {now.hour}:{now.minute}") self.last_daily_stats = now return True def _generate_daily_stats(self, metrics: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Generate daily stats notification.""" try: if not metrics: logging.warning("[NotificationService] No metrics available for daily stats") return None # Format hashrate with appropriate unit hashrate_24hr = metrics.get("hashrate_24hr", 0) hashrate_unit = metrics.get("hashrate_24hr_unit", "TH/s") # Format daily earnings daily_mined_sats = metrics.get("daily_mined_sats", 0) daily_profit_usd = metrics.get("daily_profit_usd", 0) # Build message message = f"Daily Mining Summary: {hashrate_24hr} {hashrate_unit} average hashrate, {daily_mined_sats} SATS mined (${daily_profit_usd:.2f})" # Add notification logging.info(f"[NotificationService] Generating daily stats notification: {message}") return self.add_notification( message, level=NotificationLevel.INFO, category=NotificationCategory.HASHRATE, data={ "hashrate": hashrate_24hr, "unit": hashrate_unit, "daily_sats": daily_mined_sats, "daily_profit": daily_profit_usd } ) except Exception as e: logging.error(f"[NotificationService] Error generating daily stats notification: {e}") return None def _generate_block_notification(self, metrics: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Generate notification for a new block found.""" try: last_block_height = metrics.get("last_block_height", "Unknown") last_block_earnings = metrics.get("last_block_earnings", "0") logging.info(f"[NotificationService] Generating block notification: height={last_block_height}, earnings={last_block_earnings}") message = f"New block found by the pool! Block #{last_block_height}, earnings: {last_block_earnings} SATS" return self.add_notification( message, level=NotificationLevel.SUCCESS, category=NotificationCategory.BLOCK, data={ "block_height": last_block_height, "earnings": last_block_earnings } ) except Exception as e: logging.error(f"[NotificationService] Error generating block notification: {e}") return None def _parse_numeric_value(self, value_str: Any) -> float: """Parse numeric values from strings that may include units.""" if isinstance(value_str, (int, float)): return float(value_str) if isinstance(value_str, str): # Extract just the numeric part parts = value_str.split() try: return float(parts[0]) except (ValueError, IndexError): pass return 0.0 def _check_hashrate_change(self, current: Dict[str, Any], previous: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Check for significant hashrate changes using 10-minute average.""" try: # Get 10min hashrate values current_10min = current.get("hashrate_10min", 0) previous_10min = previous.get("hashrate_10min", 0) # Log what we're comparing logging.debug(f"[NotificationService] Comparing 10min hashrates - current: {current_10min}, previous: {previous_10min}") # Skip if values are missing if not current_10min or not previous_10min: logging.debug("[NotificationService] Skipping hashrate check - missing values") return None # Parse values consistently current_value = self._parse_numeric_value(current_10min) previous_value = self._parse_numeric_value(previous_10min) logging.debug(f"[NotificationService] Converted 10min hashrates - current: {current_value}, previous: {previous_value}") # Skip if previous was zero (prevents division by zero) if previous_value == 0: logging.debug("[NotificationService] Skipping hashrate check - previous was zero") return None # Calculate percentage change percent_change = ((current_value - previous_value) / previous_value) * 100 logging.debug(f"[NotificationService] 10min hashrate change: {percent_change:.1f}%") # Significant decrease if percent_change <= -SIGNIFICANT_HASHRATE_CHANGE_PERCENT: message = f"Significant 10min hashrate drop detected: {abs(percent_change):.1f}% decrease" logging.info(f"[NotificationService] Generating hashrate notification: {message}") return self.add_notification( message, level=NotificationLevel.WARNING, category=NotificationCategory.HASHRATE, data={ "previous": previous_value, "current": current_value, "change": percent_change, "timeframe": "10min" } ) # Significant increase elif percent_change >= SIGNIFICANT_HASHRATE_CHANGE_PERCENT: message = f"10min hashrate increase detected: {percent_change:.1f}% increase" logging.info(f"[NotificationService] Generating hashrate notification: {message}") return self.add_notification( message, level=NotificationLevel.SUCCESS, category=NotificationCategory.HASHRATE, data={ "previous": previous_value, "current": current_value, "change": percent_change, "timeframe": "10min" } ) return None except Exception as e: logging.error(f"[NotificationService] Error checking hashrate change: {e}") return None def _check_earnings_progress(self, current: Dict[str, Any], previous: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Check for significant earnings progress or payout approach.""" try: current_unpaid = self._parse_numeric_value(current.get("unpaid_earnings", "0")) # Check if approaching payout if current.get("est_time_to_payout"): est_time = current.get("est_time_to_payout") # If estimated time is a number of days if est_time.isdigit() or (est_time[0] == '-' and est_time[1:].isdigit()): days = int(est_time) if 0 < days <= 1: if self._should_send_payout_notification(): message = f"Payout approaching! Estimated within 1 day" self.last_payout_notification_time = self._get_current_time() return self.add_notification( message, level=NotificationLevel.SUCCESS, category=NotificationCategory.EARNINGS, data={"days_to_payout": days} ) # If it says "next block" elif "next block" in est_time.lower(): if self._should_send_payout_notification(): message = f"Payout expected with next block!" self.last_payout_notification_time = self._get_current_time() return self.add_notification( message, level=NotificationLevel.SUCCESS, category=NotificationCategory.EARNINGS, data={"payout_imminent": True} ) # Check for payout (unpaid balance reset) if previous.get("unpaid_earnings"): previous_unpaid = self._parse_numeric_value(previous.get("unpaid_earnings", "0")) # If balance significantly decreased, likely a payout occurred if previous_unpaid > 0 and current_unpaid < previous_unpaid * 0.5: message = f"Payout received! Unpaid balance reset from {previous_unpaid} to {current_unpaid} BTC" return self.add_notification( message, level=NotificationLevel.SUCCESS, category=NotificationCategory.EARNINGS, data={ "previous_balance": previous_unpaid, "current_balance": current_unpaid, "payout_amount": previous_unpaid - current_unpaid } ) return None except Exception as e: logging.error(f"[NotificationService] Error checking earnings progress: {e}") return None def _should_send_payout_notification(self) -> bool: """Check if enough time has passed since the last payout notification.""" if self.last_payout_notification_time is None: return True time_since_last_notification = self._get_current_time() - self.last_payout_notification_time return time_since_last_notification.total_seconds() > ONE_DAY_SECONDS