Files
PyGILnoGILBench/main.py
drd_vic be88dc8f27 docs: 更新项目文档和添加项目logo
- 更新README.md,改善项目文档结构和内容
- 添加项目logo文件assets/logo.png,提升项目视觉识别度
- 完善帮助文本描述,明确说明使用默认参数和默认python解释器的行为
2025-11-24 18:28:07 +08:00

404 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# main.py
import subprocess
import json
import csv
import os
import sys
import argparse
import multiprocessing
from datetime import datetime
def print_custom_help():
"""
自定义帮助信息
"""
# --- ANSI 颜色和样式代码 ---
# 重置所有样式
RESET = '\033[0m'
# 文本样式
BOLD = '\033[1m'
# 前景色
GREEN = '\033[32m'
CYAN = '\033[36m'
YELLOW = '\033[33m'
# 组合样式
HEADER = GREEN + BOLD
OPTION = CYAN
EXAMPLE = YELLOW
help_text = f"""
{HEADER}Usage:{RESET} main.py [OPTIONS]
{HEADER}Python 多线程性能基准测试工具 - 自定义帮助文档{RESET}
一个用于对比不同 Python 版本单线程、GIL、NoGIL在 CPU 密集型任务上性能的工具。
{HEADER}Options:{RESET}
{OPTION}-h, --help{RESET} 显示此帮助信息并退出。
{OPTION}-v, --verbose{RESET} 启用详细模式。在测试过程中打印每个线程的实时日志。
{OPTION}-q, --quiet{RESET} 启用静默模式。使用默认参数和默认python解释器只在测试结束后打印最终报告抑制其他所有输出。
{OPTION}-n NUM_PRIMES, --num-primes NUM_PRIMES{RESET}
指定要寻找的质数总数。
默认值: 30000
{OPTION}-t THREADS, --threads THREADS{RESET}
指定多线程测试时使用的线程数。
默认值: 你的 CPU 核心数 ({multiprocessing.cpu_count()})
{OPTION}-s TEST_TYPE, --skip TEST_TYPE{RESET}
跳过指定类型的测试。可以多次使用此选项。
可选值: single, gil, nogil
{OPTION}-l TIMEOUT, --timeout TIMEOUT{RESET}
单个测试的超时时间(秒)。如果测试超过此时限,将被强制终止。
默认值: 300
{OPTION}-o OUTPUT_FILE, --output OUTPUT_FILE{RESET}
将测试报告保存到指定文件。支持的格式由文件后缀决定:
- .json: 保存为 JSON 格式(默认)。
- .csv: 保存为 CSV 格式(便于 Excel 分析)。
- .txt: 保存为人类可读的文本格式。
{HEADER}Examples:{RESET}
{EXAMPLE}1. 快速开始(使用所有默认参数){RESET}
python main.py
{EXAMPLE}2. 详细模式 + 寻找 50000 个质数 + 使用 12 个线程{RESET}
python main.py -v -n 50000 -t 12
{EXAMPLE}3. 静默模式 + 跳过 GIL 测试 + 将报告保存为 CSV{RESET}
python main.py -q --skip gil -o performance_results.csv
{EXAMPLE}4. 自定义超时时间10 分钟)并保存为文本报告{RESET}
python main.py -l 600 -o detailed_report.txt
Report bugs to: your_email@example.com
Project home: https://github.com/your_username/your_project
"""
print(help_text.strip())
sys.exit(0)
def parse_arguments():
"""
解析命令行参数
"""
parser = argparse.ArgumentParser(
description="Python 多线程性能基准测试工具",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="For more examples and detailed usage, run 'python main.py -h'.",
add_help=False
)
parser.add_argument("-h", "--help", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("-v", "--verbose", action="store_true",help="启用详细模式,显示每个线程的日志")
parser.add_argument("-q", "--quiet", action="store_true", help="静默模式,仅输出最终报告和错误信息")
parser.add_argument("-n", "--num-primes", type=int, default=30000, help="要寻找的质数总数默认30000")
parser.add_argument("-t", "--threads", type=int, default=multiprocessing.cpu_count(), help="多线程测试的线程数默认CPU核心数")
parser.add_argument("-o", "--output", type=str, help="输出测试报告到文件支持格式JSON/CSV/TXT通过后缀自动识别")
parser.add_argument("-s", "--skip", action="append", choices=["single", "gil", "nogil"], help="跳过指定测试类型(可多次使用,例:--skip gil --skip single")
parser.add_argument("-l", "--timeout", type=int, default=300, help="单个测试的超时时间默认300")
args = parser.parse_args()
if args.help:
print_custom_help()
return args
def get_python_path(prompt, args):
"""
获取用户输入的Python可执行文件路径并验证其有效性
"""
while True:
if args.quiet:
# 静默模式下自动使用系统默认Python
import sys
path = sys.executable
if not args.quiet:
print(f" 使用默认Python路径: {path}")
return path
path = input(prompt).strip()
if not path:
print(" 路径不能为空。")
continue
if os.path.exists(path) and os.access(path, os.X_OK):
try:
result = subprocess.run([path, "--version"], capture_output=True, text=True, check=True)
print(f" 验证成功: {result.stdout.strip()}")
return path
except subprocess.CalledProcessError:
print(" 验证失败: 这不是一个有效的Python解释器。")
except Exception as e:
print(f" 验证失败: {e}")
else:
print(" 路径不存在或不可执行。")
def run_test(python_path, test_type, total_primes, thread_count, args):
"""
使用指定的Python解释器运行自定义测试
"""
if not args.quiet:
print(f"\n--- 开始测试: {test_type} ---")
cmd = [python_path, "src/test_worker.py", test_type, str(total_primes)]
if thread_count:
cmd.append(str(thread_count))
if args.verbose:
cmd.append("--verbose")
if not args.quiet:
print(f" 执行命令: {' '.join(cmd)}")
print(f" 超时时间: {args.timeout}")
try:
# 使用 Popen 实时捕获输出
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdout_output, stderr_output = "", ""
# 实时打印线程日志
try:
while True:
stderr_line = process.stderr.readline()
if not stderr_line:
break
stderr_output += stderr_line
if args.verbose:
print(f" [线程日志] {stderr_line.strip()}")
stdout_output, stderr_output = process.communicate(timeout=args.timeout)
except subprocess.TimeoutExpired:
process.kill()
print(f" 测试超时(超过 {args.timeout} 秒)")
except Exception as e:
print(f" 读取线程日志时发生错误: {e}")
if process.returncode != 0:
print(f" 测试执行失败 (返回码 {process.returncode}):")
if stdout_output:
print(f" 标准输出: {stdout_output}")
if stderr_output:
print(f" 标准错误: {stderr_output}")
return None
data = json.loads(stdout_output)
if data.get("error"):
print(f" 测试失败: {data['error']}")
return None
if not args.quiet:
print(f" 测试成功:")
print(f" 耗时: {data['duration']:.2f}")
print(f" 找到质数数量: {data['prime_count']}")
if data['last_prime']:
print(f" 最后一个质数: {data['last_prime']}")
return data
except json.JSONDecodeError:
print(f" 无法解析测试结果{stdout_output}")
return None
except Exception as e:
print(f" 发生未知错误: {e}")
return None
def get_file_format(output_path):
"""
根据文件后缀获取输出格式
"""
if not output_path:
return "json"
suffix = os.path.splitext(output_path)[1].lower()
if suffix == ".csv":
return "csv"
elif suffix == ".txt":
return "txt"
else:
return "json"
def save_report(report, output_path):
"""
保存测试报告
"""
file_format = get_file_format(output_path)
report["generated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
if file_format == "json":
# JSON格式
with open(output_path, "w", encoding="utf-8") as f:
json.dump(report, f, ensure_ascii=False, indent=2)
elif file_format == "csv":
# CSV格式
with open(output_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
# 表头
writer.writerow(["测试类型", "Python路径", "线程数", "耗时(秒)", "质数数量", "最后一个质数", "生成时间"])
# 数据行
test_labels = {"single": "单线程", "gil": "GIL多线程", "nogil": "NoGIL多线程"}
for test_type, data in report["results"].items():
writer.writerow([
test_labels.get(test_type, test_type),
data["python_path"],
data.get("threads", "-"),
round(data["duration"], 2),
data["prime_count"],
data["last_prime"],
report["generated_at"]
])
elif file_format == "txt":
# TXT格式
with open(output_path, "w", encoding="utf-8") as f:
f.write("=" * 60 + "\n")
f.write("Python 多线程性能基准测试报告\n")
f.write("=" * 60 + "\n")
f.write(f"生成时间: {report['generated_at']}\n")
f.write("\n【测试参数】\n")
f.write(f" 质数总数: {report['parameters']['num_primes']}\n")
f.write(f" 多线程数: {report['parameters']['threads']}\n")
f.write(f" 超时时间: {report['parameters']['timeout']}\n")
f.write(f" 跳过测试: {', '.join(report['parameters']['skip_tests']) if report['parameters']['skip_tests'] else ''}\n")
f.write("\n【测试结果】\n")
f.write(f"{'测试类型':<15} {'耗时(秒)':<12} {'质数数量':<10} {'最后一个质数':<15}\n")
f.write("-" * 60 + "\n")
test_labels = {"single": "单线程", "gil": "GIL多线程", "nogil": "NoGIL多线程"}
for test_type, data in report["results"].items():
f.write(f"{test_labels.get(test_type, test_type):<15} {round(data['duration'], 2):<12.2f} {data['prime_count']:<10} {data['last_prime']:<15}\n")
# 计算相对速度
if len(report["results"]) >= 2:
fastest = min([data["duration"] for data in report["results"].values()])
f.write("\n【相对速度对比】(以最快测试为基准)\n")
for test_type, data in report["results"].items():
speedup = fastest / data["duration"]
f.write(f"{test_labels.get(test_type, test_type)}: {speedup:.2f}x\n")
print(f"\n测试报告已保存到: {os.path.abspath(output_path)}")
print(f"报告格式: {file_format.upper()}")
except Exception as e:
print(f"\n保存报告失败: {e}")
def main():
args = parse_arguments()
if not args.quiet:
print("=" * 60)
print("Python 多线程性能基准测试工具")
print("=" * 60)
if args.verbose:
print("** 已启用详细模式 (--verbose) **")
if args.quiet:
print("** 已启用静默模式 (--quiet) **")
if args.skip:
print(f"** 跳过测试类型: {', '.join(args.skip)} **")
print("=" * 60)
python_paths = {}
skip_tests = args.skip or []
if "single" not in skip_tests:
python_paths["single"] = get_python_path(" 1. 用于单线程测试的Python路径: ", args)
if "gil" not in skip_tests:
python_paths["gil"] = get_python_path(" 2. 用于GIL多线程测试的Python路径: ", args)
if "nogil" not in skip_tests:
python_paths["nogil"] = get_python_path(" 3. 用于NoGIL多线程测试的Python路径: ", args)
if not args.quiet:
print("\n[测试参数]")
print(f" 质数总数: {args.num_primes}")
print(f" 多线程数: {args.threads}")
print(f" 单个测试超时: {args.timeout}")
if args.output:
print(f" 报告输出文件: {args.output}")
print("=" * 60)
print("开始执行所有测试...")
print("=" * 60)
test_results = {}
report_data = {
"parameters": {
"num_primes": args.num_primes,
"threads": args.threads,
"timeout": args.timeout,
"skip_tests": args.skip or []
},
"results": {}
}
if "single" not in skip_tests:
result = run_test(python_paths["single"], "single", args.num_primes, None, args)
if result:
test_results["single"] = result
report_data["results"]["single"] = {
"python_path": python_paths["single"],
"duration": result["duration"],
"prime_count": result["prime_count"],
"last_prime": result["last_prime"]
}
if "gil" not in skip_tests:
result = run_test(python_paths["gil"], "gil", args.num_primes, args.threads, args)
if result:
test_results["gil"] = result
report_data["results"]["gil"] = {
"python_path": python_paths["gil"],
"threads": args.threads,
"duration": result["duration"],
"prime_count": result["prime_count"],
"last_prime": result["last_prime"]
}
if "nogil" not in skip_tests:
result = run_test(python_paths["nogil"], "nogil", args.num_primes, args.threads, args)
if result:
test_results["nogil"] = result
report_data["results"]["nogil"] = {
"python_path": python_paths["nogil"],
"threads": args.threads,
"duration": result["duration"],
"prime_count": result["prime_count"],
"last_prime": result["last_prime"]
}
print("\n" + "=" * 60)
print("测试完成! 性能对比报告如下:")
print("=" * 60)
if not test_results:
print("未能获取任何有效测试结果。")
if args.output:
save_report(report_data, args.output)
return
# 相对速度
fastest_duration = min([r["duration"] for r in test_results.values()])
# 报告表格
print(f"\n{'测试类型':<15} {'耗时(秒)':<12} {'相对速度':<12} {'质数数量':<10}")
print("-" * 60)
test_labels = {
"single": "单线程",
"gil": "GIL多线程",
"nogil": "NoGIL多线程"
}
for test_type, result in test_results.items():
duration = result["duration"]
speedup = fastest_duration / duration
prime_count = result["prime_count"]
print(f"{test_labels[test_type]:<15} {duration:<12.2f} {speedup:<12.2f}x {prime_count:<10}")
if args.output:
save_report(report_data, args.output)
print("\n" + "=" * 60)
if __name__ == "__main__":
main()