mirror of
https://github.com/Retropex/custom-ocean.xyz-dashboard.git
synced 2025-05-12 19:20:45 +02:00
Add files via upload
This commit is contained in:
parent
8dca2c41a4
commit
3566f2127c
493
notification_service.py
Normal file
493
notification_service.py
Normal file
@ -0,0 +1,493 @@
|
||||
# notification_service.py
|
||||
import logging
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
from enum import Enum
|
||||
from collections import deque
|
||||
|
||||
class NotificationLevel(Enum):
|
||||
INFO = "info"
|
||||
SUCCESS = "success"
|
||||
WARNING = "warning"
|
||||
ERROR = "error"
|
||||
|
||||
class NotificationCategory(Enum):
|
||||
HASHRATE = "hashrate"
|
||||
BLOCK = "block"
|
||||
WORKER = "worker"
|
||||
EARNINGS = "earnings"
|
||||
SYSTEM = "system"
|
||||
|
||||
class NotificationService:
|
||||
"""Service for managing mining dashboard notifications."""
|
||||
|
||||
def __init__(self, state_manager):
|
||||
"""Initialize with state manager for persistence."""
|
||||
self.state_manager = state_manager
|
||||
self.notifications = []
|
||||
self.daily_stats_time = "00:00:00" # When to post daily stats (midnight)
|
||||
self.last_daily_stats = None
|
||||
self.max_notifications = 100 # Maximum number to store
|
||||
|
||||
# Load existing notifications from state
|
||||
self._load_notifications()
|
||||
|
||||
def _load_notifications(self):
|
||||
"""Load notifications from persistent storage."""
|
||||
try:
|
||||
stored_notifications = self.state_manager.get_notifications()
|
||||
if stored_notifications:
|
||||
self.notifications = stored_notifications
|
||||
logging.info(f"Loaded {len(self.notifications)} notifications from storage")
|
||||
except Exception as e:
|
||||
logging.error(f"Error loading notifications: {e}")
|
||||
|
||||
def _save_notifications(self):
|
||||
"""Save notifications to persistent storage."""
|
||||
try:
|
||||
# Prune to max size before saving
|
||||
if len(self.notifications) > self.max_notifications:
|
||||
self.notifications = self.notifications[-self.max_notifications:]
|
||||
|
||||
self.state_manager.save_notifications(self.notifications)
|
||||
except Exception as e:
|
||||
logging.error(f"Error saving notifications: {e}")
|
||||
|
||||
def add_notification(self, message, level=NotificationLevel.INFO, category=NotificationCategory.SYSTEM, data=None):
|
||||
"""
|
||||
Add a new notification.
|
||||
|
||||
Args:
|
||||
message (str): Notification message text
|
||||
level (NotificationLevel): Severity level
|
||||
category (NotificationCategory): Classification category
|
||||
data (dict, optional): Additional data for the notification
|
||||
|
||||
Returns:
|
||||
dict: The created notification
|
||||
"""
|
||||
notification = {
|
||||
"id": str(uuid.uuid4()),
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"message": message,
|
||||
"level": level.value,
|
||||
"category": category.value,
|
||||
"read": False
|
||||
}
|
||||
|
||||
if data:
|
||||
notification["data"] = data
|
||||
|
||||
self.notifications.append(notification)
|
||||
self._save_notifications()
|
||||
|
||||
return notification
|
||||
|
||||
def get_notifications(self, limit=50, offset=0, unread_only=False, category=None, level=None):
|
||||
"""
|
||||
Get filtered notifications.
|
||||
|
||||
Args:
|
||||
limit (int): Maximum number to return
|
||||
offset (int): Starting offset for pagination
|
||||
unread_only (bool): Only return unread notifications
|
||||
category (str): Filter by category
|
||||
level (str): Filter by level
|
||||
|
||||
Returns:
|
||||
list: Filtered notifications
|
||||
"""
|
||||
filtered = self.notifications
|
||||
|
||||
# Apply filters
|
||||
if unread_only:
|
||||
filtered = [n for n in filtered if not n.get("read", False)]
|
||||
|
||||
if category:
|
||||
filtered = [n for n in filtered if n.get("category") == category]
|
||||
|
||||
if level:
|
||||
filtered = [n for n in filtered if n.get("level") == level]
|
||||
|
||||
# Sort by timestamp (newest first)
|
||||
filtered = sorted(filtered, key=lambda n: n.get("timestamp", ""), reverse=True)
|
||||
|
||||
# Apply pagination
|
||||
paginated = filtered[offset:offset + limit]
|
||||
|
||||
return paginated
|
||||
|
||||
def get_unread_count(self):
|
||||
"""Get count of unread notifications."""
|
||||
return sum(1 for n in self.notifications if not n.get("read", False))
|
||||
|
||||
def mark_as_read(self, notification_id=None):
|
||||
"""
|
||||
Mark notification(s) as read.
|
||||
|
||||
Args:
|
||||
notification_id (str, optional): ID of specific notification to mark read,
|
||||
or None to mark all as read
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
"""
|
||||
if notification_id:
|
||||
# Mark specific notification as read
|
||||
for n in self.notifications:
|
||||
if n.get("id") == notification_id:
|
||||
n["read"] = True
|
||||
break
|
||||
else:
|
||||
# Mark all as read
|
||||
for n in self.notifications:
|
||||
n["read"] = True
|
||||
|
||||
self._save_notifications()
|
||||
return True
|
||||
|
||||
def delete_notification(self, notification_id):
|
||||
"""
|
||||
Delete a specific notification.
|
||||
|
||||
Args:
|
||||
notification_id (str): ID of notification to delete
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
"""
|
||||
self.notifications = [n for n in self.notifications if n.get("id") != notification_id]
|
||||
self._save_notifications()
|
||||
return True
|
||||
|
||||
def clear_notifications(self, category=None, older_than_days=None):
|
||||
"""
|
||||
Clear notifications.
|
||||
|
||||
Args:
|
||||
category (str, optional): Only clear specific category
|
||||
older_than_days (int, optional): Only clear notifications older than this
|
||||
|
||||
Returns:
|
||||
int: Number of notifications cleared
|
||||
"""
|
||||
original_count = len(self.notifications)
|
||||
|
||||
if category and older_than_days:
|
||||
cutoff_date = datetime.now() - timedelta(days=older_than_days)
|
||||
self.notifications = [
|
||||
n for n in self.notifications
|
||||
if n.get("category") != category or
|
||||
datetime.fromisoformat(n.get("timestamp", datetime.now().isoformat())) >= cutoff_date
|
||||
]
|
||||
elif category:
|
||||
self.notifications = [n for n in self.notifications if n.get("category") != category]
|
||||
elif older_than_days:
|
||||
cutoff_date = datetime.now() - timedelta(days=older_than_days)
|
||||
self.notifications = [
|
||||
n for n in self.notifications
|
||||
if datetime.fromisoformat(n.get("timestamp", datetime.now().isoformat())) >= cutoff_date
|
||||
]
|
||||
else:
|
||||
self.notifications = []
|
||||
|
||||
self._save_notifications()
|
||||
return original_count - len(self.notifications)
|
||||
|
||||
def check_and_generate_notifications(self, current_metrics, previous_metrics):
|
||||
"""
|
||||
Check metrics and generate notifications for significant events.
|
||||
|
||||
Args:
|
||||
current_metrics (dict): Current mining metrics
|
||||
previous_metrics (dict): Previous mining metrics
|
||||
|
||||
Returns:
|
||||
list: Newly created notifications
|
||||
"""
|
||||
new_notifications = []
|
||||
|
||||
try:
|
||||
# Check for daily stats
|
||||
if self._should_post_daily_stats():
|
||||
stats_notification = self._generate_daily_stats(current_metrics)
|
||||
if stats_notification:
|
||||
new_notifications.append(stats_notification)
|
||||
|
||||
# Check for blocks found
|
||||
if previous_metrics and current_metrics:
|
||||
if self._check_blocks_found(current_metrics, previous_metrics):
|
||||
block_notification = self._generate_block_notification(current_metrics)
|
||||
if block_notification:
|
||||
new_notifications.append(block_notification)
|
||||
|
||||
# Check for significant hashrate drop
|
||||
if previous_metrics and current_metrics:
|
||||
hashrate_notification = self._check_hashrate_change(current_metrics, previous_metrics)
|
||||
if hashrate_notification:
|
||||
new_notifications.append(hashrate_notification)
|
||||
|
||||
# Check for worker status changes
|
||||
if previous_metrics and current_metrics:
|
||||
worker_notification = self._check_worker_status_change(current_metrics, previous_metrics)
|
||||
if worker_notification:
|
||||
new_notifications.append(worker_notification)
|
||||
|
||||
# Check for earnings and payout progress
|
||||
if previous_metrics and current_metrics:
|
||||
earnings_notification = self._check_earnings_progress(current_metrics, previous_metrics)
|
||||
if earnings_notification:
|
||||
new_notifications.append(earnings_notification)
|
||||
|
||||
return new_notifications
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error generating notifications: {e}")
|
||||
error_notification = self.add_notification(
|
||||
f"Error generating notifications: {str(e)}",
|
||||
level=NotificationLevel.ERROR,
|
||||
category=NotificationCategory.SYSTEM
|
||||
)
|
||||
return [error_notification]
|
||||
|
||||
def _should_post_daily_stats(self):
|
||||
"""Check if it's time to post daily stats."""
|
||||
now = datetime.now()
|
||||
|
||||
# If we have a last_daily_stats timestamp
|
||||
if self.last_daily_stats:
|
||||
# Check if it's been more than a day
|
||||
if (now - self.last_daily_stats).days >= 1:
|
||||
# Check if we're past the scheduled time
|
||||
current_time_str = now.strftime("%H:%M:%S")
|
||||
if current_time_str >= self.daily_stats_time:
|
||||
self.last_daily_stats = now
|
||||
return True
|
||||
return False
|
||||
else:
|
||||
# First time - post if it's past the scheduled time
|
||||
current_time_str = now.strftime("%H:%M:%S")
|
||||
if current_time_str >= self.daily_stats_time:
|
||||
self.last_daily_stats = now
|
||||
return True
|
||||
return False
|
||||
|
||||
def _generate_daily_stats(self, metrics):
|
||||
"""Generate daily stats notification."""
|
||||
try:
|
||||
if not metrics:
|
||||
return None
|
||||
|
||||
# Format hashrate with appropriate unit
|
||||
hashrate_24hr = metrics.get("hashrate_24hr", 0)
|
||||
hashrate_unit = metrics.get("hashrate_24hr_unit", "TH/s")
|
||||
|
||||
# Format daily earnings
|
||||
daily_mined_sats = metrics.get("daily_mined_sats", 0)
|
||||
daily_profit_usd = metrics.get("daily_profit_usd", 0)
|
||||
|
||||
# Build message
|
||||
message = f"Daily Mining Summary: {hashrate_24hr} {hashrate_unit} average hashrate, {daily_mined_sats} sats mined (${daily_profit_usd:.2f})"
|
||||
|
||||
# Add notification
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.INFO,
|
||||
category=NotificationCategory.HASHRATE,
|
||||
data={
|
||||
"hashrate": hashrate_24hr,
|
||||
"unit": hashrate_unit,
|
||||
"daily_sats": daily_mined_sats,
|
||||
"daily_profit": daily_profit_usd
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error generating daily stats notification: {e}")
|
||||
return None
|
||||
|
||||
def _check_blocks_found(self, current, previous):
|
||||
"""Check if new blocks were found by the pool."""
|
||||
current_blocks = current.get("blocks_found", "0")
|
||||
previous_blocks = previous.get("blocks_found", "0")
|
||||
|
||||
try:
|
||||
return int(current_blocks) > int(previous_blocks)
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
def _generate_block_notification(self, metrics):
|
||||
"""Generate notification for a new block found."""
|
||||
try:
|
||||
blocks_found = metrics.get("blocks_found", "0")
|
||||
last_block_height = metrics.get("last_block_height", "Unknown")
|
||||
last_block_earnings = metrics.get("last_block_earnings", "0")
|
||||
|
||||
message = f"New block found by the pool! Block #{last_block_height}, earnings: {last_block_earnings} sats"
|
||||
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.SUCCESS,
|
||||
category=NotificationCategory.BLOCK,
|
||||
data={
|
||||
"blocks_found": blocks_found,
|
||||
"block_height": last_block_height,
|
||||
"earnings": last_block_earnings
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error generating block notification: {e}")
|
||||
return None
|
||||
|
||||
def _check_hashrate_change(self, current, previous):
|
||||
"""Check for significant hashrate changes."""
|
||||
try:
|
||||
current_3hr = current.get("hashrate_3hr", 0)
|
||||
previous_3hr = previous.get("hashrate_3hr", 0)
|
||||
|
||||
# Skip if values are missing
|
||||
if not current_3hr or not previous_3hr:
|
||||
return None
|
||||
|
||||
# Convert to float
|
||||
current_3hr = float(current_3hr)
|
||||
previous_3hr = float(previous_3hr)
|
||||
|
||||
# Skip if previous was zero (prevents division by zero)
|
||||
if previous_3hr == 0:
|
||||
return None
|
||||
|
||||
# Calculate percentage change
|
||||
percent_change = ((current_3hr - previous_3hr) / previous_3hr) * 100
|
||||
|
||||
# Significant decrease (more than 15%)
|
||||
if percent_change <= -15:
|
||||
message = f"Significant hashrate drop detected: {abs(percent_change):.1f}% decrease"
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.WARNING,
|
||||
category=NotificationCategory.HASHRATE,
|
||||
data={
|
||||
"previous": previous_3hr,
|
||||
"current": current_3hr,
|
||||
"change": percent_change
|
||||
}
|
||||
)
|
||||
|
||||
# Significant increase (more than 15%)
|
||||
elif percent_change >= 15:
|
||||
message = f"Hashrate increase detected: {percent_change:.1f}% increase"
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.SUCCESS,
|
||||
category=NotificationCategory.HASHRATE,
|
||||
data={
|
||||
"previous": previous_3hr,
|
||||
"current": current_3hr,
|
||||
"change": percent_change
|
||||
}
|
||||
)
|
||||
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.error(f"Error checking hashrate change: {e}")
|
||||
return None
|
||||
|
||||
def _check_worker_status_change(self, current, previous):
|
||||
"""Check for worker status changes."""
|
||||
try:
|
||||
current_workers = current.get("workers_hashing", 0)
|
||||
previous_workers = previous.get("workers_hashing", 0)
|
||||
|
||||
# Skip if values are missing
|
||||
if current_workers is None or previous_workers is None:
|
||||
return None
|
||||
|
||||
# Worker(s) went offline
|
||||
if current_workers < previous_workers:
|
||||
diff = previous_workers - current_workers
|
||||
message = f"{diff} worker{'s' if diff > 1 else ''} went offline"
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.WARNING,
|
||||
category=NotificationCategory.WORKER,
|
||||
data={
|
||||
"previous_count": previous_workers,
|
||||
"current_count": current_workers,
|
||||
"difference": diff
|
||||
}
|
||||
)
|
||||
|
||||
# Worker(s) came online
|
||||
elif current_workers > previous_workers:
|
||||
diff = current_workers - previous_workers
|
||||
message = f"{diff} new worker{'s' if diff > 1 else ''} came online"
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.SUCCESS,
|
||||
category=NotificationCategory.WORKER,
|
||||
data={
|
||||
"previous_count": previous_workers,
|
||||
"current_count": current_workers,
|
||||
"difference": diff
|
||||
}
|
||||
)
|
||||
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.error(f"Error checking worker status change: {e}")
|
||||
return None
|
||||
|
||||
def _check_earnings_progress(self, current, previous):
|
||||
"""Check for significant earnings progress or payout approach."""
|
||||
try:
|
||||
current_unpaid = float(current.get("unpaid_earnings", "0").split()[0]) if isinstance(current.get("unpaid_earnings"), str) else current.get("unpaid_earnings", 0)
|
||||
|
||||
# Check if approaching payout
|
||||
if current.get("est_time_to_payout"):
|
||||
est_time = current.get("est_time_to_payout")
|
||||
|
||||
# If estimated time is a number of days
|
||||
if est_time.isdigit() or (est_time[0] == '-' and est_time[1:].isdigit()):
|
||||
days = int(est_time)
|
||||
if 0 < days <= 1:
|
||||
message = f"Payout approaching! Estimated within 1 day"
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.SUCCESS,
|
||||
category=NotificationCategory.EARNINGS,
|
||||
data={"days_to_payout": days}
|
||||
)
|
||||
# If it says "next block"
|
||||
elif "next block" in est_time.lower():
|
||||
message = f"Payout expected with next block!"
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.SUCCESS,
|
||||
category=NotificationCategory.EARNINGS,
|
||||
data={"payout_imminent": True}
|
||||
)
|
||||
|
||||
# Check for payout (unpaid balance reset)
|
||||
if previous.get("unpaid_earnings"):
|
||||
previous_unpaid = float(previous.get("unpaid_earnings", "0").split()[0]) if isinstance(previous.get("unpaid_earnings"), str) else previous.get("unpaid_earnings", 0)
|
||||
|
||||
# If balance significantly decreased, likely a payout occurred
|
||||
if previous_unpaid > 0 and current_unpaid < previous_unpaid * 0.5:
|
||||
message = f"Payout received! Unpaid balance reset from {previous_unpaid} to {current_unpaid} BTC"
|
||||
return self.add_notification(
|
||||
message,
|
||||
level=NotificationLevel.SUCCESS,
|
||||
category=NotificationCategory.EARNINGS,
|
||||
data={
|
||||
"previous_balance": previous_unpaid,
|
||||
"current_balance": current_unpaid,
|
||||
"payout_amount": previous_unpaid - current_unpaid
|
||||
}
|
||||
)
|
||||
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.error(f"Error checking earnings progress: {e}")
|
||||
return None
|
Loading…
Reference in New Issue
Block a user