# custom-ocean.xyz-dashboard/state_manager.py
"""
State manager module for handling persistent state and history.
"""
import logging
import json
import time
import gc
import threading
import redis
# Global variables for arrow history, legacy hashrate history, and a log of full metrics snapshots.
arrow_history = {} # stored per second
hashrate_history = []
metrics_log = []
# Limits for data collections to prevent memory growth
MAX_HISTORY_ENTRIES = 180 # 3 hours worth at 1 min intervals
# Lock for thread safety
state_lock = threading.Lock()


class StateManager:
    """Manager for persistent state and history data."""

    def __init__(self, redis_url=None):
        """
        Initialize the state manager.

        Args:
            redis_url (str, optional): Redis URL for persistent storage
        """
        self.redis_client = self._connect_to_redis(redis_url) if redis_url else None
        self.STATE_KEY = "graph_state"
        self.last_save_time = 0

        # Load state if available
        self.load_graph_state()

    def _connect_to_redis(self, redis_url):
        """
        Connect to Redis with retry logic.

        Args:
            redis_url (str): Redis URL

        Returns:
            redis.Redis: Redis client or None if connection failed
        """
        if not redis_url:
            logging.info("Redis URL not configured, using in-memory state only.")
            return None

        retry_count = 0
        max_retries = 3
        while retry_count < max_retries:
            try:
                client = redis.Redis.from_url(redis_url)
                client.ping()  # Test the connection
                logging.info(f"Connected to Redis at {redis_url}")
                return client
            except Exception as e:
                retry_count += 1
                if retry_count < max_retries:
                    logging.warning(f"Redis connection attempt {retry_count} failed: {e}. Retrying...")
                    time.sleep(1)  # Wait before retrying
                else:
                    logging.error(f"Could not connect to Redis after {max_retries} attempts: {e}")
        return None

    def load_graph_state(self):
        """Load graph state from Redis with support for the optimized format."""
        global arrow_history, hashrate_history, metrics_log

        if not self.redis_client:
            logging.info("Redis not available, using in-memory state.")
            return

        try:
            # Check version to handle format changes
            version = self.redis_client.get(f"{self.STATE_KEY}_version")
            version = version.decode('utf-8') if version else "1.0"

            state_json = self.redis_client.get(self.STATE_KEY)
            if state_json:
                state = json.loads(state_json)

                # Handle different versions of the data format
                if version == "2.0":  # Optimized format
                    # Restore arrow_history
                    compact_arrow_history = state.get("arrow_history", {})
                    for key, values in compact_arrow_history.items():
                        arrow_history[key] = [
                            {"time": entry.get("t", ""),
                             "value": entry.get("v", 0),
                             "arrow": entry.get("a", "")}  # Use saved arrow value
                            for entry in values
                        ]

                    # Restore hashrate_history
                    hashrate_history = state.get("hashrate_history", [])

                    # Restore metrics_log
                    compact_metrics_log = state.get("metrics_log", [])
                    metrics_log = []
                    for entry in compact_metrics_log:
                        metrics_log.append({
                            "timestamp": entry.get("ts", ""),
                            "metrics": entry.get("m", {})
                        })
                else:  # Original format
                    arrow_history = state.get("arrow_history", {})
                    hashrate_history = state.get("hashrate_history", [])
                    metrics_log = state.get("metrics_log", [])

                logging.info(f"Loaded graph state from Redis (format version {version}).")
            else:
                logging.info("No previous graph state found in Redis.")
        except Exception as e:
            logging.error(f"Error loading graph state from Redis: {e}")

    def save_graph_state(self):
        """Save graph state to Redis with optimized frequency, pruning, and data reduction."""
        if not self.redis_client:
            logging.info("Redis not available, skipping state save.")
            return

        # Check if we've saved recently to avoid too frequent saves
        # Only save at most once every 5 minutes
        current_time = time.time()
        if hasattr(self, 'last_save_time') and current_time - self.last_save_time < 300:  # 300 seconds = 5 minutes
            logging.debug("Skipping Redis save - last save was less than 5 minutes ago")
            return

        # Update the last save time
        self.last_save_time = current_time

        # Prune data first to reduce volume
        self.prune_old_data()

        # Create compact versions of the data structures for Redis storage
        try:
            # 1. Create compact arrow_history with minimal data
            compact_arrow_history = {}
            for key, values in arrow_history.items():
                if isinstance(values, list) and values:
                    # Only store recent history (last 2 hours)
                    recent_values = values[-120:] if len(values) > 120 else values
                    # Use shorter field names and preserve arrow directions
                    compact_arrow_history[key] = [
                        {"t": entry["time"], "v": entry["value"], "a": entry["arrow"]}
                        for entry in recent_values
                    ]

            # 2. Only keep essential hashrate_history
            compact_hashrate_history = hashrate_history[-60:] if len(hashrate_history) > 60 else hashrate_history

            # 3. Only keep recent metrics_log entries (last 30 minutes)
            # This is typically the largest data structure
            compact_metrics_log = []
            if metrics_log:
                # Keep only the last 30 entries (30 minutes assuming 1-minute updates)
                recent_logs = metrics_log[-30:]
                for entry in recent_logs:
                    # Only keep necessary fields from each metrics entry
                    if "metrics" in entry and "timestamp" in entry:
                        metrics_copy = {}
                        original_metrics = entry["metrics"]

                        # Only copy the most important metrics for historical tracking
                        essential_keys = [
                            "hashrate_60sec", "hashrate_24hr", "btc_price",
                            "workers_hashing", "unpaid_earnings", "difficulty",
                            "network_hashrate", "daily_profit_usd"
                        ]
                        for key in essential_keys:
                            if key in original_metrics:
                                metrics_copy[key] = original_metrics[key]

                        # Skip arrow_history within metrics as we already stored it separately
                        compact_metrics_log.append({
                            "ts": entry["timestamp"],
                            "m": metrics_copy
                        })

            # Create the final state object
            state = {
                "arrow_history": compact_arrow_history,
                "hashrate_history": compact_hashrate_history,
                "metrics_log": compact_metrics_log
            }

            # Convert to JSON once to reuse and measure size
            state_json = json.dumps(state)
            data_size_kb = len(state_json) / 1024

            # Log data size for monitoring
            logging.info(f"Saving graph state to Redis: {data_size_kb:.2f} KB (optimized format)")

            # Warn if the payload is still large (adjust threshold as needed)
            if data_size_kb > 2000:  # 2MB warning threshold (reduced from 5MB)
                logging.warning(f"Redis save data size is still large: {data_size_kb:.2f} KB")

            # Store version info to handle future format changes
            self.redis_client.set(f"{self.STATE_KEY}_version", "2.0")
            self.redis_client.set(self.STATE_KEY, state_json)
            logging.info(f"Successfully saved graph state to Redis ({data_size_kb:.2f} KB)")
        except Exception as e:
            logging.error(f"Error saving graph state to Redis: {e}")

    def prune_old_data(self):
        """Remove old data to prevent memory growth with optimized strategy."""
        global arrow_history, metrics_log

        with state_lock:
            # Prune arrow_history with a more sophisticated approach
            for key in arrow_history:
                if isinstance(arrow_history[key], list):
                    if len(arrow_history[key]) > MAX_HISTORY_ENTRIES:
                        original_length = len(arrow_history[key])
                        # For the most recent data (last hour) - keep every point
                        recent_data = arrow_history[key][-60:]
                        # For older data, reduce resolution by keeping every other point
                        older_data = arrow_history[key][:-60]
                        if len(older_data) > 0:
                            sparse_older_data = [older_data[i] for i in range(0, len(older_data), 2)]
                            arrow_history[key] = sparse_older_data + recent_data
                        else:
                            arrow_history[key] = recent_data
                        logging.info(f"Pruned {key} history from {original_length} to {len(arrow_history[key])} entries")

            # Prune metrics_log more aggressively
            if len(metrics_log) > MAX_HISTORY_ENTRIES:
                original_length = len(metrics_log)
                # Keep the most recent entries at full resolution
                recent_logs = metrics_log[-60:]
                # Reduce resolution of older entries
                older_logs = metrics_log[:-60]
                if len(older_logs) > 0:
                    sparse_older_logs = [older_logs[i] for i in range(0, len(older_logs), 3)]  # Keep every 3rd entry
                    metrics_log = sparse_older_logs + recent_logs
                    logging.info(f"Pruned metrics log from {original_length} to {len(metrics_log)} entries")

        # Free memory more aggressively
        gc.collect()
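
    # A quick worked example of the pruning above (numbers illustrative): with 240
    # arrow_history entries, the last 60 are kept at full resolution and the older 180
    # are thinned to every other point (90), leaving 150 entries; a 240-entry
    # metrics_log keeps its last 60 plus every 3rd of the older 180 (60), leaving 120.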

    def persist_critical_state(self, cached_metrics, scheduler_last_successful_run, last_metrics_update_time):
        """
        Store critical state in Redis for recovery after worker restarts.

        Args:
            cached_metrics (dict): Current metrics
            scheduler_last_successful_run (float): Timestamp of last successful scheduler run
            last_metrics_update_time (float): Timestamp of last metrics update
        """
        if not self.redis_client:
            return

        try:
            # Only persist if we have valid data
            if cached_metrics and cached_metrics.get("server_timestamp"):
                state = {
                    "cached_metrics_timestamp": cached_metrics.get("server_timestamp"),
                    "last_successful_run": scheduler_last_successful_run,
                    "last_update_time": last_metrics_update_time
                }
                self.redis_client.set("critical_state", json.dumps(state))
                logging.info(f"Persisted critical state to Redis, timestamp: {cached_metrics.get('server_timestamp')}")
        except Exception as e:
            logging.error(f"Error persisting critical state: {e}")

    def load_critical_state(self):
        """
        Recover critical state variables after a worker restart.

        Returns:
            tuple: (last_successful_run, last_update_time)
        """
        if not self.redis_client:
            return None, None

        try:
            state_json = self.redis_client.get("critical_state")
            if state_json:
                state = json.loads(state_json.decode('utf-8'))
                last_successful_run = state.get("last_successful_run")
                last_update_time = state.get("last_update_time")
                logging.info(f"Loaded critical state from Redis, last run: {last_successful_run}")

                # We don't restore cached_metrics itself, as we'll fetch fresh data
                # Just note that we have state to recover from
                logging.info(f"Last metrics timestamp from Redis: {state.get('cached_metrics_timestamp')}")

                return last_successful_run, last_update_time
        except Exception as e:
            logging.error(f"Error loading critical state: {e}")

        return None, None

    def update_metrics_history(self, metrics):
        """
        Update history collections with new metrics data.

        Args:
            metrics (dict): New metrics data
        """
        global arrow_history, hashrate_history, metrics_log

        # Skip if metrics is None
        if not metrics:
            return

        arrow_keys = [
            "pool_total_hashrate", "hashrate_24hr", "hashrate_3hr", "hashrate_10min",
            "hashrate_60sec", "block_number", "btc_price", "network_hashrate",
            "difficulty", "daily_revenue", "daily_power_cost", "daily_profit_usd",
            "monthly_profit_usd", "daily_mined_sats", "monthly_mined_sats", "unpaid_earnings",
            "estimated_earnings_per_day_sats", "estimated_earnings_next_block_sats", "estimated_rewards_in_window_sats",
            "workers_hashing"
        ]

        # --- Bucket by second (Los Angeles Time) with thread safety ---
        from datetime import datetime
        from zoneinfo import ZoneInfo
        current_second = datetime.now(ZoneInfo("America/Los_Angeles")).strftime("%H:%M:%S")

        with state_lock:
            for key in arrow_keys:
                if metrics.get(key) is not None:
                    current_val = metrics[key]
                    arrow = ""

                    # Get the corresponding unit key if available
                    unit_key = f"{key}_unit"
                    current_unit = metrics.get(unit_key, "")

                    if key in arrow_history and arrow_history[key]:
                        try:
                            previous_val = arrow_history[key][-1]["value"]
                            previous_unit = arrow_history[key][-1].get("unit", "")
                            previous_arrow = arrow_history[key][-1].get("arrow", "")  # Get previous arrow

                            # Use the convert_to_ths function to normalize both values before comparison
                            if key.startswith("hashrate") and current_unit:
                                from models import convert_to_ths
                                norm_curr_val = convert_to_ths(float(current_val), current_unit)
                                norm_prev_val = convert_to_ths(float(previous_val), previous_unit if previous_unit else "th/s")

                                # Lower the threshold to 0.05% for more sensitivity
                                if norm_curr_val > norm_prev_val * 1.0005:
                                    arrow = "↑"
                                elif norm_curr_val < norm_prev_val * 0.9995:
                                    arrow = "↓"
                                else:
                                    arrow = previous_arrow  # Preserve previous arrow if change is insignificant
                            else:
                                # For non-hashrate values or when units are missing
                                # Try to convert to float for comparison
                                try:
                                    curr_float = float(current_val)
                                    prev_float = float(previous_val)

                                    # Lower the threshold to 0.05% for more sensitivity
                                    if curr_float > prev_float * 1.0005:
                                        arrow = "↑"
                                    elif curr_float < prev_float * 0.9995:
                                        arrow = "↓"
                                    else:
                                        arrow = previous_arrow  # Preserve previous arrow
                                except (ValueError, TypeError):
                                    # If values can't be converted to float, compare directly
                                    if current_val != previous_val:
                                        arrow = "↑" if current_val > previous_val else "↓"
                                    else:
                                        arrow = previous_arrow  # Preserve previous arrow
                        except Exception as e:
                            logging.error(f"Error calculating arrow for {key}: {e}")
                            # Keep previous arrow on error instead of empty string
                            if arrow_history[key] and arrow_history[key][-1].get("arrow"):
                                arrow = arrow_history[key][-1]["arrow"]

                    if key not in arrow_history:
                        arrow_history[key] = []

                    if not arrow_history[key] or arrow_history[key][-1]["time"] != current_second:
                        # Create new entry
                        entry = {
                            "time": current_second,
                            "value": current_val,
                            "arrow": arrow,
                        }
                        # Add unit information if available
                        if current_unit:
                            entry["unit"] = current_unit
                        arrow_history[key].append(entry)
                    else:
                        # Update existing entry
                        arrow_history[key][-1]["value"] = current_val
                        # Only update arrow if it's not empty - this preserves arrows between changes
                        if arrow:
                            arrow_history[key][-1]["arrow"] = arrow
                        # Update unit if available
                        if current_unit:
                            arrow_history[key][-1]["unit"] = current_unit

                    # Cap history to three hours worth (180 entries)
                    if len(arrow_history[key]) > MAX_HISTORY_ENTRIES:
                        arrow_history[key] = arrow_history[key][-MAX_HISTORY_ENTRIES:]

            # --- Aggregate arrow_history by minute for the graph ---
            aggregated_history = {}
            for key, entries in arrow_history.items():
                minute_groups = {}
                for entry in entries:
                    minute = entry["time"][:5]  # extract HH:MM
                    minute_groups[minute] = entry  # take last entry for that minute
                # Sort by time to ensure chronological order
                aggregated_history[key] = sorted(list(minute_groups.values()),
                                                 key=lambda x: x["time"])
            metrics["arrow_history"] = aggregated_history
            metrics["history"] = hashrate_history

            entry = {"timestamp": datetime.now().isoformat(), "metrics": metrics}
            metrics_log.append(entry)

            # Cap the metrics log to three hours worth (180 entries)
            if len(metrics_log) > MAX_HISTORY_ENTRIES:
                metrics_log = metrics_log[-MAX_HISTORY_ENTRIES:]
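
    # Example of the minute aggregation above (values illustrative): per-second entries
    # recorded at "09:51:02" and "09:51:42" both map to the "09:51" bucket, and the
    # later entry (its "time" still reads "09:51:42") is the one the graph receives
    # for that minute.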

    def save_notifications(self, notifications):
        """Save notifications to persistent storage."""
        try:
            # If we have Redis, use it
            if self.redis_client:
                notifications_json = json.dumps(notifications)
                self.redis_client.set("dashboard_notifications", notifications_json)
                return True
            else:
                # Otherwise just keep in memory
                return True
        except Exception as e:
            logging.error(f"Error saving notifications: {e}")
            return False

    def get_notifications(self):
        """Retrieve notifications from persistent storage."""
        try:
            # If we have Redis, use it
            if self.redis_client:
                notifications_json = self.redis_client.get("dashboard_notifications")
                if notifications_json:
                    return json.loads(notifications_json)

            # Return empty list if not found or no Redis
            return []
        except Exception as e:
            logging.error(f"Error retrieving notifications: {e}")
            return []
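

# Minimal usage sketch (not part of the dashboard's runtime path): it assumes a local
# Redis at redis://localhost:6379 and feeds a single hand-written metrics dict, so the
# keys and values below are illustrative rather than a complete set of what the app produces.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Falls back to in-memory state if the Redis URL is unreachable or omitted.
    manager = StateManager(redis_url="redis://localhost:6379")

    sample_metrics = {
        "server_timestamp": "2025-03-27T09:51:42",
        "hashrate_60sec": 450.2,
        "hashrate_60sec_unit": "th/s",
        "btc_price": 87000.0,
        "workers_hashing": 3,
    }

    # Record one snapshot, then persist the pruned, compacted graph state.
    manager.update_metrics_history(sample_metrics)
    manager.save_graph_state()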