Files

1158 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Flask主应用 - 统一管理三个Streamlit应用
"""
import os
import sys
# 【修复】尽早设置环境变量,确保所有模块都使用无缓冲模式
os.environ['PYTHONIOENCODING'] = 'utf-8'
os.environ['PYTHONUTF8'] = '1'
os.environ['PYTHONUNBUFFERED'] = '1' # 禁用Python输出缓冲确保日志实时输出
import subprocess
import time
import threading
from datetime import datetime
from queue import Queue
from flask import Flask, render_template, request, jsonify, Response
from flask_socketio import SocketIO, emit
import atexit
import requests
from loguru import logger
import importlib
from pathlib import Path
from MindSpider.main import MindSpider
# 导入ReportEngine
try:
from ReportEngine.flask_interface import report_bp, initialize_report_engine
REPORT_ENGINE_AVAILABLE = True
except ImportError as e:
logger.error(f"ReportEngine导入失败: {e}")
REPORT_ENGINE_AVAILABLE = False
app = Flask(__name__)
app.config['SECRET_KEY'] = 'Dedicated-to-creating-a-concise-and-versatile-public-opinion-analysis-platform'
socketio = SocketIO(app, cors_allowed_origins="*")
# eventlet 在客户端主动断开时偶尔会抛出 ConnectionAbortedError这里做一次防御性包裹
# 避免无意义的堆栈污染日志(仅在 eventlet 可用时启用)。
def _patch_eventlet_disconnect_logging():
try:
import eventlet.wsgi # type: ignore
except Exception as exc: # pragma: no cover - 仅在生产环境有效
logger.debug(f"eventlet 不可用,跳过断开补丁: {exc}")
return
try:
original_finish = eventlet.wsgi.HttpProtocol.finish # type: ignore[attr-defined]
except Exception as exc: # pragma: no cover
logger.debug(f"eventlet 缺少 HttpProtocol.finish跳过断开补丁: {exc}")
return
def _safe_finish(self, *args, **kwargs): # pragma: no cover - 运行时才会触发
try:
return original_finish(self, *args, **kwargs)
except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError) as exc:
try:
environ = getattr(self, 'environ', {}) or {}
method = environ.get('REQUEST_METHOD', '')
path = environ.get('PATH_INFO', '')
logger.warning(f"客户端已主动断开,忽略异常: {method} {path} ({exc})")
except Exception:
logger.warning(f"客户端已主动断开,忽略异常: {exc}")
return
eventlet.wsgi.HttpProtocol.finish = _safe_finish # type: ignore[attr-defined]
logger.info("已对 eventlet 连接中断进行安全防护")
_patch_eventlet_disconnect_logging()
# 注册ReportEngine Blueprint
if REPORT_ENGINE_AVAILABLE:
app.register_blueprint(report_bp, url_prefix='/api/report')
logger.info("ReportEngine接口已注册")
else:
logger.info("ReportEngine不可用跳过接口注册")
# 创建日志目录
LOG_DIR = Path('logs')
LOG_DIR.mkdir(exist_ok=True)
CONFIG_MODULE_NAME = 'config'
CONFIG_FILE_PATH = Path(__file__).resolve().parent / 'config.py'
CONFIG_KEYS = [
'HOST',
'PORT',
'DB_DIALECT',
'DB_HOST',
'DB_PORT',
'DB_USER',
'DB_PASSWORD',
'DB_NAME',
'DB_CHARSET',
'INSIGHT_ENGINE_API_KEY',
'INSIGHT_ENGINE_BASE_URL',
'INSIGHT_ENGINE_MODEL_NAME',
'MEDIA_ENGINE_API_KEY',
'MEDIA_ENGINE_BASE_URL',
'MEDIA_ENGINE_MODEL_NAME',
'QUERY_ENGINE_API_KEY',
'QUERY_ENGINE_BASE_URL',
'QUERY_ENGINE_MODEL_NAME',
'REPORT_ENGINE_API_KEY',
'REPORT_ENGINE_BASE_URL',
'REPORT_ENGINE_MODEL_NAME',
'FORUM_HOST_API_KEY',
'FORUM_HOST_BASE_URL',
'FORUM_HOST_MODEL_NAME',
'KEYWORD_OPTIMIZER_API_KEY',
'KEYWORD_OPTIMIZER_BASE_URL',
'KEYWORD_OPTIMIZER_MODEL_NAME',
'TAVILY_API_KEY',
'BOCHA_WEB_SEARCH_API_KEY'
]
def _load_config_module():
"""Load or reload the config module to ensure latest values are available."""
importlib.invalidate_caches()
module = sys.modules.get(CONFIG_MODULE_NAME)
try:
if module is None:
module = importlib.import_module(CONFIG_MODULE_NAME)
else:
module = importlib.reload(module)
except ModuleNotFoundError:
return None
return module
def read_config_values():
"""Return the current configuration values that are exposed to the frontend."""
try:
# 重新加载配置以获取最新的 Settings 实例
from config import reload_settings, settings
reload_settings()
values = {}
for key in CONFIG_KEYS:
# 从 Pydantic Settings 实例读取值
value = getattr(settings, key, None)
# Convert to string for uniform handling on the frontend.
if value is None:
values[key] = ''
else:
values[key] = str(value)
return values
except Exception as exc:
logger.exception(f"读取配置失败: {exc}")
return {}
def _serialize_config_value(value):
"""Serialize Python values back to a config.py assignment-friendly string."""
if isinstance(value, bool):
return 'True' if value else 'False'
if isinstance(value, (int, float)):
return str(value)
if value is None:
return 'None'
value_str = str(value)
escaped = value_str.replace('\\', '\\\\').replace('"', '\\"')
return f'"{escaped}"'
def write_config_values(updates):
"""Persist configuration updates to .env file (Pydantic Settings source)."""
from pathlib import Path
# 确定 .env 文件路径(与 config.py 中的逻辑一致)
project_root = Path(__file__).resolve().parent
cwd_env = Path.cwd() / ".env"
env_file_path = cwd_env if cwd_env.exists() else (project_root / ".env")
# 读取现有的 .env 文件内容
env_lines = []
env_key_indices = {} # 记录每个键在文件中的索引位置
if env_file_path.exists():
env_lines = env_file_path.read_text(encoding='utf-8').splitlines()
# 提取已存在的键及其索引
for i, line in enumerate(env_lines):
line_stripped = line.strip()
if line_stripped and not line_stripped.startswith('#'):
if '=' in line_stripped:
key = line_stripped.split('=')[0].strip()
env_key_indices[key] = i
# 更新或添加配置项
for key, raw_value in updates.items():
# 格式化值用于 .env 文件(不需要引号,除非是字符串且包含空格)
if raw_value is None or raw_value == '':
env_value = ''
elif isinstance(raw_value, (int, float)):
env_value = str(raw_value)
elif isinstance(raw_value, bool):
env_value = 'True' if raw_value else 'False'
else:
value_str = str(raw_value)
# 如果包含空格或特殊字符,需要引号
if ' ' in value_str or '\n' in value_str or '#' in value_str:
escaped = value_str.replace('\\', '\\\\').replace('"', '\\"')
env_value = f'"{escaped}"'
else:
env_value = value_str
# 更新或添加配置项
if key in env_key_indices:
# 更新现有行
env_lines[env_key_indices[key]] = f'{key}={env_value}'
else:
# 添加新行到文件末尾
env_lines.append(f'{key}={env_value}')
# 写入 .env 文件
env_file_path.parent.mkdir(parents=True, exist_ok=True)
env_file_path.write_text('\n'.join(env_lines) + '\n', encoding='utf-8')
# 重新加载配置模块(这会重新读取 .env 文件并创建新的 Settings 实例)
_load_config_module()
system_state_lock = threading.Lock()
system_state = {
'started': False,
'starting': False
}
def _set_system_state(*, started=None, starting=None):
"""Safely update the cached system state flags."""
with system_state_lock:
if started is not None:
system_state['started'] = started
if starting is not None:
system_state['starting'] = starting
def _get_system_state():
"""Return a shallow copy of the system state flags."""
with system_state_lock:
return system_state.copy()
def _prepare_system_start():
"""Mark the system as starting if it is not already running or starting."""
with system_state_lock:
if system_state['started']:
return False, '系统已启动'
if system_state['starting']:
return False, '系统正在启动'
system_state['starting'] = True
return True, None
def initialize_system_components():
"""启动所有依赖组件Streamlit 子应用、ForumEngine、ReportEngine"""
logs = []
errors = []
spider = MindSpider()
if spider.initialize_database():
logger.info("数据库初始化成功")
else:
logger.error("数据库初始化失败")
try:
stop_forum_engine()
logs.append("已停止 ForumEngine 监控器以避免文件冲突")
except Exception as exc: # pragma: no cover - 安全捕获
message = f"停止 ForumEngine 时发生异常: {exc}"
logs.append(message)
logger.exception(message)
processes['forum']['status'] = 'stopped'
for app_name, script_path in STREAMLIT_SCRIPTS.items():
logs.append(f"检查文件: {script_path}")
if os.path.exists(script_path):
success, message = start_streamlit_app(app_name, script_path, processes[app_name]['port'])
logs.append(f"{app_name}: {message}")
if success:
startup_success, startup_message = wait_for_app_startup(app_name, 30)
logs.append(f"{app_name} 启动检查: {startup_message}")
if not startup_success:
errors.append(f"{app_name} 启动失败: {startup_message}")
else:
errors.append(f"{app_name} 启动失败: {message}")
else:
msg = f"文件不存在: {script_path}"
logs.append(f"错误: {msg}")
errors.append(f"{app_name}: {msg}")
forum_started = False
try:
start_forum_engine()
processes['forum']['status'] = 'running'
logs.append("ForumEngine 启动完成")
forum_started = True
except Exception as exc: # pragma: no cover - 保底捕获
error_msg = f"ForumEngine 启动失败: {exc}"
logs.append(error_msg)
errors.append(error_msg)
if REPORT_ENGINE_AVAILABLE:
try:
if initialize_report_engine():
logs.append("ReportEngine 初始化成功")
else:
msg = "ReportEngine 初始化失败"
logs.append(msg)
errors.append(msg)
except Exception as exc: # pragma: no cover
msg = f"ReportEngine 初始化异常: {exc}"
logs.append(msg)
errors.append(msg)
if errors:
cleanup_processes()
processes['forum']['status'] = 'stopped'
if forum_started:
try:
stop_forum_engine()
except Exception: # pragma: no cover
logger.exception("停止ForumEngine失败")
return False, logs, errors
return True, logs, []
# 初始化ForumEngine的forum.log文件
def init_forum_log():
"""初始化forum.log文件"""
try:
forum_log_file = LOG_DIR / "forum.log"
# 检查文件不存在则创建并且写一个开始,存在就清空写一个开始
if not forum_log_file.exists():
with open(forum_log_file, 'w', encoding='utf-8') as f:
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"=== ForumEngine 系统初始化 - {start_time} ===\n")
logger.info(f"ForumEngine: forum.log 已初始化")
else:
with open(forum_log_file, 'w', encoding='utf-8') as f:
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
f.write(f"=== ForumEngine 系统初始化 - {start_time} ===\n")
logger.info(f"ForumEngine: forum.log 已初始化")
except Exception as e:
logger.exception(f"ForumEngine: 初始化forum.log失败: {e}")
# 初始化forum.log
init_forum_log()
# 启动ForumEngine智能监控
def start_forum_engine():
"""启动ForumEngine论坛"""
try:
from ForumEngine.monitor import start_forum_monitoring
logger.info("ForumEngine: 启动论坛...")
success = start_forum_monitoring()
if not success:
logger.info("ForumEngine: 论坛启动失败")
except Exception as e:
logger.exception(f"ForumEngine: 启动论坛失败: {e}")
# 停止ForumEngine智能监控
def stop_forum_engine():
"""停止ForumEngine论坛"""
try:
from ForumEngine.monitor import stop_forum_monitoring
logger.info("ForumEngine: 停止论坛...")
stop_forum_monitoring()
logger.info("ForumEngine: 论坛已停止")
except Exception as e:
logger.exception(f"ForumEngine: 停止论坛失败: {e}")
def parse_forum_log_line(line):
"""解析forum.log行内容提取对话信息"""
import re
# 匹配格式: [时间] [来源] 内容(来源允许大小写及空格)
pattern = r'\[(\d{2}:\d{2}:\d{2})\]\s*\[([^\]]+)\]\s*(.*)'
match = re.match(pattern, line)
if not match:
return None
timestamp, raw_source, content = match.groups()
source = raw_source.strip().upper()
# 过滤掉系统消息和空内容
if source == 'SYSTEM' or not content.strip():
return None
# 支持三个Agent和主持人
if source not in ['QUERY', 'INSIGHT', 'MEDIA', 'HOST']:
return None
# 解码日志中的转义换行,保留多行格式
cleaned_content = content.replace('\\n', '\n').replace('\\r', '').strip()
# 根据来源确定消息类型和发送者
if source == 'HOST':
message_type = 'host'
sender = 'Forum Host'
else:
message_type = 'agent'
sender = f'{source.title()} Engine'
return {
'type': message_type,
'sender': sender,
'content': cleaned_content,
'timestamp': timestamp,
'source': source
}
# Forum日志监听器
# 存储每个客户端的历史日志发送位置
forum_log_positions = {}
def monitor_forum_log():
"""监听forum.log文件变化并推送到前端"""
import time
from pathlib import Path
forum_log_file = LOG_DIR / "forum.log"
last_position = 0
processed_lines = set() # 用于跟踪已处理的行,避免重复
# 如果文件存在,获取初始位置但不跳过内容
if forum_log_file.exists():
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
# 记录文件大小但不添加到processed_lines
# 这样用户打开forum标签时可以获取历史
f.seek(0, 2) # 移到文件末尾
last_position = f.tell()
while True:
try:
if forum_log_file.exists():
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
f.seek(last_position)
new_lines = f.readlines()
if new_lines:
for line in new_lines:
line = line.rstrip('\n\r')
if line.strip():
line_hash = hash(line.strip())
# 避免重复处理同一行
if line_hash in processed_lines:
continue
processed_lines.add(line_hash)
# 解析日志行并发送forum消息
parsed_message = parse_forum_log_line(line)
if parsed_message:
socketio.emit('forum_message', parsed_message)
# 只有在控制台显示forum时才发送控制台消息
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
socketio.emit('console_output', {
'app': 'forum',
'line': formatted_line
})
last_position = f.tell()
# 清理processed_lines集合避免内存泄漏保留最近1000行的哈希
if len(processed_lines) > 1000:
# 保留最近500行的哈希
recent_hashes = list(processed_lines)[-500:]
processed_lines = set(recent_hashes)
time.sleep(1) # 每秒检查一次
except Exception as e:
logger.error(f"Forum日志监听错误: {e}")
time.sleep(5)
# 启动Forum日志监听线程
forum_monitor_thread = threading.Thread(target=monitor_forum_log, daemon=True)
forum_monitor_thread.start()
# 全局变量存储进程信息
processes = {
'insight': {'process': None, 'port': 8501, 'status': 'stopped', 'output': [], 'log_file': None},
'media': {'process': None, 'port': 8502, 'status': 'stopped', 'output': [], 'log_file': None},
'query': {'process': None, 'port': 8503, 'status': 'stopped', 'output': [], 'log_file': None},
'forum': {'process': None, 'port': None, 'status': 'stopped', 'output': [], 'log_file': None} # 启动后标记为 running
}
STREAMLIT_SCRIPTS = {
'insight': 'SingleEngineApp/insight_engine_streamlit_app.py',
'media': 'SingleEngineApp/media_engine_streamlit_app.py',
'query': 'SingleEngineApp/query_engine_streamlit_app.py'
}
# 输出队列
output_queues = {
'insight': Queue(),
'media': Queue(),
'query': Queue(),
'forum': Queue()
}
def write_log_to_file(app_name, line):
"""将日志写入文件"""
try:
log_file_path = LOG_DIR / f"{app_name}.log"
with open(log_file_path, 'a', encoding='utf-8') as f:
f.write(line + '\n')
f.flush()
except Exception as e:
logger.error(f"Error writing log for {app_name}: {e}")
def read_log_from_file(app_name, tail_lines=None):
"""从文件读取日志"""
try:
log_file_path = LOG_DIR / f"{app_name}.log"
if not log_file_path.exists():
return []
with open(log_file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
lines = [line.rstrip('\n\r') for line in lines if line.strip()]
if tail_lines:
return lines[-tail_lines:]
return lines
except Exception as e:
logger.exception(f"Error reading log for {app_name}: {e}")
return []
def read_process_output(process, app_name):
"""读取进程输出并写入文件"""
import select
import sys
while True:
try:
if process.poll() is not None:
# 进程结束,读取剩余输出
remaining_output = process.stdout.read()
if remaining_output:
lines = remaining_output.decode('utf-8', errors='replace').split('\n')
for line in lines:
line = line.strip()
if line:
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
write_log_to_file(app_name, formatted_line)
socketio.emit('console_output', {
'app': app_name,
'line': formatted_line
})
break
# 使用非阻塞读取
if sys.platform == 'win32':
# Windows下使用不同的方法
output = process.stdout.readline()
if output:
line = output.decode('utf-8', errors='replace').strip()
if line:
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
# 写入日志文件
write_log_to_file(app_name, formatted_line)
# 发送到前端
socketio.emit('console_output', {
'app': app_name,
'line': formatted_line
})
else:
# 没有输出时短暂休眠
time.sleep(0.1)
else:
# Unix系统使用select
ready, _, _ = select.select([process.stdout], [], [], 0.1)
if ready:
output = process.stdout.readline()
if output:
line = output.decode('utf-8', errors='replace').strip()
if line:
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
# 写入日志文件
write_log_to_file(app_name, formatted_line)
# 发送到前端
socketio.emit('console_output', {
'app': app_name,
'line': formatted_line
})
except Exception as e:
error_msg = f"Error reading output for {app_name}: {e}"
logger.exception(error_msg)
write_log_to_file(app_name, f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}")
break
def start_streamlit_app(app_name, script_path, port):
"""启动Streamlit应用"""
try:
if processes[app_name]['process'] is not None:
return False, "应用已经在运行"
# 检查文件是否存在
if not os.path.exists(script_path):
return False, f"文件不存在: {script_path}"
# 清空之前的日志文件
log_file_path = LOG_DIR / f"{app_name}.log"
if log_file_path.exists():
log_file_path.unlink()
# 创建启动日志
start_msg = f"[{datetime.now().strftime('%H:%M:%S')}] 启动 {app_name} 应用..."
write_log_to_file(app_name, start_msg)
cmd = [
sys.executable, '-m', 'streamlit', 'run',
script_path,
'--server.port', str(port),
'--server.headless', 'true',
'--browser.gatherUsageStats', 'false',
# '--logger.level', 'debug', # 增加日志详细程度
'--logger.level', 'info',
'--server.enableCORS', 'false'
]
# 设置环境变量确保UTF-8编码和减少缓冲
env = os.environ.copy()
env.update({
'PYTHONIOENCODING': 'utf-8',
'PYTHONUTF8': '1',
'LANG': 'en_US.UTF-8',
'LC_ALL': 'en_US.UTF-8',
'PYTHONUNBUFFERED': '1', # 禁用Python缓冲
'STREAMLIT_BROWSER_GATHER_USAGE_STATS': 'false'
})
# 使用当前工作目录而不是脚本目录
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0, # 无缓冲
universal_newlines=False,
cwd=os.getcwd(),
env=env,
encoding=None, # 让我们手动处理编码
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0
)
processes[app_name]['process'] = process
processes[app_name]['status'] = 'starting'
processes[app_name]['output'] = []
# 启动输出读取线程
output_thread = threading.Thread(
target=read_process_output,
args=(process, app_name),
daemon=True
)
output_thread.start()
return True, f"{app_name} 应用启动中..."
except Exception as e:
error_msg = f"启动失败: {str(e)}"
write_log_to_file(app_name, f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}")
return False, error_msg
def stop_streamlit_app(app_name):
"""停止Streamlit应用"""
try:
if processes[app_name]['process'] is None:
return False, "应用未运行"
process = processes[app_name]['process']
process.terminate()
# 等待进程结束
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
processes[app_name]['process'] = None
processes[app_name]['status'] = 'stopped'
return True, f"{app_name} 应用已停止"
except Exception as e:
return False, f"停止失败: {str(e)}"
HEALTHCHECK_PATH = "/_stcore/health"
HEALTHCHECK_PROXIES = {'http': None, 'https': None}
def _build_healthcheck_url(port):
return f"http://127.0.0.1:{port}{HEALTHCHECK_PATH}"
def check_app_status():
"""检查应用状态"""
for app_name, info in processes.items():
if info['process'] is not None:
if info['process'].poll() is None:
# 进程仍在运行,检查端口是否可访问
try:
response = requests.get(
_build_healthcheck_url(info['port']),
timeout=2,
proxies=HEALTHCHECK_PROXIES
)
if response.status_code == 200:
info['status'] = 'running'
else:
info['status'] = 'starting'
except Exception as exc:
logger.warning(f"{app_name} 健康检查失败: {exc}")
info['status'] = 'starting'
else:
# 进程已结束
info['process'] = None
info['status'] = 'stopped'
def wait_for_app_startup(app_name, max_wait_time=90):
"""等待应用启动完成"""
import time
start_time = time.time()
while time.time() - start_time < max_wait_time:
info = processes[app_name]
if info['process'] is None:
return False, "进程已停止"
if info['process'].poll() is not None:
return False, "进程启动失败"
try:
response = requests.get(
_build_healthcheck_url(info['port']),
timeout=2,
proxies=HEALTHCHECK_PROXIES
)
if response.status_code == 200:
info['status'] = 'running'
return True, "启动成功"
except Exception as exc:
logger.warning(f"{app_name} 健康检查失败: {exc}")
time.sleep(1)
return False, "启动超时"
def cleanup_processes():
"""清理所有进程"""
for app_name in STREAMLIT_SCRIPTS:
stop_streamlit_app(app_name)
processes['forum']['status'] = 'stopped'
try:
stop_forum_engine()
except Exception: # pragma: no cover
logger.exception("停止ForumEngine失败")
_set_system_state(started=False, starting=False)
# 注册清理函数
atexit.register(cleanup_processes)
@app.route('/')
def index():
"""主页"""
return render_template('index.html')
@app.route('/api/status')
def get_status():
"""获取所有应用状态"""
check_app_status()
return jsonify({
app_name: {
'status': info['status'],
'port': info['port'],
'output_lines': len(info['output'])
}
for app_name, info in processes.items()
})
@app.route('/api/start/<app_name>')
def start_app(app_name):
"""启动指定应用"""
if app_name not in processes:
return jsonify({'success': False, 'message': '未知应用'})
if app_name == 'forum':
try:
start_forum_engine()
processes['forum']['status'] = 'running'
return jsonify({'success': True, 'message': 'ForumEngine已启动'})
except Exception as exc: # pragma: no cover
logger.exception("手动启动ForumEngine失败")
return jsonify({'success': False, 'message': f'ForumEngine启动失败: {exc}'})
script_path = STREAMLIT_SCRIPTS.get(app_name)
if not script_path:
return jsonify({'success': False, 'message': '该应用不支持启动操作'})
success, message = start_streamlit_app(
app_name,
script_path,
processes[app_name]['port']
)
if success:
# 等待应用启动
startup_success, startup_message = wait_for_app_startup(app_name, 15)
if not startup_success:
message += f" 但启动检查失败: {startup_message}"
return jsonify({'success': success, 'message': message})
@app.route('/api/stop/<app_name>')
def stop_app(app_name):
"""停止指定应用"""
if app_name not in processes:
return jsonify({'success': False, 'message': '未知应用'})
if app_name == 'forum':
try:
stop_forum_engine()
processes['forum']['status'] = 'stopped'
return jsonify({'success': True, 'message': 'ForumEngine已停止'})
except Exception as exc: # pragma: no cover
logger.exception("手动停止ForumEngine失败")
return jsonify({'success': False, 'message': f'ForumEngine停止失败: {exc}'})
success, message = stop_streamlit_app(app_name)
return jsonify({'success': success, 'message': message})
@app.route('/api/output/<app_name>')
def get_output(app_name):
"""获取应用输出"""
if app_name not in processes:
return jsonify({'success': False, 'message': '未知应用'})
# 特殊处理Forum Engine
if app_name == 'forum':
try:
forum_log_content = read_log_from_file('forum')
return jsonify({
'success': True,
'output': forum_log_content,
'total_lines': len(forum_log_content)
})
except Exception as e:
return jsonify({'success': False, 'message': f'读取forum日志失败: {str(e)}'})
# 从文件读取完整日志
output_lines = read_log_from_file(app_name)
return jsonify({
'success': True,
'output': output_lines
})
@app.route('/api/test_log/<app_name>')
def test_log(app_name):
"""测试日志写入功能"""
if app_name not in processes:
return jsonify({'success': False, 'message': '未知应用'})
# 写入测试消息
test_msg = f"[{datetime.now().strftime('%H:%M:%S')}] 测试日志消息 - {datetime.now()}"
write_log_to_file(app_name, test_msg)
# 通过Socket.IO发送
socketio.emit('console_output', {
'app': app_name,
'line': test_msg
})
return jsonify({
'success': True,
'message': f'测试消息已写入 {app_name} 日志'
})
@app.route('/api/forum/start')
def start_forum_monitoring_api():
"""手动启动ForumEngine论坛"""
try:
from ForumEngine.monitor import start_forum_monitoring
success = start_forum_monitoring()
if success:
return jsonify({'success': True, 'message': 'ForumEngine论坛已启动'})
else:
return jsonify({'success': False, 'message': 'ForumEngine论坛启动失败'})
except Exception as e:
return jsonify({'success': False, 'message': f'启动论坛失败: {str(e)}'})
@app.route('/api/forum/stop')
def stop_forum_monitoring_api():
"""手动停止ForumEngine论坛"""
try:
from ForumEngine.monitor import stop_forum_monitoring
stop_forum_monitoring()
return jsonify({'success': True, 'message': 'ForumEngine论坛已停止'})
except Exception as e:
return jsonify({'success': False, 'message': f'停止论坛失败: {str(e)}'})
@app.route('/api/forum/log')
def get_forum_log():
"""获取ForumEngine的forum.log内容"""
try:
forum_log_file = LOG_DIR / "forum.log"
if not forum_log_file.exists():
return jsonify({
'success': True,
'log_lines': [],
'parsed_messages': [],
'total_lines': 0
})
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
lines = [line.rstrip('\n\r') for line in lines if line.strip()]
# 解析每一行日志并提取对话信息
parsed_messages = []
for line in lines:
parsed_message = parse_forum_log_line(line)
if parsed_message:
parsed_messages.append(parsed_message)
return jsonify({
'success': True,
'log_lines': lines,
'parsed_messages': parsed_messages,
'total_lines': len(lines)
})
except Exception as e:
return jsonify({'success': False, 'message': f'读取forum.log失败: {str(e)}'})
@app.route('/api/forum/log/history', methods=['POST'])
def get_forum_log_history():
"""获取Forum历史日志支持从指定位置开始"""
try:
data = request.get_json()
start_position = data.get('position', 0) # 客户端上次接收的位置
max_lines = data.get('max_lines', 1000) # 最多返回的行数
forum_log_file = LOG_DIR / "forum.log"
if not forum_log_file.exists():
return jsonify({
'success': True,
'log_lines': [],
'position': 0,
'has_more': False
})
with open(forum_log_file, 'r', encoding='utf-8', errors='ignore') as f:
# 从指定位置开始读取
f.seek(start_position)
lines = []
line_count = 0
for line in f:
if line_count >= max_lines:
break
line = line.rstrip('\n\r')
if line.strip():
# 添加时间戳
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_line = f"[{timestamp}] {line}"
lines.append(formatted_line)
line_count += 1
# 记录当前位置
current_position = f.tell()
# 检查是否还有更多内容
f.seek(0, 2) # 移到文件末尾
end_position = f.tell()
has_more = current_position < end_position
return jsonify({
'success': True,
'log_lines': lines,
'position': current_position,
'has_more': has_more
})
except Exception as e:
return jsonify({'success': False, 'message': f'读取forum历史失败: {str(e)}'})
@app.route('/api/search', methods=['POST'])
def search():
"""统一搜索接口"""
data = request.get_json()
query = data.get('query', '').strip()
if not query:
return jsonify({'success': False, 'message': '搜索查询不能为空'})
# ForumEngine论坛已经在后台运行会自动检测搜索活动
# logger.info("ForumEngine: 搜索请求已收到,论坛将自动检测日志变化")
# 检查哪些应用正在运行
check_app_status()
running_apps = [name for name, info in processes.items() if info['status'] == 'running']
if not running_apps:
return jsonify({'success': False, 'message': '没有运行中的应用'})
# 向运行中的应用发送搜索请求
results = {}
api_ports = {'insight': 8601, 'media': 8602, 'query': 8603}
for app_name in running_apps:
try:
api_port = api_ports[app_name]
# 调用Streamlit应用的API端点
response = requests.post(
f"http://localhost:{api_port}/api/search",
json={'query': query},
timeout=10
)
if response.status_code == 200:
results[app_name] = response.json()
else:
results[app_name] = {'success': False, 'message': 'API调用失败'}
except Exception as e:
results[app_name] = {'success': False, 'message': str(e)}
# 搜索完成后可以选择停止监控,或者让它继续运行以捕获后续的处理日志
# 这里我们让监控继续运行,用户可以通过其他接口手动停止
return jsonify({
'success': True,
'query': query,
'results': results
})
@app.route('/api/config', methods=['GET'])
def get_config():
"""Expose selected configuration values to the frontend."""
try:
config_values = read_config_values()
return jsonify({'success': True, 'config': config_values})
except Exception as exc:
logger.exception("读取配置失败")
return jsonify({'success': False, 'message': f'读取配置失败: {exc}'}), 500
@app.route('/api/config', methods=['POST'])
def update_config():
"""Update configuration values and persist them to config.py."""
payload = request.get_json(silent=True) or {}
if not isinstance(payload, dict) or not payload:
return jsonify({'success': False, 'message': '请求体不能为空'}), 400
updates = {}
for key, value in payload.items():
if key in CONFIG_KEYS:
updates[key] = value if value is not None else ''
if not updates:
return jsonify({'success': False, 'message': '没有可更新的配置项'}), 400
try:
write_config_values(updates)
updated_config = read_config_values()
return jsonify({'success': True, 'config': updated_config})
except Exception as exc:
logger.exception("更新配置失败")
return jsonify({'success': False, 'message': f'更新配置失败: {exc}'}), 500
@app.route('/api/system/status')
def get_system_status():
"""返回系统启动状态。"""
state = _get_system_state()
return jsonify({
'success': True,
'started': state['started'],
'starting': state['starting']
})
@app.route('/api/system/start', methods=['POST'])
def start_system():
"""在接收到请求后启动完整系统。"""
allowed, message = _prepare_system_start()
if not allowed:
return jsonify({'success': False, 'message': message}), 400
try:
success, logs, errors = initialize_system_components()
if success:
_set_system_state(started=True)
return jsonify({'success': True, 'message': '系统启动成功', 'logs': logs})
_set_system_state(started=False)
return jsonify({
'success': False,
'message': '系统启动失败',
'logs': logs,
'errors': errors
}), 500
except Exception as exc: # pragma: no cover - 保底捕获
logger.exception("系统启动过程中出现异常")
_set_system_state(started=False)
return jsonify({'success': False, 'message': f'系统启动异常: {exc}'}), 500
finally:
_set_system_state(starting=False)
@socketio.on('connect')
def handle_connect():
"""客户端连接"""
emit('status', 'Connected to Flask server')
@socketio.on('request_status')
def handle_status_request():
"""请求状态更新"""
check_app_status()
emit('status_update', {
app_name: {
'status': info['status'],
'port': info['port']
}
for app_name, info in processes.items()
})
if __name__ == '__main__':
# 从配置文件读取 HOST 和 PORT
from config import settings
HOST = settings.HOST
PORT = settings.PORT
logger.info("等待配置确认,系统将在前端指令后启动组件...")
logger.info(f"Flask服务器已启动访问地址: http://{HOST}:{PORT}")
try:
socketio.run(app, host=HOST, port=PORT, debug=False)
except KeyboardInterrupt:
logger.info("\n正在关闭应用...")
cleanup_processes()