1158 lines
40 KiB
Python
1158 lines
40 KiB
Python
"""
|
||
Flask主应用 - 统一管理三个Streamlit应用
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
|
||
# 【修复】尽早设置环境变量,确保所有模块都使用无缓冲模式
|
||
os.environ['PYTHONIOENCODING'] = 'utf-8'
|
||
os.environ['PYTHONUTF8'] = '1'
|
||
os.environ['PYTHONUNBUFFERED'] = '1' # 禁用Python输出缓冲,确保日志实时输出
|
||
|
||
import subprocess
|
||
import time
|
||
import threading
|
||
from datetime import datetime
|
||
from queue import Queue
|
||
from flask import Flask, render_template, request, jsonify, Response
|
||
from flask_socketio import SocketIO, emit
|
||
import atexit
|
||
import requests
|
||
from loguru import logger
|
||
import importlib
|
||
from pathlib import Path
|
||
from MindSpider.main import MindSpider
|
||
|
||
# 导入ReportEngine
|
||
try:
|
||
from ReportEngine.flask_interface import report_bp, initialize_report_engine
|
||
REPORT_ENGINE_AVAILABLE = True
|
||
except ImportError as e:
|
||
logger.error(f"ReportEngine导入失败: {e}")
|
||
REPORT_ENGINE_AVAILABLE = False
|
||
|
||
app = Flask(__name__)
|
||
app.config['SECRET_KEY'] = 'Dedicated-to-creating-a-concise-and-versatile-public-opinion-analysis-platform'
|
||
socketio = SocketIO(app, cors_allowed_origins="*")
|
||
|
||
# eventlet 在客户端主动断开时偶尔会抛出 ConnectionAbortedError,这里做一次防御性包裹,
|
||
# 避免无意义的堆栈污染日志(仅在 eventlet 可用时启用)。
|
||
def _patch_eventlet_disconnect_logging():
|
||
try:
|
||
import eventlet.wsgi # type: ignore
|
||
except Exception as exc: # pragma: no cover - 仅在生产环境有效
|
||
logger.debug(f"eventlet 不可用,跳过断开补丁: {exc}")
|
||
return
|
||
|
||
try:
|
||
original_finish = eventlet.wsgi.HttpProtocol.finish # type: ignore[attr-defined]
|
||
except Exception as exc: # pragma: no cover
|
||
logger.debug(f"eventlet 缺少 HttpProtocol.finish,跳过断开补丁: {exc}")
|
||
return
|
||
|
||
def _safe_finish(self, *args, **kwargs): # pragma: no cover - 运行时才会触发
|
||
try:
|
||
return original_finish(self, *args, **kwargs)
|
||
except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError) as exc:
|
||
try:
|
||
environ = getattr(self, 'environ', {}) or {}
|
||
method = environ.get('REQUEST_METHOD', '')
|
||
path = environ.get('PATH_INFO', '')
|
||
logger.warning(f"客户端已主动断开,忽略异常: {method} {path} ({exc})")
|
||
except Exception:
|
||
logger.warning(f"客户端已主动断开,忽略异常: {exc}")
|
||
return
|
||
|
||
eventlet.wsgi.HttpProtocol.finish = _safe_finish # type: ignore[attr-defined]
|
||
logger.info("已对 eventlet 连接中断进行安全防护")
|
||
|
||
_patch_eventlet_disconnect_logging()
|
||
|
||
# 注册ReportEngine Blueprint
|
||
if REPORT_ENGINE_AVAILABLE:
|
||
app.register_blueprint(report_bp, url_prefix='/api/report')
|
||
logger.info("ReportEngine接口已注册")
|
||
else:
|
||
logger.info("ReportEngine不可用,跳过接口注册")
|
||
|
||
# 创建日志目录
|
||
LOG_DIR = Path('logs')
|
||
LOG_DIR.mkdir(exist_ok=True)
|
||
|
||
CONFIG_MODULE_NAME = 'config'
|
||
CONFIG_FILE_PATH = Path(__file__).resolve().parent / 'config.py'
|
||
CONFIG_KEYS = [
|
||
'HOST',
|
||
'PORT',
|
||
'DB_DIALECT',
|
||
'DB_HOST',
|
||
'DB_PORT',
|
||
'DB_USER',
|
||
'DB_PASSWORD',
|
||
'DB_NAME',
|
||
'DB_CHARSET',
|
||
'INSIGHT_ENGINE_API_KEY',
|
||
'INSIGHT_ENGINE_BASE_URL',
|
||
'INSIGHT_ENGINE_MODEL_NAME',
|
||
'MEDIA_ENGINE_API_KEY',
|
||
'MEDIA_ENGINE_BASE_URL',
|
||
'MEDIA_ENGINE_MODEL_NAME',
|
||
'QUERY_ENGINE_API_KEY',
|
||
'QUERY_ENGINE_BASE_URL',
|
||
'QUERY_ENGINE_MODEL_NAME',
|
||
'REPORT_ENGINE_API_KEY',
|
||
'REPORT_ENGINE_BASE_URL',
|
||
'REPORT_ENGINE_MODEL_NAME',
|
||
'FORUM_HOST_API_KEY',
|
||
'FORUM_HOST_BASE_URL',
|
||
'FORUM_HOST_MODEL_NAME',
|
||
'KEYWORD_OPTIMIZER_API_KEY',
|
||
'KEYWORD_OPTIMIZER_BASE_URL',
|
||
'KEYWORD_OPTIMIZER_MODEL_NAME',
|
||
'TAVILY_API_KEY',
|
||
'BOCHA_WEB_SEARCH_API_KEY'
|
||
]
|
||
|
||
|
||
def _load_config_module():
|
||
"""Load or reload the config module to ensure latest values are available."""
|
||
importlib.invalidate_caches()
|
||
module = sys.modules.get(CONFIG_MODULE_NAME)
|
||
try:
|
||
if module is None:
|
||
module = importlib.import_module(CONFIG_MODULE_NAME)
|
||
else:
|
||
module = importlib.reload(module)
|
||
except ModuleNotFoundError:
|
||
return None
|
||
return module
|
||
|
||
|
||
def read_config_values():
|
||
"""Return the current configuration values that are exposed to the frontend."""
|
||
try:
|
||
# 重新加载配置以获取最新的 Settings 实例
|
||
from config import reload_settings, settings
|
||
reload_settings()
|
||
|
||
values = {}
|
||
for key in CONFIG_KEYS:
|
||
# 从 Pydantic Settings 实例读取值
|
||
value = getattr(settings, key, None)
|
||
# Convert to string for uniform handling on the frontend.
|
||
if value is None:
|
||
values[key] = ''
|
||
else:
|
||
values[key] = str(value)
|
||
return values
|
||
except Exception as exc:
|
||
logger.exception(f"读取配置失败: {exc}")
|
||
return {}
|
||
|
||
|
||
def _serialize_config_value(value):
|
||
"""Serialize Python values back to a config.py assignment-friendly string."""
|
||
if isinstance(value, bool):
|
||
return 'True' if value else 'False'
|
||
if isinstance(value, (int, float)):
|
||
return str(value)
|
||
if value is None:
|
||
return 'None'
|
||
|
||
value_str = str(value)
|
||
escaped = value_str.replace('\\', '\\\\').replace('"', '\\"')
|
||
return f'"{escaped}"'
|
||
|
||
|
||
def write_config_values(updates):
|
||
"""Persist configuration updates to .env file (Pydantic Settings source)."""
|
||
from pathlib import Path
|
||
|
||
# 确定 .env 文件路径(与 config.py 中的逻辑一致)
|
||
project_root = Path(__file__).resolve().parent
|
||
cwd_env = Path.cwd() / ".env"
|
||
env_file_path = cwd_env if cwd_env.exists() else (project_root / ".env")
|
||
|
||
# 读取现有的 .env 文件内容
|
||
env_lines = []
|
||
env_key_indices = {} # 记录每个键在文件中的索引位置
|
||
if env_file_path.exists():
|
||
env_lines = env_file_path.read_text(encoding='utf-8').splitlines()
|
||
# 提取已存在的键及其索引
|
||
for i, line in enumerate(env_lines):
|
||
line_stripped = line.strip()
|
||
if line_stripped and not line_stripped.startswith('#'):
|
||
if '=' in line_stripped:
|
||
key = line_stripped.split('=')[0].strip()
|
||
env_key_indices[key] = i
|
||
|
||
# 更新或添加配置项
|
||
for key, raw_value in updates.items():
|
||
# 格式化值用于 .env 文件(不需要引号,除非是字符串且包含空格)
|
||
if raw_value is None or raw_value == '':
|
||
env_value = ''
|
||
elif isinstance(raw_value, (int, float)):
|
||
env_value = str(raw_value)
|
||
elif isinstance(raw_value, bool):
|
||
env_value = 'True' if raw_value else 'False'
|
||
else:
|
||
value_str = str(raw_value)
|
||
# 如果包含空格或特殊字符,需要引号
|
||
if ' ' in value_str or '\n' in value_str or '#' in value_str:
|
||
escaped = value_str.replace('\\', '\\\\').replace('"', '\\"')
|
||
env_value = f'"{escaped}"'
|
||
else:
|
||
env_value = value_str
|
||
|
||
# 更新或添加配置项
|
||
if key in env_key_indices:
|
||
# 更新现有行
|
||
env_lines[env_key_indices[key]] = f'{key}={env_value}'
|
||
else:
|
||
# 添加新行到文件末尾
|
||
env_lines.append(f'{key}={env_value}')
|
||
|
||
# 写入 .env 文件
|
||
env_file_path.parent.mkdir(parents=True, exist_ok=True)
|
||
env_file_path.write_text('\n'.join(env_lines) + '\n', encoding='utf-8')
|
||
|
||
# 重新加载配置模块(这会重新读取 .env 文件并创建新的 Settings 实例)
|
||
_load_config_module()
|
||
|
||
|
||
system_state_lock = threading.Lock()
|
||
system_state = {
|
||
'started': False,
|
||
'starting': False
|
||
}
|
||
|
||
|
||
def _set_system_state(*, started=None, starting=None):
|
||
"""Safely update the cached system state flags."""
|
||
with system_state_lock:
|
||
if started is not None:
|
||
system_state['started'] = started
|
||
if starting is not None:
|
||
system_state['starting'] = starting
|
||
|
||
|
||
def _get_system_state():
|
||
"""Return a shallow copy of the system state flags."""
|
||
with system_state_lock:
|
||
return system_state.copy()
|
||
|
||
|
||
def _prepare_system_start():
|
||
"""Mark the system as starting if it is not already running or starting."""
|
||
with system_state_lock:
|
||
if system_state['started']:
|
||
return False, '系统已启动'
|
||
if system_state['starting']:
|
||
return False, '系统正在启动'
|
||
system_state['starting'] = True
|
||
return True, None
|
||
|
||
|
||
def initialize_system_components():
|
||
"""启动所有依赖组件(Streamlit 子应用、ForumEngine、ReportEngine)。"""
|
||
logs = []
|
||
errors = []
|
||
|
||
spider = MindSpider()
|
||
if spider.initialize_database():
|
||
logger.info("数据库初始化成功")
|
||
else:
|
||
logger.error("数据库初始化失败")
|
||
|
||
try:
|
||
stop_forum_engine()
|
||
logs.append("已停止 ForumEngine 监控器以避免文件冲突")
|
||
except Exception as exc: # pragma: no cover - 安全捕获
|
||
message = f"停止 ForumEngine 时发生异常: {exc}"
|
||
logs.append(message)
|
||
logger.exception(message)
|
||
|
||
processes['forum']['status'] = 'stopped'
|
||
|
||
for app_name, script_path in STREAMLIT_SCRIPTS.items():
|
||
logs.append(f"检查文件: {script_path}")
|
||
if os.path.exists(script_path):
|
||
success, message = start_streamlit_app(app_name, script_path, processes[app_name]['port'])
|
||
logs.append(f"{app_name}: {message}")
|
||
if success:
|
||
startup_success, startup_message = wait_for_app_startup(app_name, 30)
|
||
logs.append(f"{app_name} 启动检查: {startup_message}")
|
||
if not startup_success:
|
||
errors.append(f"{app_name} 启动失败: {startup_message}")
|
||
else:
|
||
errors.append(f"{app_name} 启动失败: {message}")
|
||
else:
|
||
msg = f"文件不存在: {script_path}"
|
||
logs.append(f"错误: {msg}")
|
||
errors.append(f"{app_name}: {msg}")
|
||
|
||
forum_started = False
|
||
try:
|
||
start_forum_engine()
|
||
processes['forum']['status'] = 'running'
|
||
logs.append("ForumEngine 启动完成")
|
||
forum_started = True
|
||
except Exception as exc: # pragma: no cover - 保底捕获
|
||
error_msg = f"ForumEngine 启动失败: {exc}"
|
||
logs.append(error_msg)
|
||
errors.append(error_msg)
|
||
|
||
if REPORT_ENGINE_AVAILABLE:
|
||
try:
|
||
if initialize_report_engine():
|
||
logs.append("ReportEngine 初始化成功")
|
||
else:
|
||
msg = "ReportEngine 初始化失败"
|
||
logs.append(msg)
|
||
errors.append(msg)
|
||
except Exception as exc: # pragma: no cover
|
||
msg = f"ReportEngine 初始化异常: {exc}"
|
||
logs.append(msg)
|
||
errors.append(msg)
|
||
|
||
if errors:
|
||
cleanup_processes()
|
||
processes['forum']['status'] = 'stopped'
|
||
if forum_started:
|
||
try:
|
||
stop_forum_engine()
|
||
except Exception: # pragma: no cover
|
||
logger.exception("停止ForumEngine失败")
|
||
return False, logs, errors
|
||
|
||
return True, logs, []
|
||
|
||
# 初始化ForumEngine的forum.log文件
|
||
def init_forum_log():
|
||
"""初始化forum.log文件"""
|
||
try:
|
||
forum_log_file = LOG_DIR / "forum.log"
|
||
# 检查文件不存在则创建并且写一个开始,存在就清空写一个开始
|
||
if not forum_log_file.exists():
|
||
with open(forum_log_file, 'w', encoding='utf-8') as f:
|
||
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
f.write(f"=== ForumEngine 系统初始化 - {start_time} ===\n")
|
||
logger.info(f"ForumEngine: forum.log 已初始化")
|
||
else:
|
||
with open(forum_log_file, 'w', encoding='utf-8') as f:
|
||
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
f.write(f"=== ForumEngine 系统初始化 - {start_time} ===\n")
|
||
logger.info(f"ForumEngine: forum.log 已初始化")
|
||
except Exception as e:
|
||
logger.exception(f"ForumEngine: 初始化forum.log失败: {e}")
|
||
|
||
# 初始化forum.log
|
||
init_forum_log()
|
||
|
||
# 启动ForumEngine智能监控
|
||
def start_forum_engine():
|
||
"""启动ForumEngine论坛"""
|
||
try:
|
||
from ForumEngine.monitor import start_forum_monitoring
|
||
logger.info("ForumEngine: 启动论坛...")
|
||
success = start_forum_monitoring()
|
||
if not success:
|
||
logger.info("ForumEngine: 论坛启动失败")
|
||
except Exception as e:
|
||
logger.exception(f"ForumEngine: 启动论坛失败: {e}")
|
||
|
||
# 停止ForumEngine智能监控
|
||
def stop_forum_engine():
|
||
"""停止ForumEngine论坛"""
|
||
try:
|
||
from ForumEngine.monitor import stop_forum_monitoring
|
||
logger.info("ForumEngine: 停止论坛...")
|
||
stop_forum_monitoring()
|
||
logger.info("ForumEngine: 论坛已停止")
|
||
except Exception as e:
|
||
logger.exception(f"ForumEngine: 停止论坛失败: {e}")
|
||
|
||
def parse_forum_log_line(line):
|
||
"""解析forum.log行内容,提取对话信息"""
|
||
import re
|
||
|
||
# 匹配格式: [时间] [来源] 内容(来源允许大小写及空格)
|
||
pattern = r'\[(\d{2}:\d{2}:\d{2})\]\s*\[([^\]]+)\]\s*(.*)'
|
||
match = re.match(pattern, line)
|
||
|
||
if not match:
|
||
return None
|
||
|
||
timestamp, raw_source, content = match.groups()
|
||
source = raw_source.strip().upper()
|
||
|
||
# 过滤掉系统消息和空内容
|
||
if source == 'SYSTEM' or not content.strip():
|
||
return None
|
||
|
||
# 支持三个Agent和主持人
|
||
if source not in ['QUERY', 'INSIGHT', 'MEDIA', 'HOST']:
|
||
return None
|
||
|
||
# 解码日志中的转义换行,保留多行格式
|
||
cleaned_content = content.replace('\\n', '\n').replace('\\r', '').strip()
|
||
|
||
# 根据来源确定消息类型和发送者
|
||
if source == 'HOST':
|
||
message_type = 'host'
|
||
sender = 'Forum Host'
|
||
else:
|
||
message_type = 'agent'
|
||
sender = f'{source.title()} Engine'
|
||
|
||
return {
|
||
'type': message_type,
|
||
'sender': sender,
|
||
'content': cleaned_content,
|
||
'timestamp': timestamp,
|
||
'source': source
|
||
}
|
||
|
||
# Forum日志监听器
|
||
# 存储每个客户端的历史日志发送位置
|
||
forum_log_positions = {}
|
||
|
||
def monitor_forum_log():
|
||
"""监听forum.log文件变化并推送到前端"""
|
||
import time
|
||
from pathlib import Path
|
||
|
||
forum_log_file = LOG_DIR / "forum.log"
|
||
last_position = 0
|
||
processed_lines = set() # 用于跟踪已处理的行,避免重复
|
||
|
||
# 如果文件存在,获取初始位置但不跳过内容
|
||
if forum_log_file.exists():
|
||
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
||
# 记录文件大小,但不添加到processed_lines
|
||
# 这样用户打开forum标签时可以获取历史
|
||
f.seek(0, 2) # 移到文件末尾
|
||
last_position = f.tell()
|
||
|
||
while True:
|
||
try:
|
||
if forum_log_file.exists():
|
||
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
||
f.seek(last_position)
|
||
new_lines = f.readlines()
|
||
|
||
if new_lines:
|
||
for line in new_lines:
|
||
line = line.rstrip('\n\r')
|
||
if line.strip():
|
||
line_hash = hash(line.strip())
|
||
|
||
# 避免重复处理同一行
|
||
if line_hash in processed_lines:
|
||
continue
|
||
|
||
processed_lines.add(line_hash)
|
||
|
||
# 解析日志行并发送forum消息
|
||
parsed_message = parse_forum_log_line(line)
|
||
if parsed_message:
|
||
socketio.emit('forum_message', parsed_message)
|
||
|
||
# 只有在控制台显示forum时才发送控制台消息
|
||
timestamp = datetime.now().strftime('%H:%M:%S')
|
||
formatted_line = f"[{timestamp}] {line}"
|
||
socketio.emit('console_output', {
|
||
'app': 'forum',
|
||
'line': formatted_line
|
||
})
|
||
|
||
last_position = f.tell()
|
||
|
||
# 清理processed_lines集合,避免内存泄漏(保留最近1000行的哈希)
|
||
if len(processed_lines) > 1000:
|
||
# 保留最近500行的哈希
|
||
recent_hashes = list(processed_lines)[-500:]
|
||
processed_lines = set(recent_hashes)
|
||
|
||
time.sleep(1) # 每秒检查一次
|
||
except Exception as e:
|
||
logger.error(f"Forum日志监听错误: {e}")
|
||
time.sleep(5)
|
||
|
||
# 启动Forum日志监听线程
|
||
forum_monitor_thread = threading.Thread(target=monitor_forum_log, daemon=True)
|
||
forum_monitor_thread.start()
|
||
|
||
# 全局变量存储进程信息
|
||
processes = {
|
||
'insight': {'process': None, 'port': 8501, 'status': 'stopped', 'output': [], 'log_file': None},
|
||
'media': {'process': None, 'port': 8502, 'status': 'stopped', 'output': [], 'log_file': None},
|
||
'query': {'process': None, 'port': 8503, 'status': 'stopped', 'output': [], 'log_file': None},
|
||
'forum': {'process': None, 'port': None, 'status': 'stopped', 'output': [], 'log_file': None} # 启动后标记为 running
|
||
}
|
||
|
||
STREAMLIT_SCRIPTS = {
|
||
'insight': 'SingleEngineApp/insight_engine_streamlit_app.py',
|
||
'media': 'SingleEngineApp/media_engine_streamlit_app.py',
|
||
'query': 'SingleEngineApp/query_engine_streamlit_app.py'
|
||
}
|
||
|
||
# 输出队列
|
||
output_queues = {
|
||
'insight': Queue(),
|
||
'media': Queue(),
|
||
'query': Queue(),
|
||
'forum': Queue()
|
||
}
|
||
|
||
def write_log_to_file(app_name, line):
|
||
"""将日志写入文件"""
|
||
try:
|
||
log_file_path = LOG_DIR / f"{app_name}.log"
|
||
with open(log_file_path, 'a', encoding='utf-8') as f:
|
||
f.write(line + '\n')
|
||
f.flush()
|
||
except Exception as e:
|
||
logger.error(f"Error writing log for {app_name}: {e}")
|
||
|
||
def read_log_from_file(app_name, tail_lines=None):
|
||
"""从文件读取日志"""
|
||
try:
|
||
log_file_path = LOG_DIR / f"{app_name}.log"
|
||
if not log_file_path.exists():
|
||
return []
|
||
|
||
with open(log_file_path, 'r', encoding='utf-8') as f:
|
||
lines = f.readlines()
|
||
lines = [line.rstrip('\n\r') for line in lines if line.strip()]
|
||
|
||
if tail_lines:
|
||
return lines[-tail_lines:]
|
||
return lines
|
||
except Exception as e:
|
||
logger.exception(f"Error reading log for {app_name}: {e}")
|
||
return []
|
||
|
||
def read_process_output(process, app_name):
|
||
"""读取进程输出并写入文件"""
|
||
import select
|
||
import sys
|
||
|
||
while True:
|
||
try:
|
||
if process.poll() is not None:
|
||
# 进程结束,读取剩余输出
|
||
remaining_output = process.stdout.read()
|
||
if remaining_output:
|
||
lines = remaining_output.decode('utf-8', errors='replace').split('\n')
|
||
for line in lines:
|
||
line = line.strip()
|
||
if line:
|
||
timestamp = datetime.now().strftime('%H:%M:%S')
|
||
formatted_line = f"[{timestamp}] {line}"
|
||
write_log_to_file(app_name, formatted_line)
|
||
socketio.emit('console_output', {
|
||
'app': app_name,
|
||
'line': formatted_line
|
||
})
|
||
break
|
||
|
||
# 使用非阻塞读取
|
||
if sys.platform == 'win32':
|
||
# Windows下使用不同的方法
|
||
output = process.stdout.readline()
|
||
if output:
|
||
line = output.decode('utf-8', errors='replace').strip()
|
||
if line:
|
||
timestamp = datetime.now().strftime('%H:%M:%S')
|
||
formatted_line = f"[{timestamp}] {line}"
|
||
|
||
# 写入日志文件
|
||
write_log_to_file(app_name, formatted_line)
|
||
|
||
# 发送到前端
|
||
socketio.emit('console_output', {
|
||
'app': app_name,
|
||
'line': formatted_line
|
||
})
|
||
else:
|
||
# 没有输出时短暂休眠
|
||
time.sleep(0.1)
|
||
else:
|
||
# Unix系统使用select
|
||
ready, _, _ = select.select([process.stdout], [], [], 0.1)
|
||
if ready:
|
||
output = process.stdout.readline()
|
||
if output:
|
||
line = output.decode('utf-8', errors='replace').strip()
|
||
if line:
|
||
timestamp = datetime.now().strftime('%H:%M:%S')
|
||
formatted_line = f"[{timestamp}] {line}"
|
||
|
||
# 写入日志文件
|
||
write_log_to_file(app_name, formatted_line)
|
||
|
||
# 发送到前端
|
||
socketio.emit('console_output', {
|
||
'app': app_name,
|
||
'line': formatted_line
|
||
})
|
||
|
||
except Exception as e:
|
||
error_msg = f"Error reading output for {app_name}: {e}"
|
||
logger.exception(error_msg)
|
||
write_log_to_file(app_name, f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}")
|
||
break
|
||
|
||
def start_streamlit_app(app_name, script_path, port):
|
||
"""启动Streamlit应用"""
|
||
try:
|
||
if processes[app_name]['process'] is not None:
|
||
return False, "应用已经在运行"
|
||
|
||
# 检查文件是否存在
|
||
if not os.path.exists(script_path):
|
||
return False, f"文件不存在: {script_path}"
|
||
|
||
# 清空之前的日志文件
|
||
log_file_path = LOG_DIR / f"{app_name}.log"
|
||
if log_file_path.exists():
|
||
log_file_path.unlink()
|
||
|
||
# 创建启动日志
|
||
start_msg = f"[{datetime.now().strftime('%H:%M:%S')}] 启动 {app_name} 应用..."
|
||
write_log_to_file(app_name, start_msg)
|
||
|
||
cmd = [
|
||
sys.executable, '-m', 'streamlit', 'run',
|
||
script_path,
|
||
'--server.port', str(port),
|
||
'--server.headless', 'true',
|
||
'--browser.gatherUsageStats', 'false',
|
||
# '--logger.level', 'debug', # 增加日志详细程度
|
||
'--logger.level', 'info',
|
||
'--server.enableCORS', 'false'
|
||
]
|
||
|
||
# 设置环境变量确保UTF-8编码和减少缓冲
|
||
env = os.environ.copy()
|
||
env.update({
|
||
'PYTHONIOENCODING': 'utf-8',
|
||
'PYTHONUTF8': '1',
|
||
'LANG': 'en_US.UTF-8',
|
||
'LC_ALL': 'en_US.UTF-8',
|
||
'PYTHONUNBUFFERED': '1', # 禁用Python缓冲
|
||
'STREAMLIT_BROWSER_GATHER_USAGE_STATS': 'false'
|
||
})
|
||
|
||
# 使用当前工作目录而不是脚本目录
|
||
process = subprocess.Popen(
|
||
cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
bufsize=0, # 无缓冲
|
||
universal_newlines=False,
|
||
cwd=os.getcwd(),
|
||
env=env,
|
||
encoding=None, # 让我们手动处理编码
|
||
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0
|
||
)
|
||
|
||
processes[app_name]['process'] = process
|
||
processes[app_name]['status'] = 'starting'
|
||
processes[app_name]['output'] = []
|
||
|
||
# 启动输出读取线程
|
||
output_thread = threading.Thread(
|
||
target=read_process_output,
|
||
args=(process, app_name),
|
||
daemon=True
|
||
)
|
||
output_thread.start()
|
||
|
||
return True, f"{app_name} 应用启动中..."
|
||
|
||
except Exception as e:
|
||
error_msg = f"启动失败: {str(e)}"
|
||
write_log_to_file(app_name, f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}")
|
||
return False, error_msg
|
||
|
||
def stop_streamlit_app(app_name):
|
||
"""停止Streamlit应用"""
|
||
try:
|
||
if processes[app_name]['process'] is None:
|
||
return False, "应用未运行"
|
||
|
||
process = processes[app_name]['process']
|
||
process.terminate()
|
||
|
||
# 等待进程结束
|
||
try:
|
||
process.wait(timeout=5)
|
||
except subprocess.TimeoutExpired:
|
||
process.kill()
|
||
process.wait()
|
||
|
||
processes[app_name]['process'] = None
|
||
processes[app_name]['status'] = 'stopped'
|
||
|
||
return True, f"{app_name} 应用已停止"
|
||
|
||
except Exception as e:
|
||
return False, f"停止失败: {str(e)}"
|
||
|
||
HEALTHCHECK_PATH = "/_stcore/health"
|
||
HEALTHCHECK_PROXIES = {'http': None, 'https': None}
|
||
|
||
|
||
def _build_healthcheck_url(port):
|
||
return f"http://127.0.0.1:{port}{HEALTHCHECK_PATH}"
|
||
|
||
|
||
def check_app_status():
|
||
"""检查应用状态"""
|
||
for app_name, info in processes.items():
|
||
if info['process'] is not None:
|
||
if info['process'].poll() is None:
|
||
# 进程仍在运行,检查端口是否可访问
|
||
try:
|
||
response = requests.get(
|
||
_build_healthcheck_url(info['port']),
|
||
timeout=2,
|
||
proxies=HEALTHCHECK_PROXIES
|
||
)
|
||
if response.status_code == 200:
|
||
info['status'] = 'running'
|
||
else:
|
||
info['status'] = 'starting'
|
||
except Exception as exc:
|
||
logger.warning(f"{app_name} 健康检查失败: {exc}")
|
||
info['status'] = 'starting'
|
||
else:
|
||
# 进程已结束
|
||
info['process'] = None
|
||
info['status'] = 'stopped'
|
||
|
||
def wait_for_app_startup(app_name, max_wait_time=90):
|
||
"""等待应用启动完成"""
|
||
import time
|
||
start_time = time.time()
|
||
while time.time() - start_time < max_wait_time:
|
||
info = processes[app_name]
|
||
if info['process'] is None:
|
||
return False, "进程已停止"
|
||
|
||
if info['process'].poll() is not None:
|
||
return False, "进程启动失败"
|
||
|
||
try:
|
||
response = requests.get(
|
||
_build_healthcheck_url(info['port']),
|
||
timeout=2,
|
||
proxies=HEALTHCHECK_PROXIES
|
||
)
|
||
if response.status_code == 200:
|
||
info['status'] = 'running'
|
||
return True, "启动成功"
|
||
except Exception as exc:
|
||
logger.warning(f"{app_name} 健康检查失败: {exc}")
|
||
|
||
time.sleep(1)
|
||
|
||
return False, "启动超时"
|
||
|
||
def cleanup_processes():
|
||
"""清理所有进程"""
|
||
for app_name in STREAMLIT_SCRIPTS:
|
||
stop_streamlit_app(app_name)
|
||
|
||
processes['forum']['status'] = 'stopped'
|
||
try:
|
||
stop_forum_engine()
|
||
except Exception: # pragma: no cover
|
||
logger.exception("停止ForumEngine失败")
|
||
_set_system_state(started=False, starting=False)
|
||
|
||
# 注册清理函数
|
||
atexit.register(cleanup_processes)
|
||
|
||
@app.route('/')
|
||
def index():
|
||
"""主页"""
|
||
return render_template('index.html')
|
||
|
||
@app.route('/api/status')
|
||
def get_status():
|
||
"""获取所有应用状态"""
|
||
check_app_status()
|
||
return jsonify({
|
||
app_name: {
|
||
'status': info['status'],
|
||
'port': info['port'],
|
||
'output_lines': len(info['output'])
|
||
}
|
||
for app_name, info in processes.items()
|
||
})
|
||
|
||
@app.route('/api/start/<app_name>')
|
||
def start_app(app_name):
|
||
"""启动指定应用"""
|
||
if app_name not in processes:
|
||
return jsonify({'success': False, 'message': '未知应用'})
|
||
|
||
if app_name == 'forum':
|
||
try:
|
||
start_forum_engine()
|
||
processes['forum']['status'] = 'running'
|
||
return jsonify({'success': True, 'message': 'ForumEngine已启动'})
|
||
except Exception as exc: # pragma: no cover
|
||
logger.exception("手动启动ForumEngine失败")
|
||
return jsonify({'success': False, 'message': f'ForumEngine启动失败: {exc}'})
|
||
|
||
script_path = STREAMLIT_SCRIPTS.get(app_name)
|
||
if not script_path:
|
||
return jsonify({'success': False, 'message': '该应用不支持启动操作'})
|
||
|
||
success, message = start_streamlit_app(
|
||
app_name,
|
||
script_path,
|
||
processes[app_name]['port']
|
||
)
|
||
|
||
if success:
|
||
# 等待应用启动
|
||
startup_success, startup_message = wait_for_app_startup(app_name, 15)
|
||
if not startup_success:
|
||
message += f" 但启动检查失败: {startup_message}"
|
||
|
||
return jsonify({'success': success, 'message': message})
|
||
|
||
@app.route('/api/stop/<app_name>')
|
||
def stop_app(app_name):
|
||
"""停止指定应用"""
|
||
if app_name not in processes:
|
||
return jsonify({'success': False, 'message': '未知应用'})
|
||
|
||
if app_name == 'forum':
|
||
try:
|
||
stop_forum_engine()
|
||
processes['forum']['status'] = 'stopped'
|
||
return jsonify({'success': True, 'message': 'ForumEngine已停止'})
|
||
except Exception as exc: # pragma: no cover
|
||
logger.exception("手动停止ForumEngine失败")
|
||
return jsonify({'success': False, 'message': f'ForumEngine停止失败: {exc}'})
|
||
|
||
success, message = stop_streamlit_app(app_name)
|
||
return jsonify({'success': success, 'message': message})
|
||
|
||
@app.route('/api/output/<app_name>')
|
||
def get_output(app_name):
|
||
"""获取应用输出"""
|
||
if app_name not in processes:
|
||
return jsonify({'success': False, 'message': '未知应用'})
|
||
|
||
# 特殊处理Forum Engine
|
||
if app_name == 'forum':
|
||
try:
|
||
forum_log_content = read_log_from_file('forum')
|
||
return jsonify({
|
||
'success': True,
|
||
'output': forum_log_content,
|
||
'total_lines': len(forum_log_content)
|
||
})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'读取forum日志失败: {str(e)}'})
|
||
|
||
# 从文件读取完整日志
|
||
output_lines = read_log_from_file(app_name)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'output': output_lines
|
||
})
|
||
|
||
@app.route('/api/test_log/<app_name>')
|
||
def test_log(app_name):
|
||
"""测试日志写入功能"""
|
||
if app_name not in processes:
|
||
return jsonify({'success': False, 'message': '未知应用'})
|
||
|
||
# 写入测试消息
|
||
test_msg = f"[{datetime.now().strftime('%H:%M:%S')}] 测试日志消息 - {datetime.now()}"
|
||
write_log_to_file(app_name, test_msg)
|
||
|
||
# 通过Socket.IO发送
|
||
socketio.emit('console_output', {
|
||
'app': app_name,
|
||
'line': test_msg
|
||
})
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'测试消息已写入 {app_name} 日志'
|
||
})
|
||
|
||
@app.route('/api/forum/start')
|
||
def start_forum_monitoring_api():
|
||
"""手动启动ForumEngine论坛"""
|
||
try:
|
||
from ForumEngine.monitor import start_forum_monitoring
|
||
success = start_forum_monitoring()
|
||
if success:
|
||
return jsonify({'success': True, 'message': 'ForumEngine论坛已启动'})
|
||
else:
|
||
return jsonify({'success': False, 'message': 'ForumEngine论坛启动失败'})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'启动论坛失败: {str(e)}'})
|
||
|
||
@app.route('/api/forum/stop')
|
||
def stop_forum_monitoring_api():
|
||
"""手动停止ForumEngine论坛"""
|
||
try:
|
||
from ForumEngine.monitor import stop_forum_monitoring
|
||
stop_forum_monitoring()
|
||
return jsonify({'success': True, 'message': 'ForumEngine论坛已停止'})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'停止论坛失败: {str(e)}'})
|
||
|
||
@app.route('/api/forum/log')
|
||
def get_forum_log():
|
||
"""获取ForumEngine的forum.log内容"""
|
||
try:
|
||
forum_log_file = LOG_DIR / "forum.log"
|
||
if not forum_log_file.exists():
|
||
return jsonify({
|
||
'success': True,
|
||
'log_lines': [],
|
||
'parsed_messages': [],
|
||
'total_lines': 0
|
||
})
|
||
|
||
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
||
lines = f.readlines()
|
||
lines = [line.rstrip('\n\r') for line in lines if line.strip()]
|
||
|
||
# 解析每一行日志并提取对话信息
|
||
parsed_messages = []
|
||
for line in lines:
|
||
parsed_message = parse_forum_log_line(line)
|
||
if parsed_message:
|
||
parsed_messages.append(parsed_message)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'log_lines': lines,
|
||
'parsed_messages': parsed_messages,
|
||
'total_lines': len(lines)
|
||
})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'读取forum.log失败: {str(e)}'})
|
||
|
||
@app.route('/api/forum/log/history', methods=['POST'])
|
||
def get_forum_log_history():
|
||
"""获取Forum历史日志(支持从指定位置开始)"""
|
||
try:
|
||
data = request.get_json()
|
||
start_position = data.get('position', 0) # 客户端上次接收的位置
|
||
max_lines = data.get('max_lines', 1000) # 最多返回的行数
|
||
|
||
forum_log_file = LOG_DIR / "forum.log"
|
||
if not forum_log_file.exists():
|
||
return jsonify({
|
||
'success': True,
|
||
'log_lines': [],
|
||
'position': 0,
|
||
'has_more': False
|
||
})
|
||
|
||
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
||
# 从指定位置开始读取
|
||
f.seek(start_position)
|
||
lines = []
|
||
line_count = 0
|
||
|
||
for line in f:
|
||
if line_count >= max_lines:
|
||
break
|
||
line = line.rstrip('\n\r')
|
||
if line.strip():
|
||
# 添加时间戳
|
||
timestamp = datetime.now().strftime('%H:%M:%S')
|
||
formatted_line = f"[{timestamp}] {line}"
|
||
lines.append(formatted_line)
|
||
line_count += 1
|
||
|
||
# 记录当前位置
|
||
current_position = f.tell()
|
||
|
||
# 检查是否还有更多内容
|
||
f.seek(0, 2) # 移到文件末尾
|
||
end_position = f.tell()
|
||
has_more = current_position < end_position
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'log_lines': lines,
|
||
'position': current_position,
|
||
'has_more': has_more
|
||
})
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'读取forum历史失败: {str(e)}'})
|
||
|
||
@app.route('/api/search', methods=['POST'])
|
||
def search():
|
||
"""统一搜索接口"""
|
||
data = request.get_json()
|
||
query = data.get('query', '').strip()
|
||
|
||
if not query:
|
||
return jsonify({'success': False, 'message': '搜索查询不能为空'})
|
||
|
||
# ForumEngine论坛已经在后台运行,会自动检测搜索活动
|
||
# logger.info("ForumEngine: 搜索请求已收到,论坛将自动检测日志变化")
|
||
|
||
# 检查哪些应用正在运行
|
||
check_app_status()
|
||
running_apps = [name for name, info in processes.items() if info['status'] == 'running']
|
||
|
||
if not running_apps:
|
||
return jsonify({'success': False, 'message': '没有运行中的应用'})
|
||
|
||
# 向运行中的应用发送搜索请求
|
||
results = {}
|
||
api_ports = {'insight': 8601, 'media': 8602, 'query': 8603}
|
||
|
||
for app_name in running_apps:
|
||
try:
|
||
api_port = api_ports[app_name]
|
||
# 调用Streamlit应用的API端点
|
||
response = requests.post(
|
||
f"http://localhost:{api_port}/api/search",
|
||
json={'query': query},
|
||
timeout=10
|
||
)
|
||
if response.status_code == 200:
|
||
results[app_name] = response.json()
|
||
else:
|
||
results[app_name] = {'success': False, 'message': 'API调用失败'}
|
||
except Exception as e:
|
||
results[app_name] = {'success': False, 'message': str(e)}
|
||
|
||
# 搜索完成后可以选择停止监控,或者让它继续运行以捕获后续的处理日志
|
||
# 这里我们让监控继续运行,用户可以通过其他接口手动停止
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'query': query,
|
||
'results': results
|
||
})
|
||
|
||
|
||
@app.route('/api/config', methods=['GET'])
|
||
def get_config():
|
||
"""Expose selected configuration values to the frontend."""
|
||
try:
|
||
config_values = read_config_values()
|
||
return jsonify({'success': True, 'config': config_values})
|
||
except Exception as exc:
|
||
logger.exception("读取配置失败")
|
||
return jsonify({'success': False, 'message': f'读取配置失败: {exc}'}), 500
|
||
|
||
|
||
@app.route('/api/config', methods=['POST'])
|
||
def update_config():
|
||
"""Update configuration values and persist them to config.py."""
|
||
payload = request.get_json(silent=True) or {}
|
||
if not isinstance(payload, dict) or not payload:
|
||
return jsonify({'success': False, 'message': '请求体不能为空'}), 400
|
||
|
||
updates = {}
|
||
for key, value in payload.items():
|
||
if key in CONFIG_KEYS:
|
||
updates[key] = value if value is not None else ''
|
||
|
||
if not updates:
|
||
return jsonify({'success': False, 'message': '没有可更新的配置项'}), 400
|
||
|
||
try:
|
||
write_config_values(updates)
|
||
updated_config = read_config_values()
|
||
return jsonify({'success': True, 'config': updated_config})
|
||
except Exception as exc:
|
||
logger.exception("更新配置失败")
|
||
return jsonify({'success': False, 'message': f'更新配置失败: {exc}'}), 500
|
||
|
||
|
||
@app.route('/api/system/status')
|
||
def get_system_status():
|
||
"""返回系统启动状态。"""
|
||
state = _get_system_state()
|
||
return jsonify({
|
||
'success': True,
|
||
'started': state['started'],
|
||
'starting': state['starting']
|
||
})
|
||
|
||
|
||
@app.route('/api/system/start', methods=['POST'])
|
||
def start_system():
|
||
"""在接收到请求后启动完整系统。"""
|
||
allowed, message = _prepare_system_start()
|
||
if not allowed:
|
||
return jsonify({'success': False, 'message': message}), 400
|
||
|
||
try:
|
||
success, logs, errors = initialize_system_components()
|
||
if success:
|
||
_set_system_state(started=True)
|
||
return jsonify({'success': True, 'message': '系统启动成功', 'logs': logs})
|
||
|
||
_set_system_state(started=False)
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '系统启动失败',
|
||
'logs': logs,
|
||
'errors': errors
|
||
}), 500
|
||
except Exception as exc: # pragma: no cover - 保底捕获
|
||
logger.exception("系统启动过程中出现异常")
|
||
_set_system_state(started=False)
|
||
return jsonify({'success': False, 'message': f'系统启动异常: {exc}'}), 500
|
||
finally:
|
||
_set_system_state(starting=False)
|
||
|
||
@socketio.on('connect')
|
||
def handle_connect():
|
||
"""客户端连接"""
|
||
emit('status', 'Connected to Flask server')
|
||
|
||
@socketio.on('request_status')
|
||
def handle_status_request():
|
||
"""请求状态更新"""
|
||
check_app_status()
|
||
emit('status_update', {
|
||
app_name: {
|
||
'status': info['status'],
|
||
'port': info['port']
|
||
}
|
||
for app_name, info in processes.items()
|
||
})
|
||
|
||
if __name__ == '__main__':
|
||
# 从配置文件读取 HOST 和 PORT
|
||
from config import settings
|
||
HOST = settings.HOST
|
||
PORT = settings.PORT
|
||
|
||
logger.info("等待配置确认,系统将在前端指令后启动组件...")
|
||
logger.info(f"Flask服务器已启动,访问地址: http://{HOST}:{PORT}")
|
||
|
||
try:
|
||
socketio.run(app, host=HOST, port=PORT, debug=False)
|
||
except KeyboardInterrupt:
|
||
logger.info("\n正在关闭应用...")
|
||
cleanup_processes()
|
||
|
||
|