from flask import Flask, render_template, jsonify, Response, request
import requests, json, os, logging, re, time, sys, gc, psutil, signal
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
from flask_caching import Cache
import redis
from apscheduler.schedulers.background import BackgroundScheduler
import threading

app = Flask(__name__)

# Set up caching using a simple in-memory cache.
cache = Cache(app, config={'CACHE_TYPE': 'SimpleCache', 'CACHE_DEFAULT_TIMEOUT': 10})

# Global variables for arrow history, legacy hashrate history, and a log of full metrics snapshots.
arrow_history = {}  # stored per second
hashrate_history = []
metrics_log = []

# Limits for data collections to prevent memory growth
MAX_HISTORY_ENTRIES = 180       # 3 hours worth at 1-minute intervals
MAX_SSE_CONNECTIONS = 10        # Maximum concurrent SSE connections
MAX_SSE_CONNECTION_TIME = 900   # 15 minutes maximum SSE connection time (increased from 10 min)

# Track active SSE connections
active_sse_connections = 0
sse_connections_lock = threading.Lock()

# Global variable to hold the cached metrics updated by the background job.
cached_metrics = None
last_metrics_update_time = None

# Track scheduler health
scheduler_last_successful_run = None
scheduler_recreate_lock = threading.Lock()

# Global lock for thread safety on shared state.
state_lock = threading.Lock()

# Configure logging.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Global Server Start Time (Los Angeles Time) ---
SERVER_START_TIME = datetime.now(ZoneInfo("America/Los_Angeles"))

# --- Disable Client Caching for All Responses ---
@app.after_request
def add_header(response):
    response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    response.headers["Pragma"] = "no-cache"
    response.headers["Expires"] = "0"
    return response

# --- Memory usage monitoring ---
def log_memory_usage():
    """Log current memory usage."""
    try:
        process = psutil.Process(os.getpid())
        mem_info = process.memory_info()
        logging.info(f"Memory usage: {mem_info.rss / 1024 / 1024:.2f} MB (RSS)")
        # Log the size of key data structures
        logging.info(f"Arrow history entries: {sum(len(v) for v in arrow_history.values() if isinstance(v, list))}")
        logging.info(f"Metrics log entries: {len(metrics_log)}")
        logging.info(f"Active SSE connections: {active_sse_connections}")
    except Exception as e:
        logging.error(f"Error logging memory usage: {e}")

# --- Redis Connection for Shared State (fixed) ---
def get_redis_client():
    """Get a Redis client with connection retry logic."""
    REDIS_URL = os.environ.get("REDIS_URL")

    # Make Redis truly optional - if no URL is provided, don't attempt a connection.
    if not REDIS_URL:
        logging.info("Redis URL not configured, using in-memory state only.")
        return None

    retry_count = 0
    max_retries = 3
    while retry_count < max_retries:
        try:
            # Removed compress parameter as it's not supported in this redis-py version
            client = redis.Redis.from_url(REDIS_URL)
            client.ping()  # Test the connection
            logging.info(f"Connected to Redis at {REDIS_URL}")
            return client
        except Exception as e:
            retry_count += 1
            if retry_count < max_retries:
                logging.warning(f"Redis connection attempt {retry_count} failed: {e}. Retrying...")
                time.sleep(1)  # Wait before retrying
            else:
                logging.error(f"Could not connect to Redis after {max_retries} attempts: {e}")
                return None
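# Illustrative usage (assumption, not part of the original file): point REDIS_URL at any
# reachable Redis instance before starting the app, e.g.
#   export REDIS_URL="redis://localhost:6379/0"
# With REDIS_URL unset, get_redis_client() returns None and the app keeps all state in memory.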
# Get Redis client with retry logic
redis_client = get_redis_client()
STATE_KEY = "graph_state"

# --- Modified Load Graph State Function ---
def load_graph_state():
    """Load graph state from Redis with support for the optimized format."""
    global arrow_history, hashrate_history, metrics_log

    if redis_client:
        try:
            # Check version to handle format changes
            version = redis_client.get(f"{STATE_KEY}_version")
            version = version.decode('utf-8') if version else "1.0"

            state_json = redis_client.get(STATE_KEY)
            if state_json:
                state = json.loads(state_json)

                # Handle different versions of the data format
                if version == "2.0":  # Optimized format
                    # Restore arrow_history
                    compact_arrow_history = state.get("arrow_history", {})
                    for key, values in compact_arrow_history.items():
                        arrow_history[key] = [
                            {"time": entry.get("t", ""), "value": entry.get("v", 0), "arrow": ""}  # Default empty arrow
                            for entry in values
                        ]

                    # Restore hashrate_history
                    hashrate_history = state.get("hashrate_history", [])

                    # Restore metrics_log
                    compact_metrics_log = state.get("metrics_log", [])
                    metrics_log = []
                    for entry in compact_metrics_log:
                        metrics_log.append({
                            "timestamp": entry.get("ts", ""),
                            "metrics": entry.get("m", {})
                        })
                else:  # Original format
                    arrow_history = state.get("arrow_history", {})
                    hashrate_history = state.get("hashrate_history", [])
                    metrics_log = state.get("metrics_log", [])

                logging.info(f"Loaded graph state from Redis (format version {version}).")
            else:
                logging.info("No previous graph state found in Redis.")
        except Exception as e:
            logging.error(f"Error loading graph state from Redis: {e}")
    else:
        logging.info("Redis not available, using in-memory state.")

# --- Save Graph State with Advanced Optimizations ---
def save_graph_state():
    """Save graph state to Redis with optimized frequency, pruning, and data reduction."""
    if redis_client:
        # Check if we've saved recently to avoid too-frequent saves.
        # Only save at most once every 5 minutes.
        current_time = time.time()
        if hasattr(save_graph_state, 'last_save_time') and \
           current_time - save_graph_state.last_save_time < 300:  # 300 seconds = 5 minutes
            logging.debug("Skipping Redis save - last save was less than 5 minutes ago")
            return

        # Update the last save time
        save_graph_state.last_save_time = current_time

        # Prune data first to reduce volume
        prune_old_data()

        # Create compact versions of the data structures for Redis storage
        try:
            # 1. Create compact arrow_history with minimal data
            compact_arrow_history = {}
            for key, values in arrow_history.items():
                if isinstance(values, list) and values:
                    # Only store recent history (last 2 hours)
                    recent_values = values[-120:] if len(values) > 120 else values
                    # Use shorter field names and remove unnecessary fields
                    compact_arrow_history[key] = [
                        {"t": entry["time"], "v": entry["value"]}
                        for entry in recent_values
                    ]

            # 2. Only keep essential hashrate_history
            compact_hashrate_history = hashrate_history[-60:] if len(hashrate_history) > 60 else hashrate_history
            # 3. Only keep recent metrics_log entries (last 30 minutes)
            # This is typically the largest data structure
            compact_metrics_log = []
            if metrics_log:
                # Keep only the last 30 entries (30 minutes assuming 1-minute updates)
                recent_logs = metrics_log[-30:]

                for entry in recent_logs:
                    # Only keep necessary fields from each metrics entry
                    if "metrics" in entry and "timestamp" in entry:
                        metrics_copy = {}
                        original_metrics = entry["metrics"]

                        # Only copy the most important metrics for historical tracking
                        essential_keys = [
                            "hashrate_60sec", "hashrate_24hr", "btc_price",
                            "workers_hashing", "unpaid_earnings", "difficulty",
                            "network_hashrate", "daily_profit_usd"
                        ]

                        for key in essential_keys:
                            if key in original_metrics:
                                metrics_copy[key] = original_metrics[key]

                        # Skip arrow_history within metrics as we already stored it separately
                        compact_metrics_log.append({
                            "ts": entry["timestamp"],
                            "m": metrics_copy
                        })

            # Create the final state object
            state = {
                "arrow_history": compact_arrow_history,
                "hashrate_history": compact_hashrate_history,
                "metrics_log": compact_metrics_log
            }

            # Convert to JSON once to reuse and measure size
            state_json = json.dumps(state)
            data_size_kb = len(state_json) / 1024

            # Log data size for monitoring
            logging.info(f"Saving graph state to Redis: {data_size_kb:.2f} KB (optimized format)")

            # Warn if the data size is still large (adjust threshold as needed)
            if data_size_kb > 2000:  # 2MB warning threshold (reduced from 5MB)
                logging.warning(f"Redis save data size is still large: {data_size_kb:.2f} KB")

            # Store version info to handle future format changes
            redis_client.set(f"{STATE_KEY}_version", "2.0")
            redis_client.set(STATE_KEY, state_json)
            logging.info(f"Successfully saved graph state to Redis ({data_size_kb:.2f} KB)")
        except Exception as e:
            logging.error(f"Error saving graph state to Redis: {e}")
    else:
        logging.info("Redis not available, skipping state save.")
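# Illustrative shape of the optimized ("2.0") Redis payload produced above; the values are
# made up for the example and are not from a real run:
#   {
#     "arrow_history": {"hashrate_60sec": [{"t": "14:23:05", "v": 4.2}, ...]},
#     "hashrate_history": [...],
#     "metrics_log": [{"ts": "2024-01-01T14:23:05", "m": {"btc_price": 75000.0, ...}}]
#   }
# load_graph_state() expands the short "t"/"v" and "ts"/"m" field names back to the full
# in-memory format.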
# Load persisted state on startup.
load_graph_state()

# --- Clean up old data ---
def prune_old_data():
    """Remove old data to prevent memory growth with an optimized strategy."""
    global arrow_history, metrics_log

    with state_lock:
        # Prune arrow_history with a more sophisticated approach
        for key in arrow_history:
            if isinstance(arrow_history[key], list):
                if len(arrow_history[key]) > MAX_HISTORY_ENTRIES:
                    original_length = len(arrow_history[key])
                    # For the most recent data (last hour) - keep every point
                    recent_data = arrow_history[key][-60:]
                    # For older data, reduce resolution by keeping every other point
                    older_data = arrow_history[key][:-60]
                    if len(older_data) > 0:
                        sparse_older_data = [older_data[i] for i in range(0, len(older_data), 2)]
                        arrow_history[key] = sparse_older_data + recent_data
                    else:
                        arrow_history[key] = recent_data
                    logging.info(f"Pruned {key} history from {original_length} to {len(arrow_history[key])} entries")

        # Prune metrics_log more aggressively
        if len(metrics_log) > MAX_HISTORY_ENTRIES:
            original_log_length = len(metrics_log)
            # Keep the most recent entries at full resolution
            recent_logs = metrics_log[-60:]
            # Reduce resolution of older entries
            older_logs = metrics_log[:-60]
            if len(older_logs) > 0:
                sparse_older_logs = [older_logs[i] for i in range(0, len(older_logs), 3)]  # Keep every 3rd entry
                metrics_log = sparse_older_logs + recent_logs
                logging.info(f"Pruned metrics log from {original_log_length} to {len(metrics_log)} entries")

    # Free memory more aggressively
    gc.collect()

    # Log memory usage after pruning
    log_memory_usage()

# --- State persistence function ---
def persist_critical_state():
    """Store critical state in Redis for recovery after worker restarts."""
    if redis_client:
        try:
            # Only persist if we have valid data
            if cached_metrics and cached_metrics.get("server_timestamp"):
                state = {
                    "cached_metrics_timestamp": cached_metrics.get("server_timestamp"),
                    "last_successful_run": scheduler_last_successful_run,
                    "last_update_time": last_metrics_update_time
                }
                redis_client.set("critical_state", json.dumps(state))
                logging.info(f"Persisted critical state to Redis, timestamp: {cached_metrics.get('server_timestamp')}")
        except Exception as e:
            logging.error(f"Error persisting critical state: {e}")

# --- Custom Template Filter ---
@app.template_filter('commafy')
def commafy(value):
    try:
        return "{:,}".format(int(value))
    except Exception:
        return value

# --- Configuration Management ---
CONFIG_FILE = "config.json"

def load_config():
    default_config = {
        "power_cost": 0.0,
        "power_usage": 0.0,
        "wallet": "bc1py5zmrtssheq3shd8cptpl5l5m3txxr5afynyg2gyvam6w78s4dlqqnt4v9"
    }
    if os.path.exists(CONFIG_FILE):
        try:
            with open(CONFIG_FILE, "r") as f:
                config = json.load(f)
            return config
        except Exception as e:
            logging.error(f"Error loading config: {e}")
    return default_config

# --- Data Structures ---
@dataclass
class OceanData:
    pool_total_hashrate: float = None
    pool_total_hashrate_unit: str = None
    hashrate_24hr: float = None
    hashrate_24hr_unit: str = None
    hashrate_3hr: float = None
    hashrate_3hr_unit: str = None
    hashrate_10min: float = None
    hashrate_10min_unit: str = None
    hashrate_5min: float = None
    hashrate_5min_unit: str = None
    hashrate_60sec: float = None
    hashrate_60sec_unit: str = None
    estimated_earnings_per_day: float = None
    estimated_earnings_next_block: float = None
    estimated_rewards_in_window: float = None
    workers_hashing: int = None
    unpaid_earnings: float = None
    est_time_to_payout: str = None
    last_block: str = None
    last_block_height: str = None
    last_block_time: str = None
    blocks_found: str = None
    total_last_share: str = "N/A"
    # Field for BTC earned for the last block, now in sats.
    last_block_earnings: str = None

def convert_to_ths(value: float, unit: str) -> float:
    """Convert any hashrate unit to TH/s equivalent."""
    unit = unit.lower()
    if 'ph/s' in unit:
        return value * 1000  # 1 PH/s = 1000 TH/s
    elif 'eh/s' in unit:
        return value * 1000000  # 1 EH/s = 1,000,000 TH/s
    elif 'gh/s' in unit:
        return value / 1000  # 1 TH/s = 1000 GH/s
    elif 'mh/s' in unit:
        return value / 1000000  # 1 TH/s = 1,000,000 MH/s
    elif 'th/s' in unit:
        return value
    else:
        # Log unexpected unit
        logging.warning(f"Unexpected hashrate unit: {unit}, defaulting to treating as TH/s")
        return value

# --- Data Fetching Functions ---
def get_ocean_data(session: requests.Session, wallet: str) -> OceanData:
    base_url = "https://ocean.xyz"
    stats_url = f"{base_url}/stats/{wallet}"
    headers = {
        'User-Agent': 'Mozilla/5.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Cache-Control': 'no-cache'
    }

    # Create an empty data object to populate
    data = OceanData()

    try:
        response = session.get(stats_url, headers=headers, timeout=10)
        if not response.ok:
            logging.error(f"Error fetching ocean data: status code {response.status_code}")
            return None
        soup = BeautifulSoup(response.text, 'html.parser')

        # Safely extract pool status information
        try:
            pool_status = soup.find("p", id="pool-status-item")
            if pool_status:
                text = pool_status.get_text(strip=True)
                m_total = re.search(r'HASHRATE:\s*([\d\.]+)\s*(\w+/s)', text, re.IGNORECASE)
                if m_total:
                    raw_val = float(m_total.group(1))
                    unit = m_total.group(2)
                    data.pool_total_hashrate = raw_val
                    data.pool_total_hashrate_unit = unit

                span = pool_status.find("span", class_="pool-status-newline")
                if span:
                    last_block_text = span.get_text(strip=True)
                    m_block = re.search(r'LAST BLOCK:\s*(\d+\s*\(.*\))', last_block_text, re.IGNORECASE)
                    if m_block:
                        full_last_block = m_block.group(1)
                        data.last_block = full_last_block
                        match = re.match(r'(\d+)\s*\((.*?)\)', full_last_block)
                        if match:
                            data.last_block_height = match.group(1)
                            data.last_block_time = match.group(2)
                        else:
                            data.last_block_height = full_last_block
                            data.last_block_time = ""
        except Exception as e:
            logging.error(f"Error parsing pool status: {e}")

        # Parse the earnings value from the earnings table and convert to sats.
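        # Worked example (illustrative numbers, not from a live scrape): an earnings cell of
        # "0.00012345 BTC" becomes int(round(0.00012345 * 100_000_000)) = 12345 sats.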
        try:
            earnings_table = soup.find('tbody', id='earnings-tablerows')
            if earnings_table:
                latest_row = earnings_table.find('tr', class_='table-row')
                if latest_row:
                    cells = latest_row.find_all('td', class_='table-cell')
                    if len(cells) >= 3:
                        earnings_text = cells[2].get_text(strip=True)
                        earnings_value = earnings_text.replace('BTC', '').strip()
                        try:
                            btc_earnings = float(earnings_value)
                            sats = int(round(btc_earnings * 100000000))
                            data.last_block_earnings = str(sats)
                        except Exception:
                            data.last_block_earnings = earnings_value
        except Exception as e:
            logging.error(f"Error parsing earnings data: {e}")

        # Parse hashrate data from the hashrates table
        try:
            time_mapping = {
                '24 hrs': ('hashrate_24hr', 'hashrate_24hr_unit'),
                '3 hrs': ('hashrate_3hr', 'hashrate_3hr_unit'),
                '10 min': ('hashrate_10min', 'hashrate_10min_unit'),
                '5 min': ('hashrate_5min', 'hashrate_5min_unit'),
                '60 sec': ('hashrate_60sec', 'hashrate_60sec_unit')
            }
            hashrate_table = soup.find('tbody', id='hashrates-tablerows')
            if hashrate_table:
                for row in hashrate_table.find_all('tr', class_='table-row'):
                    cells = row.find_all('td', class_='table-cell')
                    if len(cells) >= 2:
                        period_text = cells[0].get_text(strip=True).lower()
                        hashrate_str = cells[1].get_text(strip=True).lower()
                        try:
                            parts = hashrate_str.split()
                            hashrate_val = float(parts[0])
                            unit = parts[1] if len(parts) > 1 else 'th/s'
                            for key, (attr, unit_attr) in time_mapping.items():
                                if key.lower() in period_text:
                                    setattr(data, attr, hashrate_val)
                                    setattr(data, unit_attr, unit)
                                    break
                        except Exception as e:
                            logging.error(f"Error parsing hashrate '{hashrate_str}': {e}")
        except Exception as e:
            logging.error(f"Error parsing hashrate table: {e}")

        # Parse lifetime stats data
        try:
            lifetime_snap = soup.find('div', id='lifetimesnap-statcards')
            if lifetime_snap:
                for container in lifetime_snap.find_all('div', class_='blocks dashboard-container'):
                    label_div = container.find('div', class_='blocks-label')
                    if label_div:
                        label_text = label_div.get_text(strip=True).lower()
                        earnings_span = label_div.find_next('span', class_=lambda x: x != 'tooltiptext')
                        if earnings_span:
                            span_text = earnings_span.get_text(strip=True)
                            try:
                                earnings_value = float(span_text.split()[0].replace(',', ''))
                                if "earnings" in label_text and "day" in label_text:
                                    data.estimated_earnings_per_day = earnings_value
                            except Exception:
                                pass
        except Exception as e:
            logging.error(f"Error parsing lifetime stats: {e}")

        # Parse payout stats data
        try:
            payout_snap = soup.find('div', id='payoutsnap-statcards')
            if payout_snap:
                for container in payout_snap.find_all('div', class_='blocks dashboard-container'):
                    label_div = container.find('div', class_='blocks-label')
                    if label_div:
                        label_text = label_div.get_text(strip=True).lower()
                        earnings_span = label_div.find_next('span', class_=lambda x: x != 'tooltiptext')
                        if earnings_span:
                            span_text = earnings_span.get_text(strip=True)
                            try:
                                earnings_value = float(span_text.split()[0].replace(',', ''))
                                if "earnings" in label_text and "block" in label_text:
                                    data.estimated_earnings_next_block = earnings_value
                                elif "rewards" in label_text and "window" in label_text:
                                    data.estimated_rewards_in_window = earnings_value
                            except Exception:
                                pass
        except Exception as e:
            logging.error(f"Error parsing payout stats: {e}")

        # Parse user stats data
        try:
            usersnap = soup.find('div', id='usersnap-statcards')
            if usersnap:
                for container in usersnap.find_all('div', class_='blocks dashboard-container'):
                    label_div = container.find('div', class_='blocks-label')
                    if label_div:
                        label_text = label_div.get_text(strip=True).lower()
                        value_span = label_div.find_next('span', class_=lambda x: x != 'tooltiptext')
                        if value_span:
                            span_text = value_span.get_text(strip=True)
                            if "workers currently hashing" in label_text:
                                try:
                                    data.workers_hashing = int(span_text.replace(",", ""))
                                except Exception:
                                    pass
                            elif "unpaid earnings" in label_text and "btc" in span_text.lower():
                                try:
                                    data.unpaid_earnings = float(span_text.split()[0].replace(',', ''))
                                except Exception:
                                    pass
                            elif "estimated time until minimum payout" in label_text:
                                data.est_time_to_payout = span_text
        except Exception as e:
            logging.error(f"Error parsing user stats: {e}")

        # Parse blocks found data
        try:
            blocks_container = soup.find(lambda tag: tag.name == "div" and "blocks found" in tag.get_text(strip=True).lower())
            if blocks_container:
                span = blocks_container.find_next_sibling("span")
                if span:
                    num_match = re.search(r'(\d+)', span.get_text(strip=True))
                    if num_match:
                        data.blocks_found = num_match.group(1)
        except Exception as e:
            logging.error(f"Error parsing blocks found: {e}")

        # Parse last share time data
        try:
            workers_table = soup.find("tbody", id="workers-tablerows")
            if workers_table:
                for row in workers_table.find_all("tr", class_="table-row"):
                    cells = row.find_all("td")
                    if cells and cells[0].get_text(strip=True).lower().startswith("total"):
                        last_share_str = cells[2].get_text(strip=True)
                        try:
                            naive_dt = datetime.strptime(last_share_str, "%Y-%m-%d %H:%M")
                            utc_dt = naive_dt.replace(tzinfo=ZoneInfo("UTC"))
                            la_dt = utc_dt.astimezone(ZoneInfo("America/Los_Angeles"))
                            data.total_last_share = la_dt.strftime("%Y-%m-%d %I:%M %p")
                        except Exception as e:
                            logging.error(f"Error converting last share time '{last_share_str}': {e}")
                            data.total_last_share = last_share_str
                        break
        except Exception as e:
            logging.error(f"Error parsing last share time: {e}")

        return data
    except Exception as e:
        logging.error(f"Error fetching Ocean data: {e}")
        return None

def fetch_url(session: requests.Session, url: str, timeout: int = 5):
    try:
        return session.get(url, timeout=timeout)
    except Exception as e:
        logging.error(f"Error fetching {url}: {e}")
        return None

def get_bitcoin_stats(session: requests.Session, cache_data: dict):
    """Fetch Bitcoin network statistics with improved error handling and caching."""
    urls = {
        "difficulty": "https://blockchain.info/q/getdifficulty",
        "hashrate": "https://blockchain.info/q/hashrate",
        "ticker": "https://blockchain.info/ticker",
        "blockcount": "https://blockchain.info/q/getblockcount"
    }

    # Use previous cached values as defaults if available
    difficulty = cache_data.get("difficulty")
    network_hashrate = cache_data.get("network_hashrate")
    btc_price = cache_data.get("btc_price")
    block_count = cache_data.get("block_count")

    try:
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {key: executor.submit(fetch_url, session, url) for key, url in urls.items()}
            responses = {key: futures[key].result(timeout=5) for key in futures}

        # Process each response individually with error handling
        if responses["difficulty"] and responses["difficulty"].ok:
            try:
                difficulty = float(responses["difficulty"].text)
                cache_data["difficulty"] = difficulty
            except (ValueError, TypeError) as e:
                logging.error(f"Error parsing difficulty: {e}")

        if responses["hashrate"] and responses["hashrate"].ok:
            try:
                network_hashrate = float(responses["hashrate"].text) * 1e9
                cache_data["network_hashrate"] = network_hashrate
            except (ValueError, TypeError) as e:
                logging.error(f"Error parsing network hashrate: {e}")

        if responses["ticker"] and responses["ticker"].ok:
            try:
                ticker_data = responses["ticker"].json()
                btc_price = float(ticker_data.get("USD", {}).get("last", btc_price))
                cache_data["btc_price"] = btc_price
            except (ValueError, TypeError, json.JSONDecodeError) as e:
                logging.error(f"Error parsing BTC price: {e}")

        if responses["blockcount"] and responses["blockcount"].ok:
            try:
                block_count = int(responses["blockcount"].text)
                cache_data["block_count"] = block_count
            except (ValueError, TypeError) as e:
                logging.error(f"Error parsing block count: {e}")
    except Exception as e:
        logging.error(f"Error fetching Bitcoin stats: {e}")

    return difficulty, network_hashrate, btc_price, block_count

# --- Dashboard Class ---
class MiningDashboardWeb:
    def __init__(self, power_cost, power_usage, wallet):
        self.power_cost = power_cost
        self.power_usage = power_usage
        self.wallet = wallet
        self.cache = {}
        self.sats_per_btc = 100_000_000
        self.previous_values = {}
        self.session = requests.Session()

    def fetch_metrics(self):
        # Add execution time tracking
        start_time = time.time()

        try:
            with ThreadPoolExecutor(max_workers=2) as executor:
                future_ocean = executor.submit(get_ocean_data, self.session, self.wallet)
                future_btc = executor.submit(get_bitcoin_stats, self.session, self.cache)
                try:
                    ocean_data = future_ocean.result(timeout=15)
                    btc_stats = future_btc.result(timeout=15)
                except Exception as e:
                    logging.error(f"Error fetching metrics concurrently: {e}")
                    return None

            if ocean_data is None:
                logging.error("Failed to retrieve Ocean data")
                return None

            difficulty, network_hashrate, btc_price, block_count = btc_stats

            # If we failed to get network hashrate, use a reasonable default to prevent division by zero
            if network_hashrate is None:
                logging.warning("Using default network hashrate")
                network_hashrate = 500e18  # ~500 EH/s as a reasonable fallback

            # If we failed to get BTC price, use a reasonable default
            if btc_price is None:
                logging.warning("Using default BTC price")
                btc_price = 75000  # $75,000 as a reasonable fallback

            # Convert hashrates to a common unit (TH/s) for consistency
            hr3 = ocean_data.hashrate_3hr or 0
            hr3_unit = (ocean_data.hashrate_3hr_unit or 'th/s').lower()
            local_hashrate = convert_to_ths(hr3, hr3_unit) * 1e12  # Convert to H/s for calculation

            hash_proportion = local_hashrate / network_hashrate if network_hashrate else 0
            block_reward = 3.125
            blocks_per_day = 86400 / 600
            daily_btc_gross = hash_proportion * block_reward * blocks_per_day
            daily_btc_net = daily_btc_gross * (1 - 0.02 - 0.028)

            daily_revenue = round(daily_btc_net * btc_price, 2) if btc_price is not None else None
            daily_power_cost = round((self.power_usage / 1000) * self.power_cost * 24, 2)
            daily_profit_usd = round(daily_revenue - daily_power_cost, 2) if daily_revenue is not None else None
            monthly_profit_usd = round(daily_profit_usd * 30, 2) if daily_profit_usd is not None else None
            daily_mined_sats = int(round(daily_btc_net * self.sats_per_btc))
            monthly_mined_sats = daily_mined_sats * 30
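            # Worked example with assumed numbers (not taken from a real run): a 100 TH/s
            # miner against a 500 EH/s network gives hash_proportion = 1e14 / 5e20 = 2e-7,
            # so daily_btc_gross = 2e-7 * 3.125 * 144 = 9e-5 BTC and, after the 4.8% fee
            # haircut above, daily_btc_net is roughly 8.568e-5 BTC (~8,568 sats/day).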
            # Use default 0 for earnings if scraping returned None.
            estimated_earnings_per_day = ocean_data.estimated_earnings_per_day if ocean_data.estimated_earnings_per_day is not None else 0
            estimated_earnings_next_block = ocean_data.estimated_earnings_next_block if ocean_data.estimated_earnings_next_block is not None else 0
            estimated_rewards_in_window = ocean_data.estimated_rewards_in_window if ocean_data.estimated_rewards_in_window is not None else 0

            metrics = {
                'pool_total_hashrate': ocean_data.pool_total_hashrate,
                'pool_total_hashrate_unit': ocean_data.pool_total_hashrate_unit,
                'hashrate_24hr': ocean_data.hashrate_24hr,
                'hashrate_24hr_unit': ocean_data.hashrate_24hr_unit,
                'hashrate_3hr': ocean_data.hashrate_3hr,
                'hashrate_3hr_unit': ocean_data.hashrate_3hr_unit,
                'hashrate_10min': ocean_data.hashrate_10min,
                'hashrate_10min_unit': ocean_data.hashrate_10min_unit,
                'hashrate_5min': ocean_data.hashrate_5min,
                'hashrate_5min_unit': ocean_data.hashrate_5min_unit,
                'hashrate_60sec': ocean_data.hashrate_60sec,
                'hashrate_60sec_unit': ocean_data.hashrate_60sec_unit,
                'workers_hashing': ocean_data.workers_hashing,
                'btc_price': btc_price,
                'block_number': block_count,
                'network_hashrate': (network_hashrate / 1e18) if network_hashrate else None,
                'difficulty': difficulty,
                'daily_btc_net': daily_btc_net,
                'estimated_earnings_per_day': estimated_earnings_per_day,
                'daily_revenue': daily_revenue,
                'daily_power_cost': daily_power_cost,
                'daily_profit_usd': daily_profit_usd,
                'monthly_profit_usd': monthly_profit_usd,
                'daily_mined_sats': daily_mined_sats,
                'monthly_mined_sats': monthly_mined_sats,
                'estimated_earnings_next_block': estimated_earnings_next_block,
                'estimated_rewards_in_window': estimated_rewards_in_window,
                'unpaid_earnings': ocean_data.unpaid_earnings,
                'est_time_to_payout': ocean_data.est_time_to_payout,
                'last_block_height': ocean_data.last_block_height,
                'last_block_time': ocean_data.last_block_time,
                'total_last_share': ocean_data.total_last_share,
                'blocks_found': ocean_data.blocks_found or "0",
                # Last block earnings (in sats)
                'last_block_earnings': ocean_data.last_block_earnings
            }
            metrics['estimated_earnings_per_day_sats'] = int(round(estimated_earnings_per_day * self.sats_per_btc))
            metrics['estimated_earnings_next_block_sats'] = int(round(estimated_earnings_next_block * self.sats_per_btc))
            metrics['estimated_rewards_in_window_sats'] = int(round(estimated_rewards_in_window * self.sats_per_btc))

            # Ensure we have at least one data point for history at startup
            if not arrow_history.get('hashrate_60sec') and ocean_data.hashrate_60sec:
                logging.info("Initializing hashrate_60sec history with first data point")
                current_minute = datetime.now(ZoneInfo("America/Los_Angeles")).strftime("%H:%M")
                current_second = datetime.now(ZoneInfo("America/Los_Angeles")).strftime("%H:%M:%S")
                if 'hashrate_60sec' not in arrow_history:
                    arrow_history['hashrate_60sec'] = []

                # Add a starter point for the chart
                arrow_history['hashrate_60sec'].append({
                    "time": current_second,
                    "value": float(ocean_data.hashrate_60sec),
                    "arrow": ""
                })
                # Add a second point with the last digit of the timestamp bumped by one
                # second to ensure the chart renders a line
                arrow_history['hashrate_60sec'].append({
                    "time": current_second[:-1] + str((int(current_second[-1]) + 1) % 10),
                    "value": float(ocean_data.hashrate_60sec),
                    "arrow": ""
                })
                logging.info(f"Added initial data points for chart: {ocean_data.hashrate_60sec} {ocean_data.hashrate_60sec_unit}")

            arrow_keys = [
                "pool_total_hashrate", "hashrate_24hr", "hashrate_3hr", "hashrate_10min",
                "hashrate_60sec", "block_number", "btc_price", "network_hashrate",
                "difficulty", "daily_revenue", "daily_power_cost", "daily_profit_usd",
                "monthly_profit_usd", "daily_mined_sats", "monthly_mined_sats",
                "unpaid_earnings", "estimated_earnings_per_day_sats",
                "estimated_earnings_next_block_sats", "estimated_rewards_in_window_sats",
                "workers_hashing"
            ]

            # --- Bucket by second (Los Angeles Time) with thread safety ---
            current_second = datetime.now(ZoneInfo("America/Los_Angeles")).strftime("%H:%M:%S")
            with state_lock:
                for key in arrow_keys:
                    if metrics.get(key) is not None:
                        current_val = metrics[key]
                        arrow = ""
                        if key in arrow_history and arrow_history[key]:
                            previous_val = arrow_history[key][-1]["value"]
                            if current_val > previous_val:
                                arrow = "↑"
                            elif current_val < previous_val:
                                arrow = "↓"
                        if key not in arrow_history:
                            arrow_history[key] = []
                        if not arrow_history[key] or arrow_history[key][-1]["time"] != current_second:
                            arrow_history[key].append({
                                "time": current_second,
                                "value": current_val,
                                "arrow": arrow
                            })
                        else:
                            arrow_history[key][-1]["value"] = current_val
                            arrow_history[key][-1]["arrow"] = arrow
                        # Cap history to three hours worth (180 entries)
                        if len(arrow_history[key]) > MAX_HISTORY_ENTRIES:
                            arrow_history[key] = arrow_history[key][-MAX_HISTORY_ENTRIES:]

            # --- Aggregate arrow_history by minute for the graph ---
            aggregated_history = {}
            for key, entries in arrow_history.items():
                minute_groups = {}
                for entry in entries:
                    minute = entry["time"][:5]  # extract HH:MM
                    minute_groups[minute] = entry  # take last entry for that minute
                aggregated_history[key] = list(minute_groups.values())
            metrics["arrow_history"] = aggregated_history
            metrics["history"] = hashrate_history

            global metrics_log
            entry = {"timestamp": datetime.now().isoformat(), "metrics": metrics}
            metrics_log.append(entry)
            # Cap the metrics log to three hours worth (180 entries)
            if len(metrics_log) > MAX_HISTORY_ENTRIES:
                metrics_log = metrics_log[-MAX_HISTORY_ENTRIES:]

            # --- Add server timestamps to the response in Los Angeles Time ---
            metrics["server_timestamp"] = datetime.now(ZoneInfo("America/Los_Angeles")).isoformat()
            metrics["server_start_time"] = SERVER_START_TIME.astimezone(ZoneInfo("America/Los_Angeles")).isoformat()

            # Log execution time
            execution_time = time.time() - start_time
            metrics["execution_time"] = execution_time
            if execution_time > 10:
                logging.warning(f"Metrics fetch took {execution_time:.2f} seconds")
            else:
                logging.info(f"Metrics fetch completed in {execution_time:.2f} seconds")

            return metrics
        except Exception as e:
            logging.error(f"Unexpected error in fetch_metrics: {e}")
            return None

# --- New Time Endpoint for Fine Syncing ---
@app.route("/api/time")
def api_time():
    return jsonify({
        "server_timestamp": datetime.now(ZoneInfo("America/Los_Angeles")).isoformat(),
        "server_start_time": SERVER_START_TIME.astimezone(ZoneInfo("America/Los_Angeles")).isoformat()
    })

# --- Fixed SSE Endpoint with proper request context handling ---
@app.route('/stream')
def stream():
    # Important: Capture any request context information BEFORE the generator.
    # This ensures we're not trying to access request outside its context.
    def event_stream():
        global active_sse_connections, cached_metrics
        client_id = None

        try:
            # Check if we're at the connection limit
            with sse_connections_lock:
                if active_sse_connections >= MAX_SSE_CONNECTIONS:
                    logging.warning(f"Connection limit reached ({MAX_SSE_CONNECTIONS}), refusing new SSE connection")
                    yield f"data: {{\"error\": \"Too many connections, please try again later\", \"retry\": 5000}}\n\n"
                    return

                active_sse_connections += 1
                client_id = f"client-{int(time.time() * 1000) % 10000}"
                logging.info(f"SSE {client_id}: Connection established (total: {active_sse_connections})")
            # Set a maximum connection time - increased to 15 minutes for better user experience
            end_time = time.time() + MAX_SSE_CONNECTION_TIME
            last_timestamp = None

            # Send initial data immediately to prevent delay in dashboard updates
            if cached_metrics:
                yield f"data: {json.dumps(cached_metrics)}\n\n"
                last_timestamp = cached_metrics.get("server_timestamp")
            else:
                # Send a ping if no data is available yet
                yield f"data: {{\"type\": \"ping\", \"client_id\": \"{client_id}\"}}\n\n"

            # Main event loop with improved error handling
            while time.time() < end_time:
                try:
                    # Send data only if it has changed
                    if cached_metrics and cached_metrics.get("server_timestamp") != last_timestamp:
                        data = json.dumps(cached_metrics)
                        last_timestamp = cached_metrics.get("server_timestamp")
                        yield f"data: {data}\n\n"

                    # Send regular pings about every 30 seconds to keep the connection alive
                    if int(time.time()) % 30 == 0:
                        yield f"data: {{\"type\": \"ping\", \"time\": {int(time.time())}, \"connections\": {active_sse_connections}}}\n\n"

                    # Sleep to reduce CPU usage
                    time.sleep(1)

                    # Warn the client 60 seconds before timeout so it can prepare to reconnect
                    remaining_time = end_time - time.time()
                    if remaining_time < 60 and int(remaining_time) % 15 == 0:  # Every 15 sec in the last minute
                        yield f"data: {{\"type\": \"timeout_warning\", \"remaining\": {int(remaining_time)}}}\n\n"
                except Exception as e:
                    logging.error(f"SSE {client_id}: Error in stream: {e}")
                    time.sleep(2)  # Prevent tight error loops

            # Connection timeout reached - send a reconnect instruction to the client
            logging.info(f"SSE {client_id}: Connection timeout reached ({MAX_SSE_CONNECTION_TIME}s)")
            yield f"data: {{\"type\": \"timeout\", \"message\": \"Connection timeout reached\", \"reconnect\": true}}\n\n"
        except GeneratorExit:
            # This is how we detect client disconnection
            logging.info(f"SSE {client_id}: Client disconnected (GeneratorExit)")
            # Don't yield here - just let the generator exit normally
        finally:
            # Always decrement the connection counter when done
            with sse_connections_lock:
                active_sse_connections = max(0, active_sse_connections - 1)
                logging.info(f"SSE {client_id}: Connection closed (remaining: {active_sse_connections})")

    # Configure the response with improved error handling
    try:
        response = Response(event_stream(), mimetype="text/event-stream")
        response.headers['Cache-Control'] = 'no-cache'
        response.headers['X-Accel-Buffering'] = 'no'  # Disable nginx buffering
        response.headers['Access-Control-Allow-Origin'] = '*'  # Allow CORS
        return response
    except Exception as e:
        logging.error(f"Error creating SSE response: {e}")
        return jsonify({"error": "Internal server error"}), 500

# Duplicate stream endpoint for the dashboard path
@app.route('/dashboard/stream')
def dashboard_stream():
    """Duplicate of the stream endpoint for the dashboard route."""
    return stream()

# --- SchedulerWatchdog to monitor and recover ---
def scheduler_watchdog():
    """Periodically check if the scheduler is running and healthy."""
    global scheduler, scheduler_last_successful_run

    try:
        # If no successful run in the past 2 minutes, consider the scheduler dead
        if (scheduler_last_successful_run is None or time.time() - scheduler_last_successful_run > 120):
            logging.warning("Scheduler watchdog: No successful runs detected in last 2 minutes")

            # Check if the actual scheduler exists and is reported as running
            if not scheduler or not getattr(scheduler, 'running', False):
                logging.error("Scheduler watchdog: Scheduler appears to be dead, recreating")

                # Use the lock to avoid multiple threads recreating simultaneously
                with scheduler_recreate_lock:
                    create_scheduler()
    except Exception as e:
        logging.error(f"Error in scheduler watchdog: {e}")
# --- Create Scheduler ---
def create_scheduler():
    """Create and configure a new scheduler instance with proper error handling."""
    try:
        # Stop the existing scheduler if it exists
        global scheduler
        if 'scheduler' in globals() and scheduler:
            try:
                # Check if the scheduler is running before attempting to shut it down
                if hasattr(scheduler, 'running') and scheduler.running:
                    logging.info("Shutting down existing scheduler before creating a new one")
                    scheduler.shutdown(wait=False)
            except Exception as e:
                logging.error(f"Error shutting down existing scheduler: {e}")

        # Create a new scheduler with more robust configuration
        new_scheduler = BackgroundScheduler(
            job_defaults={
                'coalesce': True,          # Combine multiple missed runs into a single one
                'max_instances': 1,        # Prevent job overlaps
                'misfire_grace_time': 30   # Allow misfires up to 30 seconds
            }
        )

        # Add the update job
        new_scheduler.add_job(
            func=update_metrics_job,
            trigger="interval",
            seconds=60,
            id='update_metrics_job',
            replace_existing=True
        )

        # Add watchdog job - runs every 30 seconds to check scheduler health
        new_scheduler.add_job(
            func=scheduler_watchdog,
            trigger="interval",
            seconds=30,
            id='scheduler_watchdog',
            replace_existing=True
        )

        # Start the scheduler
        new_scheduler.start()
        logging.info("Scheduler created and started successfully")
        return new_scheduler
    except Exception as e:
        logging.error(f"Error creating scheduler: {e}")
        return None

# --- Background Job Function With Enhanced Resilience ---
def update_metrics_job():
    global cached_metrics, last_metrics_update_time, scheduler, scheduler_last_successful_run

    try:
        # Check scheduler health - enhanced logic to detect failed executors
        if not scheduler or not hasattr(scheduler, 'running'):
            logging.error("Scheduler object is invalid, attempting to recreate")
            with scheduler_recreate_lock:
                create_scheduler()
            return

        if not scheduler.running:
            logging.warning("Scheduler stopped unexpectedly, attempting to restart")
            try:
                scheduler.start()
                logging.info("Scheduler restarted successfully")
            except Exception as e:
                logging.error(f"Failed to restart scheduler: {e}")
                # More aggressive recovery - recreate the scheduler entirely
                with scheduler_recreate_lock:
                    create_scheduler()
                return

        # Test the scheduler's executor by checking its state - using a safer approach
        # that doesn't rely on adding a test job (which was failing before)
        try:
            # Check if any jobs exist and are scheduled
            jobs = scheduler.get_jobs()
            if not jobs:
                logging.error("No jobs found in scheduler - recreating")
                with scheduler_recreate_lock:
                    create_scheduler()
                return

            # Check if the next run time is set for any job
            next_runs = [job.next_run_time for job in jobs]
            if not any(next_runs):
                logging.error("No jobs with next_run_time found - recreating scheduler")
                with scheduler_recreate_lock:
                    create_scheduler()
                return
        except RuntimeError as e:
            # Properly handle the "cannot schedule new futures after shutdown" error
            if "cannot schedule new futures after shutdown" in str(e):
                logging.error("Detected dead executor, recreating scheduler")
                with scheduler_recreate_lock:
                    create_scheduler()
                return
            # Other errors can pass through
        except Exception as e:
            logging.error(f"Error checking scheduler state: {e}")
            # Continue with the job since this might be a different issue

        # Skip the update if the last one was too recent (prevents overlapping runs)
        current_time = time.time()
        if last_metrics_update_time and (current_time - last_metrics_update_time < 30):
            logging.info("Skipping metrics update - previous update too recent")
            return

        # Set the last update time to now
        last_metrics_update_time = current_time

        # Add timeout handling with a timer
        job_timeout = 45  # seconds

        # Add a success flag
        job_successful = False

        def timeout_handler():
            if not job_successful:
                logging.error("Background job timed out after 45 seconds")
                # Could implement more aggressive recovery here

        # Set the timeout timer
        timer = threading.Timer(job_timeout, timeout_handler)
        timer.daemon = True
        timer.start()

        try:
            # Correctly call the dashboard instance's fetch_metrics method
            metrics = dashboard.fetch_metrics()

            if metrics:
                global cached_metrics
                cached_metrics = metrics
                logging.info("Background job: Metrics updated successfully")
                job_successful = True

                # Mark successful run time for the watchdog
                global scheduler_last_successful_run
                scheduler_last_successful_run = time.time()

                # Persist critical state so worker restarts can recover from it
                persist_critical_state()

                # Periodically check and prune data to prevent memory growth
                if current_time % 300 < 60:  # Every ~5 minutes
                    prune_old_data()

                # Only save state to Redis on a similar schedule, not every update
                if current_time % 300 < 60:  # Every ~5 minutes
                    save_graph_state()

                # Periodic full memory cleanup (every 2 hours)
                if current_time % 7200 < 60:  # Every ~2 hours
                    logging.info("Performing full memory cleanup")
                    gc.collect(generation=2)  # Force full collection
            else:
                logging.error("Background job: Metrics update returned None")
        except Exception as e:
            logging.error(f"Background job: Unexpected error: {e}")
            # Log traceback for debugging
            import traceback
            logging.error(traceback.format_exc())
            # Log memory usage when errors occur to help diagnose issues
            log_memory_usage()
        finally:
            # Cancel the timer in the finally block to ensure it's always canceled
            timer.cancel()
    except Exception as e:
        logging.error(f"Background job: Unhandled exception: {e}")
        # Log traceback for debugging
        import traceback
        logging.error(traceback.format_exc())

# --- Routes ---
@app.route("/")
def boot():
    """Serve the boot sequence page."""
    return render_template("boot.html", base_url=request.host_url.rstrip('/'))

@app.route("/dashboard")
def dashboard():
    """Serve the main dashboard page."""
    if cached_metrics is None:
        update_metrics_job()
    current_time = datetime.now(ZoneInfo("America/Los_Angeles")).strftime("%Y-%m-%d %I:%M:%S %p")
    return render_template("index.html", metrics=cached_metrics, current_time=current_time)

@app.route("/api/metrics")
def api_metrics():
    if cached_metrics is None:
        update_metrics_job()
    return jsonify(cached_metrics)

# Health check endpoint with detailed diagnostics
@app.route("/api/health")
def health_check():
    """Health check endpoint with enhanced system diagnostics."""
    # Calculate uptime
    uptime_seconds = (datetime.now(ZoneInfo("America/Los_Angeles")) - SERVER_START_TIME).total_seconds()

    # Get process memory usage
    try:
        process = psutil.Process(os.getpid())
        mem_info = process.memory_info()
        memory_usage_mb = mem_info.rss / 1024 / 1024
        memory_percent = process.memory_percent()
    except Exception as e:
        logging.error(f"Error getting memory usage: {e}")
        memory_usage_mb = 0
        memory_percent = 0

    # Check data freshness
    data_age = 0
    if cached_metrics and cached_metrics.get("server_timestamp"):
        try:
            last_update = datetime.fromisoformat(cached_metrics["server_timestamp"])
            data_age = (datetime.now(ZoneInfo("America/Los_Angeles")) - last_update).total_seconds()
        except Exception as e:
            logging.error(f"Error calculating data age: {e}")

    # Determine health status
    health_status = "healthy"
    if data_age > 300:  # Data older than 5 minutes
        health_status = "degraded"
    if not cached_metrics:
        health_status = "unhealthy"
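    # Illustrative response shape (values made up, see the dict built below for the full
    # field list): {"status": "healthy", "uptime": 3600.0, "memory": {"usage_mb": 85.2, ...},
    # "data": {"age_seconds": 12, ...}, ...}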
    # Build the response with detailed diagnostics
    status = {
        "status": health_status,
        "uptime": uptime_seconds,
        "uptime_formatted": f"{int(uptime_seconds // 3600)}h {int((uptime_seconds % 3600) // 60)}m {int(uptime_seconds % 60)}s",
        "connections": active_sse_connections,
        "memory": {
            "usage_mb": round(memory_usage_mb, 2),
            "percent": round(memory_percent, 2)
        },
        "data": {
            "last_update": cached_metrics.get("server_timestamp") if cached_metrics else None,
            "age_seconds": int(data_age),
            "available": cached_metrics is not None
        },
        "scheduler": {
            "running": scheduler.running if hasattr(scheduler, "running") else False,
            "last_successful_run": scheduler_last_successful_run
        },
        "redis": {
            "connected": redis_client is not None
        },
        "timestamp": datetime.now(ZoneInfo("America/Los_Angeles")).isoformat()
    }

    # Log the health check if the status is not healthy
    if health_status != "healthy":
        logging.warning(f"Health check returning {health_status} status: {status}")

    return jsonify(status)

# Add enhanced scheduler health check endpoint
@app.route("/api/scheduler-health")
def scheduler_health():
    try:
        scheduler_status = {
            "running": scheduler.running if hasattr(scheduler, "running") else False,
            "job_count": len(scheduler.get_jobs()) if hasattr(scheduler, "get_jobs") else 0,
            "next_run": str(scheduler.get_jobs()[0].next_run_time) if hasattr(scheduler, "get_jobs") and scheduler.get_jobs() else None,
            "last_update": last_metrics_update_time,
            "time_since_update": time.time() - last_metrics_update_time if last_metrics_update_time else None,
            "last_successful_run": scheduler_last_successful_run,
            "time_since_successful": time.time() - scheduler_last_successful_run if scheduler_last_successful_run else None
        }
        return jsonify(scheduler_status)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Add a health check route that can attempt to fix the scheduler if needed
@app.route("/api/fix-scheduler", methods=["POST"])
def fix_scheduler():
    try:
        with scheduler_recreate_lock:
            new_scheduler = create_scheduler()
            if new_scheduler:
                global scheduler
                scheduler = new_scheduler
                return jsonify({"status": "success", "message": "Scheduler recreated successfully"})
            else:
                return jsonify({"status": "error", "message": "Failed to recreate scheduler"}), 500
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500

@app.errorhandler(404)
def page_not_found(e):
    return render_template("error.html", message="Page not found."), 404

@app.errorhandler(500)
def internal_server_error(e):
    logging.error("Internal server error: %s", e)
    return render_template("error.html", message="Internal server error."), 500

@app.route("/api/force-refresh", methods=["POST"])
def force_refresh():
    """Emergency endpoint to force a metrics refresh."""
    logging.warning("Emergency force-refresh requested")
    try:
        # Force fetch new metrics
        metrics = dashboard.fetch_metrics()
        if metrics:
            global cached_metrics, scheduler_last_successful_run
            cached_metrics = metrics
            scheduler_last_successful_run = time.time()
            logging.info(f"Force refresh successful, new timestamp: {metrics['server_timestamp']}")
            return jsonify({"status": "success", "message": "Metrics refreshed", "timestamp": metrics['server_timestamp']})
        else:
            return jsonify({"status": "error", "message": "Failed to fetch metrics"}), 500
    except Exception as e:
        logging.error(f"Force refresh error: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
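# Illustrative ops usage (assumes the default Flask host/port; adjust for your deployment):
#   curl http://localhost:5000/api/health                  # liveness + memory/data-age diagnostics
#   curl http://localhost:5000/api/scheduler-health        # APScheduler job status
#   curl -X POST http://localhost:5000/api/fix-scheduler   # recreate a wedged scheduler
#   curl -X POST http://localhost:5000/api/force-refresh   # force an immediate metrics fetch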
class RobustMiddleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        try:
            return self.app(environ, start_response)
        except Exception as e:
            logging.exception("Unhandled exception in WSGI app")
            start_response("500 Internal Server Error", [("Content-Type", "text/html")])
            return [b"