- 更新README.md,改善项目文档结构和内容 - 添加项目logo文件assets/logo.png,提升项目视觉识别度 - 完善帮助文本描述,明确说明使用默认参数和默认python解释器的行为
404 lines
16 KiB
Python
404 lines
16 KiB
Python
# main.py
|
||
import subprocess
|
||
import json
|
||
import csv
|
||
import os
|
||
import sys
|
||
import argparse
|
||
import multiprocessing
|
||
from datetime import datetime
|
||
|
||
def print_custom_help():
|
||
"""
|
||
自定义帮助信息
|
||
"""
|
||
# --- ANSI 颜色和样式代码 ---
|
||
# 重置所有样式
|
||
RESET = '\033[0m'
|
||
# 文本样式
|
||
BOLD = '\033[1m'
|
||
# 前景色
|
||
GREEN = '\033[32m'
|
||
CYAN = '\033[36m'
|
||
YELLOW = '\033[33m'
|
||
|
||
# 组合样式
|
||
HEADER = GREEN + BOLD
|
||
OPTION = CYAN
|
||
EXAMPLE = YELLOW
|
||
|
||
help_text = f"""
|
||
{HEADER}Usage:{RESET} main.py [OPTIONS]
|
||
|
||
{HEADER}Python 多线程性能基准测试工具 - 自定义帮助文档{RESET}
|
||
|
||
一个用于对比不同 Python 版本(单线程、GIL、NoGIL)在 CPU 密集型任务上性能的工具。
|
||
|
||
{HEADER}Options:{RESET}
|
||
{OPTION}-h, --help{RESET} 显示此帮助信息并退出。
|
||
{OPTION}-v, --verbose{RESET} 启用详细模式。在测试过程中打印每个线程的实时日志。
|
||
{OPTION}-q, --quiet{RESET} 启用静默模式。使用默认参数和默认python解释器,只在测试结束后打印最终报告,抑制其他所有输出。
|
||
|
||
{OPTION}-n NUM_PRIMES, --num-primes NUM_PRIMES{RESET}
|
||
指定要寻找的质数总数。
|
||
默认值: 30000
|
||
|
||
{OPTION}-t THREADS, --threads THREADS{RESET}
|
||
指定多线程测试时使用的线程数。
|
||
默认值: 你的 CPU 核心数 ({multiprocessing.cpu_count()})
|
||
|
||
{OPTION}-s TEST_TYPE, --skip TEST_TYPE{RESET}
|
||
跳过指定类型的测试。可以多次使用此选项。
|
||
可选值: single, gil, nogil
|
||
|
||
{OPTION}-l TIMEOUT, --timeout TIMEOUT{RESET}
|
||
单个测试的超时时间(秒)。如果测试超过此时限,将被强制终止。
|
||
默认值: 300
|
||
|
||
{OPTION}-o OUTPUT_FILE, --output OUTPUT_FILE{RESET}
|
||
将测试报告保存到指定文件。支持的格式由文件后缀决定:
|
||
- .json: 保存为 JSON 格式(默认)。
|
||
- .csv: 保存为 CSV 格式(便于 Excel 分析)。
|
||
- .txt: 保存为人类可读的文本格式。
|
||
|
||
{HEADER}Examples:{RESET}
|
||
{EXAMPLE}1. 快速开始(使用所有默认参数){RESET}
|
||
python main.py
|
||
|
||
{EXAMPLE}2. 详细模式 + 寻找 50000 个质数 + 使用 12 个线程{RESET}
|
||
python main.py -v -n 50000 -t 12
|
||
|
||
{EXAMPLE}3. 静默模式 + 跳过 GIL 测试 + 将报告保存为 CSV{RESET}
|
||
python main.py -q --skip gil -o performance_results.csv
|
||
|
||
{EXAMPLE}4. 自定义超时时间(10 分钟)并保存为文本报告{RESET}
|
||
python main.py -l 600 -o detailed_report.txt
|
||
|
||
Report bugs to: your_email@example.com
|
||
Project home: https://github.com/your_username/your_project
|
||
"""
|
||
|
||
print(help_text.strip())
|
||
sys.exit(0)
|
||
|
||
def parse_arguments():
|
||
"""
|
||
解析命令行参数
|
||
"""
|
||
parser = argparse.ArgumentParser(
|
||
description="Python 多线程性能基准测试工具",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="For more examples and detailed usage, run 'python main.py -h'.",
|
||
add_help=False
|
||
)
|
||
parser.add_argument("-h", "--help", action="store_true", help=argparse.SUPPRESS)
|
||
parser.add_argument("-v", "--verbose", action="store_true",help="启用详细模式,显示每个线程的日志")
|
||
parser.add_argument("-q", "--quiet", action="store_true", help="静默模式,仅输出最终报告和错误信息")
|
||
parser.add_argument("-n", "--num-primes", type=int, default=30000, help="要寻找的质数总数(默认:30000)")
|
||
parser.add_argument("-t", "--threads", type=int, default=multiprocessing.cpu_count(), help="多线程测试的线程数(默认:CPU核心数)")
|
||
parser.add_argument("-o", "--output", type=str, help="输出测试报告到文件(支持格式:JSON/CSV/TXT,通过后缀自动识别)")
|
||
parser.add_argument("-s", "--skip", action="append", choices=["single", "gil", "nogil"], help="跳过指定测试类型(可多次使用,例:--skip gil --skip single)")
|
||
parser.add_argument("-l", "--timeout", type=int, default=300, help="单个测试的超时时间(秒,默认:300)")
|
||
|
||
args = parser.parse_args()
|
||
|
||
if args.help:
|
||
print_custom_help()
|
||
|
||
return args
|
||
|
||
def get_python_path(prompt, args):
|
||
"""
|
||
获取用户输入的Python可执行文件路径,并验证其有效性
|
||
"""
|
||
while True:
|
||
if args.quiet:
|
||
# 静默模式下自动使用系统默认Python
|
||
import sys
|
||
path = sys.executable
|
||
if not args.quiet:
|
||
print(f" 使用默认Python路径: {path}")
|
||
return path
|
||
path = input(prompt).strip()
|
||
if not path:
|
||
print(" 路径不能为空。")
|
||
continue
|
||
|
||
if os.path.exists(path) and os.access(path, os.X_OK):
|
||
try:
|
||
result = subprocess.run([path, "--version"], capture_output=True, text=True, check=True)
|
||
print(f" 验证成功: {result.stdout.strip()}")
|
||
return path
|
||
except subprocess.CalledProcessError:
|
||
print(" 验证失败: 这不是一个有效的Python解释器。")
|
||
except Exception as e:
|
||
print(f" 验证失败: {e}")
|
||
else:
|
||
print(" 路径不存在或不可执行。")
|
||
|
||
def run_test(python_path, test_type, total_primes, thread_count, args):
|
||
"""
|
||
使用指定的Python解释器运行自定义测试
|
||
"""
|
||
if not args.quiet:
|
||
print(f"\n--- 开始测试: {test_type} ---")
|
||
|
||
cmd = [python_path, "src/test_worker.py", test_type, str(total_primes)]
|
||
if thread_count:
|
||
cmd.append(str(thread_count))
|
||
if args.verbose:
|
||
cmd.append("--verbose")
|
||
|
||
if not args.quiet:
|
||
print(f" 执行命令: {' '.join(cmd)}")
|
||
print(f" 超时时间: {args.timeout} 秒")
|
||
|
||
try:
|
||
# 使用 Popen 实时捕获输出
|
||
process = subprocess.Popen(cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
text=True,
|
||
)
|
||
|
||
stdout_output, stderr_output = "", ""
|
||
|
||
# 实时打印线程日志
|
||
try:
|
||
while True:
|
||
stderr_line = process.stderr.readline()
|
||
if not stderr_line:
|
||
break
|
||
stderr_output += stderr_line
|
||
if args.verbose:
|
||
print(f" [线程日志] {stderr_line.strip()}")
|
||
stdout_output, stderr_output = process.communicate(timeout=args.timeout)
|
||
except subprocess.TimeoutExpired:
|
||
process.kill()
|
||
print(f" 测试超时(超过 {args.timeout} 秒)")
|
||
except Exception as e:
|
||
print(f" 读取线程日志时发生错误: {e}")
|
||
|
||
if process.returncode != 0:
|
||
print(f" 测试执行失败 (返回码 {process.returncode}):")
|
||
if stdout_output:
|
||
print(f" 标准输出: {stdout_output}")
|
||
if stderr_output:
|
||
print(f" 标准错误: {stderr_output}")
|
||
return None
|
||
|
||
data = json.loads(stdout_output)
|
||
|
||
if data.get("error"):
|
||
print(f" 测试失败: {data['error']}")
|
||
return None
|
||
|
||
if not args.quiet:
|
||
print(f" 测试成功:")
|
||
print(f" 耗时: {data['duration']:.2f} 秒")
|
||
print(f" 找到质数数量: {data['prime_count']}")
|
||
if data['last_prime']:
|
||
print(f" 最后一个质数: {data['last_prime']}")
|
||
|
||
return data
|
||
|
||
except json.JSONDecodeError:
|
||
print(f" 无法解析测试结果{stdout_output}")
|
||
return None
|
||
except Exception as e:
|
||
print(f" 发生未知错误: {e}")
|
||
return None
|
||
|
||
def get_file_format(output_path):
|
||
"""
|
||
根据文件后缀获取输出格式
|
||
"""
|
||
if not output_path:
|
||
return "json"
|
||
suffix = os.path.splitext(output_path)[1].lower()
|
||
if suffix == ".csv":
|
||
return "csv"
|
||
elif suffix == ".txt":
|
||
return "txt"
|
||
else:
|
||
return "json"
|
||
|
||
def save_report(report, output_path):
|
||
"""
|
||
保存测试报告
|
||
"""
|
||
file_format = get_file_format(output_path)
|
||
report["generated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
try:
|
||
if file_format == "json":
|
||
# JSON格式
|
||
with open(output_path, "w", encoding="utf-8") as f:
|
||
json.dump(report, f, ensure_ascii=False, indent=2)
|
||
elif file_format == "csv":
|
||
# CSV格式
|
||
with open(output_path, "w", newline="", encoding="utf-8") as f:
|
||
writer = csv.writer(f)
|
||
# 表头
|
||
writer.writerow(["测试类型", "Python路径", "线程数", "耗时(秒)", "质数数量", "最后一个质数", "生成时间"])
|
||
# 数据行
|
||
test_labels = {"single": "单线程", "gil": "GIL多线程", "nogil": "NoGIL多线程"}
|
||
for test_type, data in report["results"].items():
|
||
writer.writerow([
|
||
test_labels.get(test_type, test_type),
|
||
data["python_path"],
|
||
data.get("threads", "-"),
|
||
round(data["duration"], 2),
|
||
data["prime_count"],
|
||
data["last_prime"],
|
||
report["generated_at"]
|
||
])
|
||
elif file_format == "txt":
|
||
# TXT格式
|
||
with open(output_path, "w", encoding="utf-8") as f:
|
||
f.write("=" * 60 + "\n")
|
||
f.write("Python 多线程性能基准测试报告\n")
|
||
f.write("=" * 60 + "\n")
|
||
f.write(f"生成时间: {report['generated_at']}\n")
|
||
f.write("\n【测试参数】\n")
|
||
f.write(f" 质数总数: {report['parameters']['num_primes']}\n")
|
||
f.write(f" 多线程数: {report['parameters']['threads']}\n")
|
||
f.write(f" 超时时间: {report['parameters']['timeout']} 秒\n")
|
||
f.write(f" 跳过测试: {', '.join(report['parameters']['skip_tests']) if report['parameters']['skip_tests'] else '无'}\n")
|
||
f.write("\n【测试结果】\n")
|
||
f.write(f"{'测试类型':<15} {'耗时(秒)':<12} {'质数数量':<10} {'最后一个质数':<15}\n")
|
||
f.write("-" * 60 + "\n")
|
||
test_labels = {"single": "单线程", "gil": "GIL多线程", "nogil": "NoGIL多线程"}
|
||
for test_type, data in report["results"].items():
|
||
f.write(f"{test_labels.get(test_type, test_type):<15} {round(data['duration'], 2):<12.2f} {data['prime_count']:<10} {data['last_prime']:<15}\n")
|
||
# 计算相对速度
|
||
if len(report["results"]) >= 2:
|
||
fastest = min([data["duration"] for data in report["results"].values()])
|
||
f.write("\n【相对速度对比】(以最快测试为基准)\n")
|
||
for test_type, data in report["results"].items():
|
||
speedup = fastest / data["duration"]
|
||
f.write(f"{test_labels.get(test_type, test_type)}: {speedup:.2f}x\n")
|
||
|
||
print(f"\n测试报告已保存到: {os.path.abspath(output_path)}")
|
||
print(f"报告格式: {file_format.upper()}")
|
||
except Exception as e:
|
||
print(f"\n保存报告失败: {e}")
|
||
def main():
|
||
args = parse_arguments()
|
||
|
||
if not args.quiet:
|
||
print("=" * 60)
|
||
print("Python 多线程性能基准测试工具")
|
||
print("=" * 60)
|
||
if args.verbose:
|
||
print("** 已启用详细模式 (--verbose) **")
|
||
if args.quiet:
|
||
print("** 已启用静默模式 (--quiet) **")
|
||
if args.skip:
|
||
print(f"** 跳过测试类型: {', '.join(args.skip)} **")
|
||
print("=" * 60)
|
||
|
||
python_paths = {}
|
||
skip_tests = args.skip or []
|
||
|
||
if "single" not in skip_tests:
|
||
python_paths["single"] = get_python_path(" 1. 用于单线程测试的Python路径: ", args)
|
||
if "gil" not in skip_tests:
|
||
python_paths["gil"] = get_python_path(" 2. 用于GIL多线程测试的Python路径: ", args)
|
||
if "nogil" not in skip_tests:
|
||
python_paths["nogil"] = get_python_path(" 3. 用于NoGIL多线程测试的Python路径: ", args)
|
||
|
||
if not args.quiet:
|
||
print("\n[测试参数]")
|
||
print(f" 质数总数: {args.num_primes}")
|
||
print(f" 多线程数: {args.threads}")
|
||
print(f" 单个测试超时: {args.timeout} 秒")
|
||
if args.output:
|
||
print(f" 报告输出文件: {args.output}")
|
||
print("=" * 60)
|
||
|
||
print("开始执行所有测试...")
|
||
print("=" * 60)
|
||
|
||
test_results = {}
|
||
report_data = {
|
||
"parameters": {
|
||
"num_primes": args.num_primes,
|
||
"threads": args.threads,
|
||
"timeout": args.timeout,
|
||
"skip_tests": args.skip or []
|
||
},
|
||
"results": {}
|
||
}
|
||
|
||
if "single" not in skip_tests:
|
||
result = run_test(python_paths["single"], "single", args.num_primes, None, args)
|
||
if result:
|
||
test_results["single"] = result
|
||
report_data["results"]["single"] = {
|
||
"python_path": python_paths["single"],
|
||
"duration": result["duration"],
|
||
"prime_count": result["prime_count"],
|
||
"last_prime": result["last_prime"]
|
||
}
|
||
|
||
if "gil" not in skip_tests:
|
||
result = run_test(python_paths["gil"], "gil", args.num_primes, args.threads, args)
|
||
if result:
|
||
test_results["gil"] = result
|
||
report_data["results"]["gil"] = {
|
||
"python_path": python_paths["gil"],
|
||
"threads": args.threads,
|
||
"duration": result["duration"],
|
||
"prime_count": result["prime_count"],
|
||
"last_prime": result["last_prime"]
|
||
}
|
||
|
||
if "nogil" not in skip_tests:
|
||
result = run_test(python_paths["nogil"], "nogil", args.num_primes, args.threads, args)
|
||
if result:
|
||
test_results["nogil"] = result
|
||
report_data["results"]["nogil"] = {
|
||
"python_path": python_paths["nogil"],
|
||
"threads": args.threads,
|
||
"duration": result["duration"],
|
||
"prime_count": result["prime_count"],
|
||
"last_prime": result["last_prime"]
|
||
}
|
||
|
||
print("\n" + "=" * 60)
|
||
print("测试完成! 性能对比报告如下:")
|
||
print("=" * 60)
|
||
|
||
if not test_results:
|
||
print("未能获取任何有效测试结果。")
|
||
if args.output:
|
||
save_report(report_data, args.output)
|
||
return
|
||
|
||
# 相对速度
|
||
fastest_duration = min([r["duration"] for r in test_results.values()])
|
||
|
||
# 报告表格
|
||
print(f"\n{'测试类型':<15} {'耗时(秒)':<12} {'相对速度':<12} {'质数数量':<10}")
|
||
print("-" * 60)
|
||
|
||
test_labels = {
|
||
"single": "单线程",
|
||
"gil": "GIL多线程",
|
||
"nogil": "NoGIL多线程"
|
||
}
|
||
|
||
for test_type, result in test_results.items():
|
||
duration = result["duration"]
|
||
speedup = fastest_duration / duration
|
||
prime_count = result["prime_count"]
|
||
print(f"{test_labels[test_type]:<15} {duration:<12.2f} {speedup:<12.2f}x {prime_count:<10}")
|
||
|
||
if args.output:
|
||
save_report(report_data, args.output)
|
||
|
||
print("\n" + "=" * 60)
|
||
|
||
if __name__ == "__main__":
|
||
main() |