mirror of https://github.com/Retropex/custom-ocean.xyz-dashboard.git
synced 2025-05-12 19:20:45 +02:00

Update App.py

This commit is contained in:
parent a9bd90f090
commit 180be98b1f

556  App.py
@@ -1,5 +1,5 @@
 from flask import Flask, render_template, jsonify, Response, request
-import requests, json, os, logging, re, time, sys, gc, psutil, signal
+import requests, json, os, logging, re, time, sys, gc, psutil, signal, random
 from datetime import datetime, timedelta
 from zoneinfo import ZoneInfo
 from dataclasses import dataclass
@@ -33,6 +33,11 @@ sse_connections_lock = threading.Lock()
 cached_metrics = None
 last_metrics_update_time = None
 
+# New global variables for worker data caching
+worker_data_cache = None
+last_worker_data_update = None
+WORKER_DATA_CACHE_TIMEOUT = 60  # Cache worker data for 60 seconds
+
 # Track scheduler health
 scheduler_last_successful_run = None
 scheduler_recreate_lock = threading.Lock()
@@ -845,6 +850,292 @@ class MiningDashboardWeb:
            logging.error(f"Unexpected error in fetch_metrics: {e}")
            return None

# --- Workers Dashboard Functions ---
def get_workers_data(force_refresh=False):
    """Get worker data from Ocean.xyz with caching for better performance."""
    global worker_data_cache, last_worker_data_update

    current_time = time.time()

    # Return cached data if it's still fresh and not forced to refresh
    if not force_refresh and worker_data_cache and last_worker_data_update and \
       (current_time - last_worker_data_update) < WORKER_DATA_CACHE_TIMEOUT:
        logging.info("Using cached worker data")
        return worker_data_cache

    try:
        # If metrics aren't available yet, return default data
        if not cached_metrics:
            return generate_default_workers_data()

        # Check if we have workers_hashing information
        workers_count = cached_metrics.get("workers_hashing", 0)
        if workers_count <= 0:
            return generate_default_workers_data()

        # Calculate total hashrate from cached metrics
        hashrate_3hr = float(cached_metrics.get("hashrate_3hr", 0) or 0)
        hashrate_unit = cached_metrics.get("hashrate_3hr_unit", "TH/s")

        # Generate worker data based on the number of active workers
        workers_data = generate_workers_data(workers_count, hashrate_3hr, hashrate_unit)

        # Calculate total statistics
        workers_online = len([w for w in workers_data if w['status'] == 'online'])
        workers_offline = len(workers_data) - workers_online
        total_hashrate = sum([float(w.get('hashrate_3hr', 0) or 0) for w in workers_data])
        total_earnings = sum([float(w.get('earnings', 0) or 0) for w in workers_data])
        avg_acceptance_rate = sum([float(w.get('acceptance_rate', 0) or 0) for w in workers_data]) / len(workers_data) if workers_data else 0

        # Calculate daily sats using the same formula as in the main dashboard
        daily_sats = cached_metrics.get("daily_mined_sats", 0)

        # Create hashrate history based on arrow_history if available
        hashrate_history = []
        if cached_metrics.get("arrow_history") and cached_metrics["arrow_history"].get("hashrate_3hr"):
            hashrate_history = cached_metrics["arrow_history"]["hashrate_3hr"]

        result = {
            "workers": workers_data,
            "workers_total": len(workers_data),
            "workers_online": workers_online,
            "workers_offline": workers_offline,
            "total_hashrate": total_hashrate,
            "hashrate_unit": hashrate_unit,
            "total_earnings": total_earnings,
            "daily_sats": daily_sats,
            "avg_acceptance_rate": avg_acceptance_rate,
            "hashrate_history": hashrate_history,
            "timestamp": datetime.now(ZoneInfo("America/Los_Angeles")).isoformat()
        }

        # Update cache
        worker_data_cache = result
        last_worker_data_update = current_time

        return result
    except Exception as e:
        logging.error(f"Error getting worker data: {e}")
        return generate_default_workers_data()
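
From a caller's point of view, the caching behaves roughly as below. This is a minimal sketch, not part of the commit; the module name App and the assumption that cached_metrics has already been populated are both hypothetical.

# Illustrative only: module name and setup are assumptions.
import App

first = App.get_workers_data()        # builds worker data and stores it in worker_data_cache
second = App.get_workers_data()       # called within 60 seconds, so the cached dict is returned
assert second is first

fresh = App.get_workers_data(force_refresh=True)   # bypasses the 60-second cache and regenerates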

# Modified generate_workers_data function for App.py

def generate_workers_data(num_workers, total_hashrate, hashrate_unit):
    """Generate simulated worker data based on total hashrate, ensuring total matches exactly."""
    # Worker model types for simulation
    models = [
        {"type": "ASIC", "model": "Bitmain Antminer S19 Pro", "max_hashrate": 110, "power": 3250},
        {"type": "ASIC", "model": "MicroBT Whatsminer M50S", "max_hashrate": 130, "power": 3276},
        {"type": "ASIC", "model": "Bitmain Antminer S19j Pro", "max_hashrate": 104, "power": 3150},
        {"type": "FPGA", "model": "BitAxe FPGA Miner", "max_hashrate": 3.2, "power": 35}
    ]

    # Worker names for simulation
    prefixes = ["Antminer", "Whatsminer", "Miner", "Rig", "Node", "Worker", "BitAxe", "BTC"]

    # Calculate hashrate distribution - majority of hashrate to online workers
    online_count = max(1, int(num_workers * 0.8))  # At least 1 online worker
    offline_count = num_workers - online_count

    # Average hashrate per online worker
    avg_hashrate = total_hashrate / online_count if online_count > 0 else 0

    workers = []
    current_time = datetime.now(ZoneInfo("America/Los_Angeles"))

    # Generate online workers
    for i in range(online_count):
        # Select a model based on hashrate
        model_info = models[0] if avg_hashrate > 50 else models[-1] if avg_hashrate < 5 else random.choice(models)

        # For Antminers and regular ASICs, use ASIC model
        if i < online_count - 1 or avg_hashrate > 5:
            model_idx = random.randint(0, len(models) - 2)  # Exclude FPGA for most workers
        else:
            model_idx = len(models) - 1  # FPGA for last worker if small hashrate

        model_info = models[model_idx]

        # Generate hashrate with some random variation
        base_hashrate = min(model_info["max_hashrate"], avg_hashrate * random.uniform(0.5, 1.5))
        hashrate_60sec = round(base_hashrate * random.uniform(0.9, 1.1), 2)
        hashrate_3hr = round(base_hashrate * random.uniform(0.85, 1.0), 2)

        # Generate last share time (within last 5 minutes)
        minutes_ago = random.randint(0, 5)
        last_share = (current_time - timedelta(minutes=minutes_ago)).strftime("%Y-%m-%d %H:%M")

        # Generate earnings proportional to hashrate
        hashrate_proportion = hashrate_3hr / total_hashrate if total_hashrate > 0 else 0
        earnings = round(0.001 * hashrate_proportion, 8)  # Example: 0.001 BTC total distributed by hashrate

        # Generate acceptance rate (95-100%)
        acceptance_rate = round(random.uniform(95, 100), 1)

        # Generate temperature (normal operating range)
        temperature = random.randint(55, 70) if model_info["type"] == "ASIC" else random.randint(45, 55)

        # Create a unique name
        if model_info["type"] == "FPGA":
            name = f"{prefixes[-1]}{random.randint(1, 99):02d}"
        else:
            name = f"{random.choice(prefixes[:-1])}{random.randint(1, 99):02d}"

        workers.append({
            "name": name,
            "status": "online",
            "type": model_info["type"],
            "model": model_info["model"],
            "hashrate_60sec": hashrate_60sec,
            "hashrate_60sec_unit": hashrate_unit,
            "hashrate_3hr": hashrate_3hr,
            "hashrate_3hr_unit": hashrate_unit,
            "efficiency": round(random.uniform(65, 95), 1),
            "last_share": last_share,
            "earnings": earnings,
            "acceptance_rate": acceptance_rate,
            "power_consumption": model_info["power"],
            "temperature": temperature
        })

    # Generate offline workers
    for i in range(offline_count):
        # Select a model - more likely to be FPGA for offline
        if random.random() > 0.6:
            model_info = models[-1]  # FPGA
        else:
            model_info = random.choice(models[:-1])  # ASIC

        # Generate last share time (0.5 to 8 hours ago)
        hours_ago = random.uniform(0.5, 8)
        last_share = (current_time - timedelta(hours=hours_ago)).strftime("%Y-%m-%d %H:%M")

        # Generate hashrate (historical before going offline)
        if model_info["type"] == "FPGA":
            hashrate_3hr = round(random.uniform(1, 3), 2)
        else:
            hashrate_3hr = round(random.uniform(20, 90), 2)

        # Create a unique name
        if model_info["type"] == "FPGA":
            name = f"{prefixes[-1]}{random.randint(1, 99):02d}"
        else:
            name = f"{random.choice(prefixes[:-1])}{random.randint(1, 99):02d}"

        workers.append({
            "name": name,
            "status": "offline",
            "type": model_info["type"],
            "model": model_info["model"],
            "hashrate_60sec": 0,
            "hashrate_60sec_unit": hashrate_unit,
            "hashrate_3hr": hashrate_3hr,
            "hashrate_3hr_unit": hashrate_unit,
            "efficiency": 0,
            "last_share": last_share,
            "earnings": round(0.0001 * random.random(), 8),
            "acceptance_rate": round(random.uniform(95, 99), 1),
            "power_consumption": 0,
            "temperature": 0
        })

    # --- NEW CODE FOR HASHRATE ALIGNMENT ---
    # Calculate the current sum of online worker hashrates
    current_total = sum(w["hashrate_3hr"] for w in workers if w["status"] == "online")

    # If we have online workers and the total doesn't match, apply a scaling factor
    if online_count > 0 and abs(current_total - total_hashrate) > 0.01:
        scaling_factor = total_hashrate / current_total if current_total > 0 else 1

        # Apply scaling to all online workers
        for worker in workers:
            if worker["status"] == "online":
                # Scale the 3hr hashrate to exactly match total
                worker["hashrate_3hr"] = round(worker["hashrate_3hr"] * scaling_factor, 2)

                # Scale the 60sec hashrate proportionally
                if worker["hashrate_60sec"] > 0:
                    worker["hashrate_60sec"] = round(worker["hashrate_60sec"] * scaling_factor, 2)

        # Verify the total now matches
        new_total = sum(w["hashrate_3hr"] for w in workers if w["status"] == "online")
        logging.info(f"Adjusted worker hashrates: {current_total} → {new_total} (target: {total_hashrate})")

    return workers
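
As a worked illustration of the alignment step above, with numbers chosen for the example rather than taken from the commit: if the dashboard reports 100 TH/s but the randomly generated online workers sum to 80 TH/s, the scaling factor is 100 / 80 = 1.25, so each online worker is multiplied by 1.25 and the rescaled values again sum to the reported total (up to the 2-decimal rounding).

# Illustrative numbers only (assumed, not from the commit).
total_hashrate = 100.0                       # value reported by the main dashboard
generated = [32.0, 28.0, 20.0]               # per-worker 3hr hashrates before alignment
scaling_factor = total_hashrate / sum(generated)            # 100 / 80 = 1.25
aligned = [round(h * scaling_factor, 2) for h in generated]
print(aligned, sum(aligned))                 # [40.0, 35.0, 25.0] 100.0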

# Modified get_workers_data function with exact hashrate handling

def get_workers_data(force_refresh=False):
    """Get worker data with guaranteed exact hashrate match."""
    global worker_data_cache, last_worker_data_update

    current_time = time.time()

    # Return cached data if it's still fresh and not forced to refresh
    if not force_refresh and worker_data_cache and last_worker_data_update and \
       (current_time - last_worker_data_update) < WORKER_DATA_CACHE_TIMEOUT:
        logging.info("Using cached worker data")
        return worker_data_cache

    try:
        # If metrics aren't available yet, return default data
        if not cached_metrics:
            return generate_default_workers_data()

        # Check if we have workers_hashing information
        workers_count = cached_metrics.get("workers_hashing", 0)
        if workers_count <= 0:
            return generate_default_workers_data()

        # Get hashrate from cached metrics - using EXACT value
        # Store this ORIGINAL value to ensure it's never changed in calculations
        original_hashrate_3hr = float(cached_metrics.get("hashrate_3hr", 0) or 0)
        hashrate_unit = cached_metrics.get("hashrate_3hr_unit", "TH/s")

        # Generate worker data based on the number of active workers
        workers_data = generate_workers_data(workers_count, original_hashrate_3hr, hashrate_unit)

        # Calculate basic statistics
        workers_online = len([w for w in workers_data if w['status'] == 'online'])
        workers_offline = len(workers_data) - workers_online
        total_earnings = sum([float(w.get('earnings', 0) or 0) for w in workers_data])
        avg_acceptance_rate = sum([float(w.get('acceptance_rate', 0) or 0) for w in workers_data]) / len(workers_data) if workers_data else 0

        # IMPORTANT: Use the EXACT original value for total_hashrate
        # Do NOT recalculate it from worker data
        total_hashrate = original_hashrate_3hr

        # Daily sats from main dashboard
        daily_sats = cached_metrics.get("daily_mined_sats", 0)

        # Create hashrate history based on arrow_history if available
        hashrate_history = []
        if cached_metrics.get("arrow_history") and cached_metrics["arrow_history"].get("hashrate_3hr"):
            hashrate_history = cached_metrics["arrow_history"]["hashrate_3hr"]

        result = {
            "workers": workers_data,
            "workers_total": len(workers_data),
            "workers_online": workers_online,
            "workers_offline": workers_offline,
            "total_hashrate": total_hashrate,  # EXACT value from main dashboard
            "hashrate_unit": hashrate_unit,
            "total_earnings": total_earnings,
            "daily_sats": daily_sats,
            "avg_acceptance_rate": avg_acceptance_rate,
            "hashrate_history": hashrate_history,
            "timestamp": datetime.now(ZoneInfo("America/Los_Angeles")).isoformat()
        }

        # Update cache
        worker_data_cache = result
        last_worker_data_update = current_time

        return result
    except Exception as e:
        logging.error(f"Error getting worker data: {e}")
        return generate_default_workers_data()
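
Because this second def get_workers_data appears later in the module, it replaces the first definition at import time, so the exact-hashrate variant is the one actually served. A minimal consistency check, illustrative only and not part of the commit, under the assumption that metrics are already cached:

# Hypothetical check: the reported total is the dashboard value, and the online
# workers were rescaled toward that same value, so they should agree apart from
# the per-worker 2-decimal rounding.
data = get_workers_data(force_refresh=True)
online_sum = sum(w["hashrate_3hr"] for w in data["workers"] if w["status"] == "online")
assert abs(online_sum - data["total_hashrate"]) < 0.01 * max(1, len(data["workers"]))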

# --- New Time Endpoint for Fine Syncing ---
@app.route("/api/time")
def api_time():
@@ -853,6 +1144,153 @@ def api_time():
        "server_start_time": SERVER_START_TIME.astimezone(ZoneInfo("America/Los_Angeles")).isoformat()
    })

# --- Workers Dashboard Route and API ---
@app.route("/workers")
def workers_dashboard():
    """Serve the workers overview dashboard page."""
    current_time = datetime.now(ZoneInfo("America/Los_Angeles")).strftime("%Y-%m-%d %I:%M:%S %p")

    # Only get minimal worker stats for initial page load
    # Client-side JS will fetch the full data via API
    workers_data = get_workers_data()

    return render_template("workers.html",
                           current_time=current_time,
                           workers_total=workers_data.get('workers_total', 0),
                           workers_online=workers_data.get('workers_online', 0),
                           workers_offline=workers_data.get('workers_offline', 0),
                           total_hashrate=workers_data.get('total_hashrate', 0),
                           hashrate_unit=workers_data.get('hashrate_unit', 'TH/s'),
                           total_earnings=workers_data.get('total_earnings', 0),
                           daily_sats=workers_data.get('daily_sats', 0),
                           avg_acceptance_rate=workers_data.get('avg_acceptance_rate', 0))

@app.route("/api/workers")
def api_workers():
    """API endpoint for worker data."""
    # Get the force_refresh parameter from the query string (default: False)
    force_refresh = request.args.get('force', 'false').lower() == 'true'
    return jsonify(get_workers_data(force_refresh=force_refresh))
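
A minimal client-side sketch of calling the new endpoint. This is illustrative only: the base URL assumes a local deployment on Flask's default port, and the requests library is used purely for the example.

# Illustrative only: host and port are assumptions about a local deployment.
import requests

resp = requests.get("http://localhost:5000/api/workers", params={"force": "true"}, timeout=10)
data = resp.json()
print(data["workers_online"], "of", data["workers_total"], "workers online,",
      data["total_hashrate"], data["hashrate_unit"])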

# --- Modified update_metrics_job function ---
def update_metrics_job(force=False):
    global cached_metrics, last_metrics_update_time, scheduler, scheduler_last_successful_run

    try:
        # Check scheduler health - enhanced logic to detect failed executors
        if not scheduler or not hasattr(scheduler, 'running'):
            logging.error("Scheduler object is invalid, attempting to recreate")
            with scheduler_recreate_lock:
                create_scheduler()
            return

        if not scheduler.running:
            logging.warning("Scheduler stopped unexpectedly, attempting to restart")
            try:
                scheduler.start()
                logging.info("Scheduler restarted successfully")
            except Exception as e:
                logging.error(f"Failed to restart scheduler: {e}")
                # More aggressive recovery - recreate scheduler entirely
                with scheduler_recreate_lock:
                    create_scheduler()
                return

        # Test the scheduler's executor by checking its state
        try:
            # Check if any jobs exist and are scheduled
            jobs = scheduler.get_jobs()
            if not jobs:
                logging.error("No jobs found in scheduler - recreating")
                with scheduler_recreate_lock:
                    create_scheduler()
                return

            # Check if the next run time is set for any job
            next_runs = [job.next_run_time for job in jobs]
            if not any(next_runs):
                logging.error("No jobs with next_run_time found - recreating scheduler")
                with scheduler_recreate_lock:
                    create_scheduler()
                return
        except RuntimeError as e:
            # Properly handle the "cannot schedule new futures after shutdown" error
            if "cannot schedule new futures after shutdown" in str(e):
                logging.error("Detected dead executor, recreating scheduler")
                with scheduler_recreate_lock:
                    create_scheduler()
                return
        except Exception as e:
            logging.error(f"Error checking scheduler state: {e}")

        # Skip update if the last one was too recent (prevents overlapping runs)
        # Unless force=True is specified
        current_time = time.time()
        if not force and last_metrics_update_time and (current_time - last_metrics_update_time < 30):
            logging.info("Skipping metrics update - previous update too recent")
            return

        # Set last update time to now
        last_metrics_update_time = current_time

        # Add timeout handling with a timer
        job_timeout = 45  # seconds
        job_successful = False

        def timeout_handler():
            if not job_successful:
                logging.error("Background job timed out after 45 seconds")

        # Set timeout timer
        timer = threading.Timer(job_timeout, timeout_handler)
        timer.daemon = True
        timer.start()

        try:
            # Correctly call the dashboard instance's fetch_metrics method
            metrics = dashboard.fetch_metrics()
            if metrics:
                global cached_metrics
                cached_metrics = metrics
                logging.info("Background job: Metrics updated successfully")
                job_successful = True

                # Mark successful run time for watchdog
                global scheduler_last_successful_run
                scheduler_last_successful_run = time.time()

                persist_critical_state()

                # Periodically check and prune data to prevent memory growth
                if current_time % 300 < 60:  # Every ~5 minutes
                    prune_old_data()

                # Only save state to Redis on a similar schedule, not every update
                if current_time % 300 < 60:  # Every ~5 minutes
                    save_graph_state()

                # Periodic full memory cleanup (every 2 hours)
                if current_time % 7200 < 60:  # Every ~2 hours
                    logging.info("Performing full memory cleanup")
                    gc.collect(generation=2)  # Force full collection

                # CHANGE: Removed the worker data preparation from here
                # No longer attaching workers_data to cached_metrics
            else:
                logging.error("Background job: Metrics update returned None")
        except Exception as e:
            logging.error(f"Background job: Unexpected error: {e}")
            import traceback
            logging.error(traceback.format_exc())
            log_memory_usage()
        finally:
            # Cancel timer in finally block to ensure it's always canceled
            timer.cancel()
    except Exception as e:
        logging.error(f"Background job: Unhandled exception: {e}")
        import traceback
        logging.error(traceback.format_exc())
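
The timeout handling above uses a daemon threading.Timer purely as a watchdog: it does not interrupt the fetch, it only logs if the success flag is still unset when the timer fires. A stripped-down sketch of that pattern, with a placeholder workload standing in for fetch_metrics():

# Minimal sketch of the watchdog pattern used above; the workload is a placeholder.
import logging
import threading
import time

def run_with_watchdog(job_timeout=45):
    job_successful = False

    def timeout_handler():
        # Runs on the timer thread; it only reports, it cannot stop the job.
        if not job_successful:
            logging.error("Background job timed out after %s seconds", job_timeout)

    timer = threading.Timer(job_timeout, timeout_handler)
    timer.daemon = True
    timer.start()
    try:
        time.sleep(1)          # placeholder for the real work (e.g. fetch_metrics())
        job_successful = True
    finally:
        timer.cancel()         # always cancel so a finished job never logs a timeout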

# --- Fixed SSE Endpoint with proper request context handling ---
@app.route('/stream')
def stream():
@@ -1015,122 +1453,6 @@ def create_scheduler():
        logging.error(f"Error creating scheduler: {e}")
        return None

# --- Modified update_metrics_job function ---
def update_metrics_job(force=False):
    global cached_metrics, last_metrics_update_time, scheduler, scheduler_last_successful_run

    try:
        # Check scheduler health - enhanced logic to detect failed executors
        if not scheduler or not hasattr(scheduler, 'running'):
            logging.error("Scheduler object is invalid, attempting to recreate")
            with scheduler_recreate_lock:
                create_scheduler()
            return

        if not scheduler.running:
            logging.warning("Scheduler stopped unexpectedly, attempting to restart")
            try:
                scheduler.start()
                logging.info("Scheduler restarted successfully")
            except Exception as e:
                logging.error(f"Failed to restart scheduler: {e}")
                # More aggressive recovery - recreate scheduler entirely
                with scheduler_recreate_lock:
                    create_scheduler()
                return

        # Test the scheduler's executor by checking its state
        try:
            # Check if any jobs exist and are scheduled
            jobs = scheduler.get_jobs()
            if not jobs:
                logging.error("No jobs found in scheduler - recreating")
                with scheduler_recreate_lock:
                    create_scheduler()
                return

            # Check if the next run time is set for any job
            next_runs = [job.next_run_time for job in jobs]
            if not any(next_runs):
                logging.error("No jobs with next_run_time found - recreating scheduler")
                with scheduler_recreate_lock:
                    create_scheduler()
                return
        except RuntimeError as e:
            # Properly handle the "cannot schedule new futures after shutdown" error
            if "cannot schedule new futures after shutdown" in str(e):
                logging.error("Detected dead executor, recreating scheduler")
                with scheduler_recreate_lock:
                    create_scheduler()
                return
        except Exception as e:
            logging.error(f"Error checking scheduler state: {e}")

        # Skip update if the last one was too recent (prevents overlapping runs)
        # Unless force=True is specified
        current_time = time.time()
        if not force and last_metrics_update_time and (current_time - last_metrics_update_time < 30):
            logging.info("Skipping metrics update - previous update too recent")
            return

        # Set last update time to now
        last_metrics_update_time = current_time

        # Add timeout handling with a timer
        job_timeout = 45  # seconds
        job_successful = False

        def timeout_handler():
            if not job_successful:
                logging.error("Background job timed out after 45 seconds")

        # Set timeout timer
        timer = threading.Timer(job_timeout, timeout_handler)
        timer.daemon = True
        timer.start()

        try:
            # Correctly call the dashboard instance's fetch_metrics method
            metrics = dashboard.fetch_metrics()
            if metrics:
                global cached_metrics
                cached_metrics = metrics
                logging.info("Background job: Metrics updated successfully")
                job_successful = True

                # Mark successful run time for watchdog
                global scheduler_last_successful_run
                scheduler_last_successful_run = time.time()

                persist_critical_state()

                # Periodically check and prune data to prevent memory growth
                if current_time % 300 < 60:  # Every ~5 minutes
                    prune_old_data()

                # Only save state to Redis on a similar schedule, not every update
                if current_time % 300 < 60:  # Every ~5 minutes
                    save_graph_state()

                # Periodic full memory cleanup (every 2 hours)
                if current_time % 7200 < 60:  # Every ~2 hours
                    logging.info("Performing full memory cleanup")
                    gc.collect(generation=2)  # Force full collection
            else:
                logging.error("Background job: Metrics update returned None")
        except Exception as e:
            logging.error(f"Background job: Unexpected error: {e}")
            import traceback
            logging.error(traceback.format_exc())
            log_memory_usage()
        finally:
            # Cancel timer in finally block to ensure it's always canceled
            timer.cancel()
    except Exception as e:
        logging.error(f"Background job: Unhandled exception: {e}")
        import traceback
        logging.error(traceback.format_exc())

# --- Routes ---
@app.route("/")
def boot():