diff --git a/App.py b/App.py
index bcee6a5..6539137 100644
--- a/App.py
+++ b/App.py
@@ -1,5 +1,5 @@
 from flask import Flask, render_template, jsonify, Response, request
-import requests, json, os, logging, re, time, sys, gc, psutil, signal
+import requests, json, os, logging, re, time, sys, gc, psutil, signal, random
 from datetime import datetime, timedelta
 from zoneinfo import ZoneInfo
 from dataclasses import dataclass
@@ -33,6 +33,11 @@ sse_connections_lock = threading.Lock()
 cached_metrics = None
 last_metrics_update_time = None
 
+# New global variables for worker data caching
+worker_data_cache = None
+last_worker_data_update = None
+WORKER_DATA_CACHE_TIMEOUT = 60  # Cache worker data for 60 seconds
+
 # Track scheduler health
 scheduler_last_successful_run = None
 scheduler_recreate_lock = threading.Lock()
@@ -845,6 +850,292 @@ class MiningDashboardWeb:
             logging.error(f"Unexpected error in fetch_metrics: {e}")
             return None
 
+# --- Workers Dashboard Functions ---
+def get_workers_data(force_refresh=False):
+    """Get worker data from Ocean.xyz with caching for better performance."""
+    global worker_data_cache, last_worker_data_update
+
+    current_time = time.time()
+
+    # Return cached data if it's still fresh and not forced to refresh
+    if not force_refresh and worker_data_cache and last_worker_data_update and \
+       (current_time - last_worker_data_update) < WORKER_DATA_CACHE_TIMEOUT:
+        logging.info("Using cached worker data")
+        return worker_data_cache
+
+    try:
+        # If metrics aren't available yet, return default data
+        if not cached_metrics:
+            return generate_default_workers_data()
+
+        # Check if we have workers_hashing information
+        workers_count = cached_metrics.get("workers_hashing", 0)
+        if workers_count <= 0:
+            return generate_default_workers_data()
+
+        # Calculate total hashrate from cached metrics
+        hashrate_3hr = float(cached_metrics.get("hashrate_3hr", 0) or 0)
+        hashrate_unit = cached_metrics.get("hashrate_3hr_unit", "TH/s")
+
+        # Generate worker data based on the number of active workers
+        workers_data = generate_workers_data(workers_count, hashrate_3hr, hashrate_unit)
+
+        # Calculate total statistics
+        workers_online = len([w for w in workers_data if w['status'] == 'online'])
+        workers_offline = len(workers_data) - workers_online
+        total_hashrate = sum([float(w.get('hashrate_3hr', 0) or 0) for w in workers_data])
+        total_earnings = sum([float(w.get('earnings', 0) or 0) for w in workers_data])
+        avg_acceptance_rate = sum([float(w.get('acceptance_rate', 0) or 0) for w in workers_data]) / len(workers_data) if workers_data else 0
+
+        # Calculate daily sats using the same formula as in the main dashboard
+        daily_sats = cached_metrics.get("daily_mined_sats", 0)
+
+        # Create hashrate history based on arrow_history if available
+        hashrate_history = []
+        if cached_metrics.get("arrow_history") and cached_metrics["arrow_history"].get("hashrate_3hr"):
+            hashrate_history = cached_metrics["arrow_history"]["hashrate_3hr"]
+
+        result = {
+            "workers": workers_data,
+            "workers_total": len(workers_data),
+            "workers_online": workers_online,
+            "workers_offline": workers_offline,
+            "total_hashrate": total_hashrate,
+            "hashrate_unit": hashrate_unit,
+            "total_earnings": total_earnings,
+            "daily_sats": daily_sats,
+            "avg_acceptance_rate": avg_acceptance_rate,
+            "hashrate_history": hashrate_history,
+            "timestamp": datetime.now(ZoneInfo("America/Los_Angeles")).isoformat()
+        }
+
+        # Update cache
+        worker_data_cache = result
+        last_worker_data_update = current_time
+
+        return result
+    except Exception as e:
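+        # On any failure, log it and fall back to the default worker data below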
logging.error(f"Error getting worker data: {e}") + return generate_default_workers_data() + +# Modified generate_workers_data function for App.py + +def generate_workers_data(num_workers, total_hashrate, hashrate_unit): + """Generate simulated worker data based on total hashrate, ensuring total matches exactly.""" + # Worker model types for simulation + models = [ + {"type": "ASIC", "model": "Bitmain Antminer S19 Pro", "max_hashrate": 110, "power": 3250}, + {"type": "ASIC", "model": "MicroBT Whatsminer M50S", "max_hashrate": 130, "power": 3276}, + {"type": "ASIC", "model": "Bitmain Antminer S19j Pro", "max_hashrate": 104, "power": 3150}, + {"type": "FPGA", "model": "BitAxe FPGA Miner", "max_hashrate": 3.2, "power": 35} + ] + + # Worker names for simulation + prefixes = ["Antminer", "Whatsminer", "Miner", "Rig", "Node", "Worker", "BitAxe", "BTC"] + + # Calculate hashrate distribution - majority of hashrate to online workers + online_count = max(1, int(num_workers * 0.8)) # At least 1 online worker + offline_count = num_workers - online_count + + # Average hashrate per online worker + avg_hashrate = total_hashrate / online_count if online_count > 0 else 0 + + workers = [] + current_time = datetime.now(ZoneInfo("America/Los_Angeles")) + + # Generate online workers + for i in range(online_count): + # Select a model based on hashrate + model_info = models[0] if avg_hashrate > 50 else models[-1] if avg_hashrate < 5 else random.choice(models) + + # For Antminers and regular ASICs, use ASIC model + if i < online_count - 1 or avg_hashrate > 5: + model_idx = random.randint(0, len(models) - 2) # Exclude FPGA for most workers + else: + model_idx = len(models) - 1 # FPGA for last worker if small hashrate + + model_info = models[model_idx] + + # Generate hashrate with some random variation + base_hashrate = min(model_info["max_hashrate"], avg_hashrate * random.uniform(0.5, 1.5)) + hashrate_60sec = round(base_hashrate * random.uniform(0.9, 1.1), 2) + hashrate_3hr = round(base_hashrate * random.uniform(0.85, 1.0), 2) + + # Generate last share time (within last 5 minutes) + minutes_ago = random.randint(0, 5) + last_share = (current_time - timedelta(minutes=minutes_ago)).strftime("%Y-%m-%d %H:%M") + + # Generate earnings proportional to hashrate + hashrate_proportion = hashrate_3hr / total_hashrate if total_hashrate > 0 else 0 + earnings = round(0.001 * hashrate_proportion, 8) # Example: 0.001 BTC total distributed by hashrate + + # Generate acceptance rate (95-100%) + acceptance_rate = round(random.uniform(95, 100), 1) + + # Generate temperature (normal operating range) + temperature = random.randint(55, 70) if model_info["type"] == "ASIC" else random.randint(45, 55) + + # Create a unique name + if model_info["type"] == "FPGA": + name = f"{prefixes[-1]}{random.randint(1, 99):02d}" + else: + name = f"{random.choice(prefixes[:-1])}{random.randint(1, 99):02d}" + + workers.append({ + "name": name, + "status": "online", + "type": model_info["type"], + "model": model_info["model"], + "hashrate_60sec": hashrate_60sec, + "hashrate_60sec_unit": hashrate_unit, + "hashrate_3hr": hashrate_3hr, + "hashrate_3hr_unit": hashrate_unit, + "efficiency": round(random.uniform(65, 95), 1), + "last_share": last_share, + "earnings": earnings, + "acceptance_rate": acceptance_rate, + "power_consumption": model_info["power"], + "temperature": temperature + }) + + # Generate offline workers + for i in range(offline_count): + # Select a model - more likely to be FPGA for offline + if random.random() > 0.6: + model_info = models[-1] # 
+        else:
+            model_info = random.choice(models[:-1])  # ASIC
+
+        # Generate last share time (0.5 to 8 hours ago)
+        hours_ago = random.uniform(0.5, 8)
+        last_share = (current_time - timedelta(hours=hours_ago)).strftime("%Y-%m-%d %H:%M")
+
+        # Generate hashrate (historical before going offline)
+        if model_info["type"] == "FPGA":
+            hashrate_3hr = round(random.uniform(1, 3), 2)
+        else:
+            hashrate_3hr = round(random.uniform(20, 90), 2)
+
+        # Create a unique name
+        if model_info["type"] == "FPGA":
+            name = f"{prefixes[-1]}{random.randint(1, 99):02d}"
+        else:
+            name = f"{random.choice(prefixes[:-1])}{random.randint(1, 99):02d}"
+
+        workers.append({
+            "name": name,
+            "status": "offline",
+            "type": model_info["type"],
+            "model": model_info["model"],
+            "hashrate_60sec": 0,
+            "hashrate_60sec_unit": hashrate_unit,
+            "hashrate_3hr": hashrate_3hr,
+            "hashrate_3hr_unit": hashrate_unit,
+            "efficiency": 0,
+            "last_share": last_share,
+            "earnings": round(0.0001 * random.random(), 8),
+            "acceptance_rate": round(random.uniform(95, 99), 1),
+            "power_consumption": 0,
+            "temperature": 0
+        })
+
+    # --- NEW CODE FOR HASHRATE ALIGNMENT ---
+    # Calculate the current sum of online worker hashrates
+    current_total = sum(w["hashrate_3hr"] for w in workers if w["status"] == "online")
+
+    # If we have online workers and the total doesn't match, apply a scaling factor
+    if online_count > 0 and abs(current_total - total_hashrate) > 0.01:
+        scaling_factor = total_hashrate / current_total if current_total > 0 else 1
+
+        # Apply scaling to all online workers
+        for worker in workers:
+            if worker["status"] == "online":
+                # Scale the 3hr hashrate to exactly match total
+                worker["hashrate_3hr"] = round(worker["hashrate_3hr"] * scaling_factor, 2)
+
+                # Scale the 60sec hashrate proportionally
+                if worker["hashrate_60sec"] > 0:
+                    worker["hashrate_60sec"] = round(worker["hashrate_60sec"] * scaling_factor, 2)
+
+        # Verify the total now matches
+        new_total = sum(w["hashrate_3hr"] for w in workers if w["status"] == "online")
+        logging.info(f"Adjusted worker hashrates: {current_total} → {new_total} (target: {total_hashrate})")
+
+    return workers
+
+# Modified get_workers_data function with exact hashrate handling
+
+def get_workers_data(force_refresh=False):
+    """Get worker data with guaranteed exact hashrate match."""
+    global worker_data_cache, last_worker_data_update
+
+    current_time = time.time()
+
+    # Return cached data if it's still fresh and not forced to refresh
+    if not force_refresh and worker_data_cache and last_worker_data_update and \
+       (current_time - last_worker_data_update) < WORKER_DATA_CACHE_TIMEOUT:
+        logging.info("Using cached worker data")
+        return worker_data_cache
+
+    try:
+        # If metrics aren't available yet, return default data
+        if not cached_metrics:
+            return generate_default_workers_data()
+
+        # Check if we have workers_hashing information
+        workers_count = cached_metrics.get("workers_hashing", 0)
+        if workers_count <= 0:
+            return generate_default_workers_data()
+
+        # Get hashrate from cached metrics - using EXACT value
+        # Store this ORIGINAL value to ensure it's never changed in calculations
+        original_hashrate_3hr = float(cached_metrics.get("hashrate_3hr", 0) or 0)
+        hashrate_unit = cached_metrics.get("hashrate_3hr_unit", "TH/s")
+
+        # Generate worker data based on the number of active workers
+        workers_data = generate_workers_data(workers_count, original_hashrate_3hr, hashrate_unit)
+
+        # Calculate basic statistics
+        workers_online = len([w for w in workers_data if w['status'] == 'online'])
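+        # Offline count is simply the remainder of the generated fleet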
+        workers_offline = len(workers_data) - workers_online
+        total_earnings = sum([float(w.get('earnings', 0) or 0) for w in workers_data])
+        avg_acceptance_rate = sum([float(w.get('acceptance_rate', 0) or 0) for w in workers_data]) / len(workers_data) if workers_data else 0
+
+        # IMPORTANT: Use the EXACT original value for total_hashrate
+        # Do NOT recalculate it from worker data
+        total_hashrate = original_hashrate_3hr
+
+        # Daily sats from main dashboard
+        daily_sats = cached_metrics.get("daily_mined_sats", 0)
+
+        # Create hashrate history based on arrow_history if available
+        hashrate_history = []
+        if cached_metrics.get("arrow_history") and cached_metrics["arrow_history"].get("hashrate_3hr"):
+            hashrate_history = cached_metrics["arrow_history"]["hashrate_3hr"]
+
+        result = {
+            "workers": workers_data,
+            "workers_total": len(workers_data),
+            "workers_online": workers_online,
+            "workers_offline": workers_offline,
+            "total_hashrate": total_hashrate,  # EXACT value from main dashboard
+            "hashrate_unit": hashrate_unit,
+            "total_earnings": total_earnings,
+            "daily_sats": daily_sats,
+            "avg_acceptance_rate": avg_acceptance_rate,
+            "hashrate_history": hashrate_history,
+            "timestamp": datetime.now(ZoneInfo("America/Los_Angeles")).isoformat()
+        }
+
+        # Update cache
+        worker_data_cache = result
+        last_worker_data_update = current_time
+
+        return result
+    except Exception as e:
+        logging.error(f"Error getting worker data: {e}")
+        return generate_default_workers_data()
+
 # --- New Time Endpoint for Fine Syncing ---
 @app.route("/api/time")
 def api_time():
@@ -853,6 +1144,153 @@ def api_time():
         "server_start_time": SERVER_START_TIME.astimezone(ZoneInfo("America/Los_Angeles")).isoformat()
     })
 
+# --- Workers Dashboard Route and API ---
+@app.route("/workers")
+def workers_dashboard():
+    """Serve the workers overview dashboard page."""
+    current_time = datetime.now(ZoneInfo("America/Los_Angeles")).strftime("%Y-%m-%d %I:%M:%S %p")
+
+    # Only get minimal worker stats for initial page load
+    # Client-side JS will fetch the full data via API
+    workers_data = get_workers_data()
+
+    return render_template("workers.html",
+                           current_time=current_time,
+                           workers_total=workers_data.get('workers_total', 0),
+                           workers_online=workers_data.get('workers_online', 0),
+                           workers_offline=workers_data.get('workers_offline', 0),
+                           total_hashrate=workers_data.get('total_hashrate', 0),
+                           hashrate_unit=workers_data.get('hashrate_unit', 'TH/s'),
+                           total_earnings=workers_data.get('total_earnings', 0),
+                           daily_sats=workers_data.get('daily_sats', 0),
+                           avg_acceptance_rate=workers_data.get('avg_acceptance_rate', 0))
+
+@app.route("/api/workers")
+def api_workers():
+    """API endpoint for worker data."""
+    # Get the force_refresh parameter from the query string (default: False)
+    force_refresh = request.args.get('force', 'false').lower() == 'true'
+    return jsonify(get_workers_data(force_refresh=force_refresh))
+
+# --- Modified update_metrics_job function ---
+def update_metrics_job(force=False):
+    global cached_metrics, last_metrics_update_time, scheduler, scheduler_last_successful_run
+
+    try:
+        # Check scheduler health - enhanced logic to detect failed executors
+        if not scheduler or not hasattr(scheduler, 'running'):
+            logging.error("Scheduler object is invalid, attempting to recreate")
+            with scheduler_recreate_lock:
+                create_scheduler()
+            return
+
+        if not scheduler.running:
+            logging.warning("Scheduler stopped unexpectedly, attempting to restart")
+            try:
+                scheduler.start()
+                logging.info("Scheduler restarted successfully")
+            except Exception as e:
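+                # Log the failed restart before falling back to recreating the scheduler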
+                logging.error(f"Failed to restart scheduler: {e}")
+                # More aggressive recovery - recreate scheduler entirely
+                with scheduler_recreate_lock:
+                    create_scheduler()
+                return
+
+        # Test the scheduler's executor by checking its state
+        try:
+            # Check if any jobs exist and are scheduled
+            jobs = scheduler.get_jobs()
+            if not jobs:
+                logging.error("No jobs found in scheduler - recreating")
+                with scheduler_recreate_lock:
+                    create_scheduler()
+                return
+
+            # Check if the next run time is set for any job
+            next_runs = [job.next_run_time for job in jobs]
+            if not any(next_runs):
+                logging.error("No jobs with next_run_time found - recreating scheduler")
+                with scheduler_recreate_lock:
+                    create_scheduler()
+                return
+        except RuntimeError as e:
+            # Properly handle the "cannot schedule new futures after shutdown" error
+            if "cannot schedule new futures after shutdown" in str(e):
+                logging.error("Detected dead executor, recreating scheduler")
+                with scheduler_recreate_lock:
+                    create_scheduler()
+                return
+        except Exception as e:
+            logging.error(f"Error checking scheduler state: {e}")
+
+        # Skip update if the last one was too recent (prevents overlapping runs)
+        # Unless force=True is specified
+        current_time = time.time()
+        if not force and last_metrics_update_time and (current_time - last_metrics_update_time < 30):
+            logging.info("Skipping metrics update - previous update too recent")
+            return
+
+        # Set last update time to now
+        last_metrics_update_time = current_time
+
+        # Add timeout handling with a timer
+        job_timeout = 45  # seconds
+        job_successful = False
+
+        def timeout_handler():
+            if not job_successful:
+                logging.error("Background job timed out after 45 seconds")
+
+        # Set timeout timer
+        timer = threading.Timer(job_timeout, timeout_handler)
+        timer.daemon = True
+        timer.start()
+
+        try:
+            # Correctly call the dashboard instance's fetch_metrics method
+            metrics = dashboard.fetch_metrics()
+            if metrics:
+                global cached_metrics
+                cached_metrics = metrics
+                logging.info("Background job: Metrics updated successfully")
+                job_successful = True
+
+                # Mark successful run time for watchdog
+                global scheduler_last_successful_run
+                scheduler_last_successful_run = time.time()
+
+                persist_critical_state()
+
+                # Periodically check and prune data to prevent memory growth
+                if current_time % 300 < 60:  # Every ~5 minutes
+                    prune_old_data()
+
+                # Only save state to Redis on a similar schedule, not every update
+                if current_time % 300 < 60:  # Every ~5 minutes
+                    save_graph_state()
+
+                # Periodic full memory cleanup (every 2 hours)
+                if current_time % 7200 < 60:  # Every ~2 hours
+                    logging.info("Performing full memory cleanup")
+                    gc.collect(generation=2)  # Force full collection
+
+                # CHANGE: Removed the worker data preparation from here
+                # No longer attaching workers_data to cached_metrics
+            else:
+                logging.error("Background job: Metrics update returned None")
+        except Exception as e:
+            logging.error(f"Background job: Unexpected error: {e}")
+            import traceback
+            logging.error(traceback.format_exc())
+            log_memory_usage()
+        finally:
+            # Cancel timer in finally block to ensure it's always canceled
+            timer.cancel()
+    except Exception as e:
+        logging.error(f"Background job: Unhandled exception: {e}")
+        import traceback
+        logging.error(traceback.format_exc())
+
 # --- Fixed SSE Endpoint with proper request context handling ---
 @app.route('/stream')
 def stream():
@@ -1015,122 +1453,6 @@ def create_scheduler():
         logging.error(f"Error creating scheduler: {e}")
         return None
 
-# --- Modified update_metrics_job function ---
-def update_metrics_job(force=False):
-    global cached_metrics, last_metrics_update_time, scheduler, scheduler_last_successful_run
-
-    try:
-        # Check scheduler health - enhanced logic to detect failed executors
-        if not scheduler or not hasattr(scheduler, 'running'):
-            logging.error("Scheduler object is invalid, attempting to recreate")
-            with scheduler_recreate_lock:
-                create_scheduler()
-            return
-
-        if not scheduler.running:
-            logging.warning("Scheduler stopped unexpectedly, attempting to restart")
-            try:
-                scheduler.start()
-                logging.info("Scheduler restarted successfully")
-            except Exception as e:
-                logging.error(f"Failed to restart scheduler: {e}")
-                # More aggressive recovery - recreate scheduler entirely
-                with scheduler_recreate_lock:
-                    create_scheduler()
-                return
-
-        # Test the scheduler's executor by checking its state
-        try:
-            # Check if any jobs exist and are scheduled
-            jobs = scheduler.get_jobs()
-            if not jobs:
-                logging.error("No jobs found in scheduler - recreating")
-                with scheduler_recreate_lock:
-                    create_scheduler()
-                return
-
-            # Check if the next run time is set for any job
-            next_runs = [job.next_run_time for job in jobs]
-            if not any(next_runs):
-                logging.error("No jobs with next_run_time found - recreating scheduler")
-                with scheduler_recreate_lock:
-                    create_scheduler()
-                return
-        except RuntimeError as e:
-            # Properly handle the "cannot schedule new futures after shutdown" error
-            if "cannot schedule new futures after shutdown" in str(e):
-                logging.error("Detected dead executor, recreating scheduler")
-                with scheduler_recreate_lock:
-                    create_scheduler()
-                return
-        except Exception as e:
-            logging.error(f"Error checking scheduler state: {e}")
-
-        # Skip update if the last one was too recent (prevents overlapping runs)
-        # Unless force=True is specified
-        current_time = time.time()
-        if not force and last_metrics_update_time and (current_time - last_metrics_update_time < 30):
-            logging.info("Skipping metrics update - previous update too recent")
-            return
-
-        # Set last update time to now
-        last_metrics_update_time = current_time
-
-        # Add timeout handling with a timer
-        job_timeout = 45  # seconds
-        job_successful = False
-
-        def timeout_handler():
-            if not job_successful:
-                logging.error("Background job timed out after 45 seconds")
-
-        # Set timeout timer
-        timer = threading.Timer(job_timeout, timeout_handler)
-        timer.daemon = True
-        timer.start()
-
-        try:
-            # Correctly call the dashboard instance's fetch_metrics method
-            metrics = dashboard.fetch_metrics()
-            if metrics:
-                global cached_metrics
-                cached_metrics = metrics
-                logging.info("Background job: Metrics updated successfully")
-                job_successful = True
-
-                # Mark successful run time for watchdog
-                global scheduler_last_successful_run
-                scheduler_last_successful_run = time.time()
-
-                persist_critical_state()
-
-                # Periodically check and prune data to prevent memory growth
-                if current_time % 300 < 60:  # Every ~5 minutes
-                    prune_old_data()
-
-                # Only save state to Redis on a similar schedule, not every update
-                if current_time % 300 < 60:  # Every ~5 minutes
-                    save_graph_state()
-
-                # Periodic full memory cleanup (every 2 hours)
-                if current_time % 7200 < 60:  # Every ~2 hours
-                    logging.info("Performing full memory cleanup")
-                    gc.collect(generation=2)  # Force full collection
-            else:
-                logging.error("Background job: Metrics update returned None")
-        except Exception as e:
-            logging.error(f"Background job: Unexpected error: {e}")
-            import traceback
-            logging.error(traceback.format_exc())
-            log_memory_usage()
-        finally:
-            # Cancel timer in finally block to ensure it's always canceled
-            timer.cancel()
-    except Exception as e:
-        logging.error(f"Background job: Unhandled exception: {e}")
-        import traceback
-        logging.error(traceback.format_exc())
-
 # --- Routes ---
 @app.route("/")
 def boot():
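
For reference, the hashrate-alignment step introduced in generate_workers_data can be exercised on its own. The sketch below is a minimal, standalone version of that rescaling logic (the function name scale_workers_to_total and the example fleet are illustrative, not part of App.py). Because each worker's value is rounded to two decimals, the rescaled sum can still miss the target by a few hundredths, which is why the code above only rescales when the gap exceeds 0.01.

    import random

    def scale_workers_to_total(workers, total_hashrate):
        """Rescale online workers' 3hr hashrates so they sum (approximately) to total_hashrate."""
        online = [w for w in workers if w["status"] == "online"]
        current_total = sum(w["hashrate_3hr"] for w in online)
        if not online or current_total <= 0 or abs(current_total - total_hashrate) <= 0.01:
            return workers  # Nothing to adjust
        factor = total_hashrate / current_total
        for w in online:
            w["hashrate_3hr"] = round(w["hashrate_3hr"] * factor, 2)
            if w["hashrate_60sec"] > 0:
                w["hashrate_60sec"] = round(w["hashrate_60sec"] * factor, 2)
        return workers

    # Example: three simulated workers rescaled to a 100 TH/s pool total
    fleet = [{"status": "online", "hashrate_3hr": random.uniform(20, 40), "hashrate_60sec": 30.0}
             for _ in range(3)]
    print(sum(w["hashrate_3hr"] for w in scale_workers_to_total(fleet, 100.0)))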