feat: 添加Python GIL与NoGIL多线程性能基准测试工具

- 创建完整的多线程性能测试套件,用于对比单线程、GIL多线程和NoGIL多线程的性能差异
- 实现三种测试模式:单线程测试、传统GIL多线程测试、无GIL多线程测试
- 添加质数查找算法作为CPU密集型测试用例,支持可配置的质数数量和线程数
- 提供详细的性能对比报告,包括执行时间、相对速度倍数和找到的质数数量
- 支持详细日志输出模式,可实时查看各线程的执行状态
- 包含项目配置文件:pyproject.toml、.gitignore、.python-version和MIT许可证
- 采用模块化设计,将不同测试策略分离到独立模块中便于维护
This commit is contained in:
drd_vic
2025-11-24 01:16:23 +08:00
commit 079bd9fa33
11 changed files with 485 additions and 0 deletions

43
.gitignore vendored Normal file
View File

@@ -0,0 +1,43 @@
# Python-generated files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Virtual environments
venv/
.env/
.venv/
env/
# uv lock file
uv.lock
# IDE files
.vscode/
.idea/
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?
# Logs
*.log
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.14

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Runda Dong
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

184
main.py Normal file
View File

@@ -0,0 +1,184 @@
# main.py
import subprocess
import json
import os
import argparse
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description="Python 多线程性能基准测试工具")
parser.add_argument("-v", "--verbose", action="store_true",
help="启用详细模式,显示每个线程的日志")
return parser.parse_args()
def get_python_path(prompt):
"""获取用户输入的Python可执行文件路径并验证其有效性"""
while True:
path = input(prompt).strip()
if not path:
print("路径不能为空。")
continue
if os.path.exists(path) and os.access(path, os.X_OK):
try:
result = subprocess.run([path, "--version"], capture_output=True, text=True, check=True)
print(f" 验证成功: {result.stdout.strip()}")
return path
except subprocess.CalledProcessError:
print(" 验证失败: 这不是一个有效的Python解释器。")
except Exception as e:
print(f" 验证失败: {e}")
else:
print(" 路径不存在或不可执行。")
def run_test(python_path, test_type, total_primes, thread_count, verbose):
"""使用指定的Python解释器运行测试"""
print(f"\n--- 开始测试: {test_type} ---")
cmd = [python_path, "src/test_worker.py", test_type, str(total_primes)]
if thread_count:
cmd.append(str(thread_count))
if verbose:
cmd.append("--verbose")
print(f" 执行命令: {' '.join(cmd)}")
try:
# 使用 Popen 实时捕获输出
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout_output = ""
stderr_output = ""
# 实时打印 stderr (线程日志)
while True:
stderr_line = process.stderr.readline()
if not stderr_line:
break
stderr_output += stderr_line
if verbose:
print(f" [线程日志] {stderr_line.strip()}")
# 获取 stdout 结果
stdout_output, _ = process.communicate()
if process.returncode != 0:
print(f" 测试执行失败 (返回码 {process.returncode}):")
if stdout_output:
print(f" 标准输出: {stdout_output}")
if stderr_output:
print(f" 标准错误: {stderr_output}")
return None
# 解析JSON结果
data = json.loads(stdout_output)
if data.get("error"):
print(f" 测试失败: {data['error']}")
return None
print(f" 测试成功:")
print(f" 耗时: {data['duration']:.2f}")
print(f" 找到质数数量: {data['prime_count']}")
if data['last_prime']:
print(f" 最后一个质数: {data['last_prime']}")
return data
except json.JSONDecodeError:
print(f" 无法解析测试结果。")
print(f" 原始输出: {stdout_output}")
return None
except Exception as e:
print(f" 发生未知错误: {e}")
return None
def main():
args = parse_arguments()
print("=" * 60)
print("Python 多线程性能基准测试工具")
print("=" * 60)
if args.verbose:
print("** 已启用详细模式 (--verbose) **")
print("本工具将使用不同的Python解释器运行CPU密集型任务")
print("以比较单线程、GIL多线程和NoGIL多线程的性能差异。")
print("=" * 60)
print("\n[步骤1/2] 请提供Python解释器路径:")
python_paths = {
"single": get_python_path(" 1. 用于单线程测试的Python路径: "),
"gil": get_python_path(" 2. 用于GIL多线程测试的Python路径: "),
"nogil": get_python_path(" 3. 用于NoGIL多线程测试的Python路径: ")
}
print("\n[步骤2/2] 请设置测试参数:")
while True:
try:
total_primes = int(input(" 要寻找的质数总数 (例如 30000): ").strip())
if total_primes > 0:
break
print(" 请输入一个正整数。")
except ValueError:
print(" 请输入一个有效的整数。")
while True:
try:
thread_count = int(input(" 多线程测试的线程数 (例如 8): ").strip())
if thread_count > 0:
break
print(" 请输入一个正整数。")
except ValueError:
print(" 请输入一个有效的整数。")
print("\n" + "=" * 60)
print("开始执行所有测试...")
print("=" * 60)
test_results = {}
# 单线程测试
result = run_test(python_paths["single"], "single", total_primes, None, args.verbose)
if result:
test_results["single"] = result
# GIL多线程测试
result = run_test(python_paths["gil"], "gil", total_primes, thread_count, args.verbose)
if result:
test_results["gil"] = result
# NoGIL多线程测试
result = run_test(python_paths["nogil"], "nogil", total_primes, thread_count, args.verbose)
if result:
test_results["nogil"] = result
print("\n" + "=" * 60)
print("测试完成! 性能对比报告如下:")
print("=" * 60)
if not test_results:
print("未能获取任何有效测试结果。")
return
fastest_duration = min([r["duration"] for r in test_results.values()])
print(f"\n{'测试类型':<15} {'耗时(秒)':<12} {'相对速度':<12} {'质数数量':<10}")
print("-" * 60)
for test_type, result in test_results.items():
duration = result["duration"]
speedup = fastest_duration / duration
prime_count = result["prime_count"]
test_type_label = {
"single": "单线程",
"gil": "GIL多线程",
"nogil": "NoGIL多线程"
}.get(test_type, test_type)
print(f"{test_type_label:<15} {duration:<12.2f} {speedup:<12.2f}x {prime_count:<10}")
print("\n" + "=" * 60)
if __name__ == "__main__":
main()

15
pyproject.toml Normal file
View File

@@ -0,0 +1,15 @@
[project]
name = "py-gil-nogil-bench"
version = "0.1.0"
description = "Python GIL vs NoGIL 多线程性能基准测试工具"
readme = "README.md"
requires-python = ">=3.9"
dependencies = []
[tool.uv]
python-install-mirror = "https://gh-proxy.com/https://github.com/astral-sh/python-build-standalone/releases/download/"
[[tool.uv.index]]
name = "tuna"
url = "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"
default = true

0
src/__init__.py Normal file
View File

62
src/gil_thread.py Normal file
View File

@@ -0,0 +1,62 @@
# src/gil_thread.py
import threading
from .primes import is_prime
import sys
# 全局共享变量和锁
_next_number_lock = threading.Lock()
_next_number_to_check = 2
_found_primes_lock = threading.Lock()
_found_primes = []
_verbose = False # 全局verbose标志
def _find_primes_worker_gil(target_count):
"""
线程池工作函数 (有GIL版本)。
动态获取下一个数字进行检查。
"""
global _next_number_to_check
local_count = 0
while True:
# 检查是否已找到足够的质数
with _found_primes_lock:
if len(_found_primes) >= target_count:
break
# 获取下一个要检查的数字
current_number = None
with _next_number_lock:
current_number = _next_number_to_check
_next_number_to_check += 1
# 检查质数
if is_prime(current_number):
with _found_primes_lock:
_found_primes.append(current_number)
local_count += 1
if _verbose and local_count % 1000 == 0:
thread_name = threading.current_thread().name
print(f"[{thread_name}] 已找到 {local_count} 个质数...", file=sys.stderr)
def find_primes_gil_thread(total_to_find, thread_count, verbose=False):
"""
使用多线程有GIL寻找指定数量的质数。
"""
global _next_number_to_check, _found_primes, _verbose
_next_number_to_check = 2
_found_primes = []
_verbose = verbose
threads = []
for _ in range(thread_count):
thread = threading.Thread(target=_find_primes_worker_gil, args=(total_to_find,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
return _found_primes.copy()

52
src/nogil_thread.py Normal file
View File

@@ -0,0 +1,52 @@
# src/nogil_thread.py
from concurrent.futures import ThreadPoolExecutor
from .primes import is_prime
import sys
def _find_primes_in_range(start, end, thread_id, verbose):
"""
在指定范围内寻找质数无共享状态适合NoGIL
"""
primes = []
# 从奇数开始检查,提高效率
candidate = start if start % 2 == 1 else start + 1
while candidate < end:
if is_prime(candidate):
primes.append(candidate)
if verbose and len(primes) % 1000 == 0:
print(f"[线程-{thread_id}] 已找到 {len(primes)} 个质数...", file=sys.stderr)
candidate += 2
return primes
def find_primes_nogil_thread(total_to_find, thread_count, verbose=False):
"""
使用多线程无GIL寻找指定数量的质数。
采用静态任务划分策略。
"""
# 估算一个足够大的搜索范围来找到 total_to_find 个质数
import math
if total_to_find < 6:
search_limit = 30
else:
ln_n = math.log(total_to_find)
search_limit = int(total_to_find * (ln_n + math.log(ln_n)) * 1.1) # 乘以1.1作为缓冲
chunk_size = search_limit // thread_count
all_primes = []
with ThreadPoolExecutor(max_workers=thread_count) as executor:
futures = []
for i in range(thread_count):
start = 2 + i * chunk_size
# 最后一个线程处理剩余的所有数字
end = start + chunk_size if i != thread_count - 1 else search_limit
futures.append(executor.submit(_find_primes_in_range, start, end, i, verbose))
for future in futures:
all_primes.extend(future.result())
# 合并结果并排序
all_primes.sort()
# 返回前 total_to_find 个质数
return all_primes[:total_to_find]

19
src/primes.py Normal file
View File

@@ -0,0 +1,19 @@
# src/primes.py
import math
def is_prime(n):
"""
质数判断函数。
对于一个大于2的整数n,如果它不能被2到根号n之间的任何整数整除,那么它就是质数。
"""
if n <= 1:
return False
if n <= 3:
return True
if n % 2 == 0:
return False
# 检查从3到sqrt(n)之间的所有奇数
for i in range(3, math.isqrt(n) + 1, 2):
if n % i == 0:
return False
return True

17
src/single_thread.py Normal file
View File

@@ -0,0 +1,17 @@
# src/single_thread.py
from .primes import is_prime
import sys
def find_primes_single_thread(total_to_find, verbose=False):
"""
使用单线程寻找指定数量的质数。
"""
primes = []
candidate = 2
while len(primes) < total_to_find:
if is_prime(candidate):
primes.append(candidate)
if verbose and len(primes) % 1000 == 0:
print(f"单线程: 已找到 {len(primes)} 个质数...", file=sys.stderr)
candidate += 1
return primes

71
src/test_worker.py Normal file
View File

@@ -0,0 +1,71 @@
# src/test_worker.py
import os
import sys
import json
import time
import argparse
# 获取当前脚本 (test_worker.py) 的绝对路径
current_script_path = os.path.abspath(__file__)
# 获取脚本所在目录 (src/) 的路径
src_directory = os.path.dirname(current_script_path)
# 获取项目根目录 (src/ 的父目录) 的路径
project_root = os.path.dirname(src_directory)
# 将项目根目录添加到 Python 的模块搜索路径中
sys.path.insert(0, project_root)
def parse_arguments():
parser = argparse.ArgumentParser(description="测试工作线程")
parser.add_argument("test_type")
parser.add_argument("total_primes", type=int)
parser.add_argument("thread_count", type=int, nargs='?', default=None)
parser.add_argument("--verbose", action="store_true")
return parser.parse_args()
def main():
args = parse_arguments()
test_type = args.test_type
total_primes = args.total_primes
thread_count = args.thread_count
verbose = args.verbose
result = {
"test_type": test_type,
"total_primes": total_primes,
"thread_count": thread_count if test_type != "single" else None,
"duration": None,
"error": None
}
try:
start_time = time.time()
if test_type == "single":
from src.single_thread import find_primes_single_thread
primes = find_primes_single_thread(total_primes, verbose)
elif test_type == "gil":
from src.gil_thread import find_primes_gil_thread
primes = find_primes_gil_thread(total_primes, thread_count, verbose)
elif test_type == "nogil":
from src.nogil_thread import find_primes_nogil_thread
primes = find_primes_nogil_thread(total_primes, thread_count, verbose)
else:
raise ValueError(f"Unknown test type: {test_type}")
duration = time.time() - start_time
result["duration"] = duration
result["prime_count"] = len(primes)
result["last_prime"] = primes[-1] if primes else None
except Exception as e:
result["error"] = str(e)
# 在详细模式下,也将异常打印到 stderr
if verbose:
print(f"[错误] {e}", file=sys.stderr)
# 确保结果输出到stdout
print(json.dumps(result))
if __name__ == "__main__":
main()