Mirror of https://github.com/Retropex/custom-ocean.xyz-dashboard.git, synced 2025-05-12 03:00:45 +02:00.

Updated the application to use a configurable timezone instead of hardcoding "America/Los_Angeles". This change impacts the dashboard, API endpoints, and worker services. Timezone is now fetched from a configuration file or environment variable, enhancing flexibility in time display. New API endpoints for available timezones and the current configured timezone have been added. The frontend now allows users to select their timezone from a dropdown menu, which is stored in local storage for future use. Timestamps in the UI have been updated to reflect the selected timezone.
470 lines · 22 KiB · Python
"""
|
|
State manager module for handling persistent state and history.
|
|
"""
|
|
import logging
|
|
import json
|
|
import time
|
|
import gc
|
|
import threading
|
|
import redis
|
|
from config import get_timezone
|
|
|
|
# Global variables for arrow history, legacy hashrate history, and a log of full metrics snapshots.
|
|
arrow_history = {} # stored per second
|
|
hashrate_history = []
|
|
metrics_log = []
|
|
|
|
# Limits for data collections to prevent memory growth
|
|
MAX_HISTORY_ENTRIES = 180 # 3 hours worth at 1 min intervals
|
|
|
|
# Lock for thread safety
|
|
state_lock = threading.Lock()
|
|
|
|
class StateManager:
|
|
"""Manager for persistent state and history data."""
|
|
|
|
def __init__(self, redis_url=None):
|
|
"""
|
|
Initialize the state manager.
|
|
|
|
Args:
|
|
redis_url (str, optional): Redis URL for persistent storage
|
|
"""
|
|
self.redis_client = self._connect_to_redis(redis_url) if redis_url else None
|
|
self.STATE_KEY = "graph_state"
|
|
self.last_save_time = 0
|
|
|
|
# Load state if available
|
|
self.load_graph_state()
|
|
|
|
def _connect_to_redis(self, redis_url):
|
|
"""
|
|
Connect to Redis with retry logic.
|
|
|
|
Args:
|
|
redis_url (str): Redis URL
|
|
|
|
Returns:
|
|
redis.Redis: Redis client or None if connection failed
|
|
"""
|
|
if not redis_url:
|
|
logging.info("Redis URL not configured, using in-memory state only.")
|
|
return None
|
|
|
|
retry_count = 0
|
|
max_retries = 3
|
|
|
|
while retry_count < max_retries:
|
|
try:
|
|
client = redis.Redis.from_url(redis_url)
|
|
client.ping() # Test the connection
|
|
logging.info(f"Connected to Redis at {redis_url}")
|
|
return client
|
|
except Exception as e:
|
|
retry_count += 1
|
|
if retry_count < max_retries:
|
|
logging.warning(f"Redis connection attempt {retry_count} failed: {e}. Retrying...")
|
|
time.sleep(1) # Wait before retrying
|
|
else:
|
|
logging.error(f"Could not connect to Redis after {max_retries} attempts: {e}")
|
|
return None
|
|
|
|
def load_graph_state(self):
|
|
"""Load graph state from Redis with support for the optimized format."""
|
|
global arrow_history, hashrate_history, metrics_log
|
|
|
|
if not self.redis_client:
|
|
logging.info("Redis not available, using in-memory state.")
|
|
return
|
|
|
|
try:
|
|
# Check version to handle format changes
|
|
version = self.redis_client.get(f"{self.STATE_KEY}_version")
|
|
version = version.decode('utf-8') if version else "1.0"
|
|
|
|
state_json = self.redis_client.get(self.STATE_KEY)
|
|
if state_json:
|
|
state = json.loads(state_json)
|
|
|
|
# Handle different versions of the data format
|
|
if version == "2.0": # Optimized format
|
|
# Restore arrow_history
|
|
compact_arrow_history = state.get("arrow_history", {})
|
|
for key, values in compact_arrow_history.items():
|
|
arrow_history[key] = [
|
|
{"time": entry.get("t", ""),
|
|
"value": entry.get("v", 0),
|
|
"arrow": entry.get("a", "")} # Use saved arrow value
|
|
for entry in values
|
|
]
|
|
|
|
# Restore hashrate_history
|
|
hashrate_history = state.get("hashrate_history", [])
|
|
|
|
# Restore metrics_log
|
|
compact_metrics_log = state.get("metrics_log", [])
|
|
metrics_log = []
|
|
for entry in compact_metrics_log:
|
|
metrics_log.append({
|
|
"timestamp": entry.get("ts", ""),
|
|
"metrics": entry.get("m", {})
|
|
})
|
|
else: # Original format
|
|
arrow_history = state.get("arrow_history", {})
|
|
hashrate_history = state.get("hashrate_history", [])
|
|
metrics_log = state.get("metrics_log", [])
|
|
|
|
logging.info(f"Loaded graph state from Redis (format version {version}).")
|
|
else:
|
|
logging.info("No previous graph state found in Redis.")
|
|
except Exception as e:
|
|
logging.error(f"Error loading graph state from Redis: {e}")
|
|
|
|
def save_graph_state(self):
|
|
"""Save graph state to Redis with optimized frequency, pruning, and data reduction."""
|
|
if not self.redis_client:
|
|
logging.info("Redis not available, skipping state save.")
|
|
return
|
|
|
|
# Check if we've saved recently to avoid too frequent saves
|
|
# Only save at most once every 5 minutes
|
|
current_time = time.time()
|
|
if hasattr(self, 'last_save_time') and current_time - self.last_save_time < 300: # 300 seconds = 5 minutes
|
|
logging.debug("Skipping Redis save - last save was less than 5 minutes ago")
|
|
return
|
|
|
|
# Update the last save time
|
|
self.last_save_time = current_time
|
|
|
|
# Prune data first to reduce volume
|
|
self.prune_old_data()
|
|
|
|
# Create compact versions of the data structures for Redis storage
|
|
try:
|
|
# 1. Create compact arrow_history with minimal data
|
|
compact_arrow_history = {}
|
|
for key, values in arrow_history.items():
|
|
if isinstance(values, list) and values:
|
|
# Only store recent history (last 2 hours)
|
|
recent_values = values[-180:] if len(values) > 180 else values
|
|
# Use shorter field names and preserve arrow directions
|
|
compact_arrow_history[key] = [
|
|
{"t": entry["time"], "v": entry["value"], "a": entry["arrow"]}
|
|
for entry in recent_values
|
|
]
|
|
|
|
# 2. Only keep essential hashrate_history
|
|
compact_hashrate_history = hashrate_history[-60:] if len(hashrate_history) > 60 else hashrate_history
|
|
|
|
# 3. Only keep recent metrics_log entries (last 30 minutes)
|
|
# This is typically the largest data structure
|
|
compact_metrics_log = []
|
|
if metrics_log:
|
|
# Keep only last 30 entries (30 minutes assuming 1-minute updates)
|
|
recent_logs = metrics_log[-30:]
|
|
|
|
for entry in recent_logs:
|
|
# Only keep necessary fields from each metrics entry
|
|
if "metrics" in entry and "timestamp" in entry:
|
|
metrics_copy = {}
|
|
original_metrics = entry["metrics"]
|
|
|
|
# Only copy the most important metrics for historical tracking
|
|
essential_keys = [
|
|
"hashrate_60sec", "hashrate_24hr", "btc_price",
|
|
"workers_hashing", "unpaid_earnings", "difficulty",
|
|
"network_hashrate", "daily_profit_usd"
|
|
]
|
|
|
|
for key in essential_keys:
|
|
if key in original_metrics:
|
|
metrics_copy[key] = original_metrics[key]
|
|
|
|
# Skip arrow_history within metrics as we already stored it separately
|
|
compact_metrics_log.append({
|
|
"ts": entry["timestamp"],
|
|
"m": metrics_copy
|
|
})
|
|
|
|
# Create the final state object
|
|
state = {
|
|
"arrow_history": compact_arrow_history,
|
|
"hashrate_history": compact_hashrate_history,
|
|
"metrics_log": compact_metrics_log
|
|
}
|
|
|
|
# Convert to JSON once to reuse and measure size
|
|
state_json = json.dumps(state)
|
|
data_size_kb = len(state_json) / 1024
|
|
|
|
# Log data size for monitoring
|
|
logging.info(f"Saving graph state to Redis: {data_size_kb:.2f} KB (optimized format)")
|
|
|
|
# Only save if data size is reasonable (adjust threshold as needed)
|
|
if data_size_kb > 2000: # 2MB warning threshold (reduced from 5MB)
|
|
logging.warning(f"Redis save data size is still large: {data_size_kb:.2f} KB")
|
|
|
|
# Store version info to handle future format changes
|
|
self.redis_client.set(f"{self.STATE_KEY}_version", "2.0")
|
|
self.redis_client.set(self.STATE_KEY, state_json)
|
|
logging.info(f"Successfully saved graph state to Redis ({data_size_kb:.2f} KB)")
|
|
except Exception as e:
|
|
logging.error(f"Error saving graph state to Redis: {e}")
|
|
|
|
def prune_old_data(self):
|
|
"""Remove old data to prevent memory growth with optimized strategy."""
|
|
global arrow_history, metrics_log
|
|
|
|
with state_lock:
|
|
# Prune arrow_history with more sophisticated approach
|
|
for key in arrow_history:
|
|
if isinstance(arrow_history[key], list):
|
|
if len(arrow_history[key]) > MAX_HISTORY_ENTRIES:
|
|
# For most recent data (last hour) - keep every point
|
|
recent_data = arrow_history[key][-60:]
|
|
|
|
# For older data, reduce resolution by keeping every other point
|
|
older_data = arrow_history[key][:-60]
|
|
if len(older_data) > 0:
|
|
sparse_older_data = [older_data[i] for i in range(0, len(older_data), 2)]
|
|
arrow_history[key] = sparse_older_data + recent_data
|
|
else:
|
|
arrow_history[key] = recent_data
|
|
|
|
logging.info(f"Pruned {key} history from {len(arrow_history[key])} to {len(sparse_older_data + recent_data) if older_data else len(recent_data)} entries")
|
|
|
|
# Prune metrics_log more aggressively
|
|
if len(metrics_log) > MAX_HISTORY_ENTRIES:
|
|
# Keep most recent entries at full resolution
|
|
recent_logs = metrics_log[-60:]
|
|
|
|
# Reduce resolution of older entries
|
|
older_logs = metrics_log[:-60]
|
|
if len(older_logs) > 0:
|
|
sparse_older_logs = [older_logs[i] for i in range(0, len(older_logs), 3)] # Keep every 3rd entry
|
|
metrics_log = sparse_older_logs + recent_logs
|
|
logging.info(f"Pruned metrics log from {len(metrics_log)} to {len(sparse_older_logs + recent_logs)} entries")
|
|
|
|
# Free memory more aggressively
|
|
gc.collect()
|
|
|
|
def persist_critical_state(self, cached_metrics, scheduler_last_successful_run, last_metrics_update_time):
|
|
"""
|
|
Store critical state in Redis for recovery after worker restarts.
|
|
|
|
Args:
|
|
cached_metrics (dict): Current metrics
|
|
scheduler_last_successful_run (float): Timestamp of last successful scheduler run
|
|
last_metrics_update_time (float): Timestamp of last metrics update
|
|
"""
|
|
if not self.redis_client:
|
|
return
|
|
|
|
try:
|
|
# Only persist if we have valid data
|
|
if cached_metrics and cached_metrics.get("server_timestamp"):
|
|
state = {
|
|
"cached_metrics_timestamp": cached_metrics.get("server_timestamp"),
|
|
"last_successful_run": scheduler_last_successful_run,
|
|
"last_update_time": last_metrics_update_time
|
|
}
|
|
self.redis_client.set("critical_state", json.dumps(state))
|
|
logging.info(f"Persisted critical state to Redis, timestamp: {cached_metrics.get('server_timestamp')}")
|
|
except Exception as e:
|
|
logging.error(f"Error persisting critical state: {e}")
|
|
|
|
def load_critical_state(self):
|
|
"""
|
|
Recover critical state variables after a worker restart.
|
|
|
|
Returns:
|
|
tuple: (last_successful_run, last_update_time)
|
|
"""
|
|
if not self.redis_client:
|
|
return None, None
|
|
|
|
try:
|
|
state_json = self.redis_client.get("critical_state")
|
|
if state_json:
|
|
state = json.loads(state_json.decode('utf-8'))
|
|
last_successful_run = state.get("last_successful_run")
|
|
last_update_time = state.get("last_update_time")
|
|
|
|
logging.info(f"Loaded critical state from Redis, last run: {last_successful_run}")
|
|
|
|
# We don't restore cached_metrics itself, as we'll fetch fresh data
|
|
# Just note that we have state to recover from
|
|
logging.info(f"Last metrics timestamp from Redis: {state.get('cached_metrics_timestamp')}")
|
|
|
|
return last_successful_run, last_update_time
|
|
except Exception as e:
|
|
logging.error(f"Error loading critical state: {e}")
|
|
|
|
return None, None
|
|
|
|
def update_metrics_history(self, metrics):
|
|
"""
|
|
Update history collections with new metrics data.
|
|
|
|
Args:
|
|
metrics (dict): New metrics data
|
|
"""
|
|
global arrow_history, hashrate_history, metrics_log
|
|
|
|
# Skip if metrics is None
|
|
if not metrics:
|
|
return
|
|
|
|
arrow_keys = [
|
|
"pool_total_hashrate", "hashrate_24hr", "hashrate_3hr", "hashrate_10min",
|
|
"hashrate_60sec", "block_number", "btc_price", "network_hashrate",
|
|
"difficulty", "daily_revenue", "daily_power_cost", "daily_profit_usd",
|
|
"monthly_profit_usd", "daily_mined_sats", "monthly_mined_sats", "unpaid_earnings",
|
|
"estimated_earnings_per_day_sats", "estimated_earnings_next_block_sats", "estimated_rewards_in_window_sats",
|
|
"workers_hashing"
|
|
]
|
|
|
|
# --- Bucket by second (Los Angeles Time) with thread safety ---
|
|
from datetime import datetime
|
|
from zoneinfo import ZoneInfo
|
|
|
|
current_second = datetime.now(ZoneInfo(get_timezone())).strftime("%H:%M:%S")
|
|
|
|
with state_lock:
|
|
for key in arrow_keys:
|
|
if metrics.get(key) is not None:
|
|
current_val = metrics[key]
|
|
arrow = ""
|
|
|
|
# Get the corresponding unit key if available
|
|
unit_key = f"{key}_unit"
|
|
current_unit = metrics.get(unit_key, "")
|
|
|
|
if key in arrow_history and arrow_history[key]:
|
|
try:
|
|
previous_val = arrow_history[key][-1]["value"]
|
|
previous_unit = arrow_history[key][-1].get("unit", "")
|
|
previous_arrow = arrow_history[key][-1].get("arrow", "") # Get previous arrow
|
|
|
|
# Use the convert_to_ths function to normalize both values before comparison
|
|
if key.startswith("hashrate") and current_unit:
|
|
from models import convert_to_ths
|
|
norm_curr_val = convert_to_ths(float(current_val), current_unit)
|
|
norm_prev_val = convert_to_ths(float(previous_val), previous_unit if previous_unit else "th/s")
|
|
|
|
# Lower the threshold to 0.05% for more sensitivity
|
|
if norm_curr_val > norm_prev_val * 1.0001:
|
|
arrow = "↑"
|
|
elif norm_curr_val < norm_prev_val * 0.9999:
|
|
arrow = "↓"
|
|
else:
|
|
arrow = previous_arrow # Preserve previous arrow if change is insignificant
|
|
else:
|
|
# For non-hashrate values or when units are missing
|
|
# Try to convert to float for comparison
|
|
try:
|
|
curr_float = float(current_val)
|
|
prev_float = float(previous_val)
|
|
|
|
# Lower the threshold to 0.05% for more sensitivity
|
|
if curr_float > prev_float * 1.0001:
|
|
arrow = "↑"
|
|
elif curr_float < prev_float * 0.9999:
|
|
arrow = "↓"
|
|
else:
|
|
arrow = previous_arrow # Preserve previous arrow
|
|
except (ValueError, TypeError):
|
|
# If values can't be converted to float, compare directly
|
|
if current_val != previous_val:
|
|
arrow = "↑" if current_val > previous_val else "↓"
|
|
else:
|
|
arrow = previous_arrow # Preserve previous arrow
|
|
except Exception as e:
|
|
logging.error(f"Error calculating arrow for {key}: {e}")
|
|
# Keep previous arrow on error instead of empty string
|
|
if arrow_history[key] and arrow_history[key][-1].get("arrow"):
|
|
arrow = arrow_history[key][-1]["arrow"]
|
|
|
|
if key not in arrow_history:
|
|
arrow_history[key] = []
|
|
|
|
if not arrow_history[key] or arrow_history[key][-1]["time"] != current_second:
|
|
# Create new entry
|
|
entry = {
|
|
"time": current_second,
|
|
"value": current_val,
|
|
"arrow": arrow,
|
|
}
|
|
# Add unit information if available
|
|
if current_unit:
|
|
entry["unit"] = current_unit
|
|
|
|
arrow_history[key].append(entry)
|
|
else:
|
|
# Update existing entry
|
|
arrow_history[key][-1]["value"] = current_val
|
|
# Only update arrow if it's not empty - this preserves arrows between changes
|
|
if arrow:
|
|
arrow_history[key][-1]["arrow"] = arrow
|
|
# Update unit if available
|
|
if current_unit:
|
|
arrow_history[key][-1]["unit"] = current_unit
|
|
|
|
# Cap history to three hours worth (180 entries)
|
|
if len(arrow_history[key]) > MAX_HISTORY_ENTRIES:
|
|
arrow_history[key] = arrow_history[key][-MAX_HISTORY_ENTRIES:]
|
|
|
|
# --- Aggregate arrow_history by minute for the graph ---
|
|
aggregated_history = {}
|
|
for key, entries in arrow_history.items():
|
|
minute_groups = {}
|
|
for entry in entries:
|
|
minute = entry["time"][:5] # extract HH:MM
|
|
minute_groups[minute] = entry # take last entry for that minute
|
|
|
|
# Sort by time to ensure chronological order
|
|
aggregated_history[key] = sorted(list(minute_groups.values()),
|
|
key=lambda x: x["time"])
|
|
|
|
# Only keep the most recent 60 data points for the graph display
|
|
aggregated_history[key] = aggregated_history[key][-60:] if len(aggregated_history[key]) > 60 else aggregated_history[key]
|
|
|
|
metrics["arrow_history"] = aggregated_history
|
|
metrics["history"] = hashrate_history
|
|
|
|
entry = {"timestamp": datetime.now().isoformat(), "metrics": metrics}
|
|
metrics_log.append(entry)
|
|
# Cap the metrics log to three hours worth (180 entries)
|
|
if len(metrics_log) > MAX_HISTORY_ENTRIES:
|
|
metrics_log = metrics_log[-MAX_HISTORY_ENTRIES:]
|
|
|
|
def save_notifications(self, notifications):
|
|
"""Save notifications to persistent storage."""
|
|
try:
|
|
# If we have Redis, use it
|
|
if self.redis_client:
|
|
notifications_json = json.dumps(notifications)
|
|
self.redis_client.set("dashboard_notifications", notifications_json)
|
|
return True
|
|
else:
|
|
# Otherwise just keep in memory
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Error saving notifications: {e}")
|
|
return False
|
|
|
|
def get_notifications(self):
|
|
"""Retrieve notifications from persistent storage."""
|
|
try:
|
|
# If we have Redis, use it
|
|
if self.redis_client:
|
|
notifications_json = self.redis_client.get("dashboard_notifications")
|
|
if notifications_json:
|
|
return json.loads(notifications_json)
|
|
|
|
# Return empty list if not found or no Redis
|
|
return []
|
|
except Exception as e:
|
|
logging.error(f"Error retrieving notifications: {e}")
|
|
return []
|