| 特性 | 多线程 | 多进程 |
|---|---|---|
| 内存 | 共享内存 | 独立内存空间 |
| 启动开销 | 小 | 较大 |
| 数据共享 | 容易(但有锁问题) | 需要IPC机制 |
| 受GIL影响 | 是(CPU密集型受限) | 否 |
| 适用场景 | I/O密集型、GUI | CPU密集型、需要隔离 |
import threading
import requests
import time
def download_page(url, results, index):
    """Fetch one URL and store its body length into results[index].

    On any failure the slot is set to 0 so the caller always receives a
    fully populated result list, one entry per URL.
    """
    try:
        body = requests.get(url, timeout=5).text
        results[index] = len(body)
        print(f"Downloaded {url}: {len(body)} chars")
    except Exception as err:
        results[index] = 0
        print(f"Error downloading {url}: {err}")
def multithread_download(urls):
    """Download all URLs concurrently, one thread per URL.

    Returns a list of page sizes aligned with the input order and prints
    the total wall-clock time taken.
    """
    results = [None] * len(urls)
    start = time.time()
    workers = [
        threading.Thread(target=download_page, args=(url, results, idx))
        for idx, url in enumerate(urls)
    ]
    for worker in workers:
        worker.start()
    # Block until every download thread has finished.
    for worker in workers:
        worker.join()
    print(f"Total time: {time.time() - start:.2f}s")
    return results
# Demo driver. The call is guarded so it only runs when this file is
# executed as a script: unguarded module-level work would also re-run
# whenever multiprocessing re-imports this module under the "spawn"
# start method (Windows/macOS), firing extra network requests.
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/3"
]
if __name__ == "__main__":
    multithread_download(urls)
import tkinter as tk
import threading
import time
class GUIApp:
    """Demo: keep a Tk GUI responsive by running slow work in a daemon thread."""

    def __init__(self):
        # Local import: the Progressbar widget lives in tkinter.ttk,
        # not in the top-level tkinter module.
        from tkinter import ttk

        self.root = tk.Tk()
        self.root.title("多线程GUI示例")
        # Progress value (0-100) shared between the worker and the widget.
        self.progress = tk.DoubleVar()
        tk.Label(self.root, text="后台任务示例").pack()
        # BUG FIX: original called tk.Progressbar, which does not exist and
        # raises AttributeError; Progressbar is provided by tkinter.ttk.
        ttk.Progressbar(self.root, variable=self.progress, length=200).pack()
        # Buttons: one starts the background task, one proves responsiveness.
        tk.Button(self.root, text="启动耗时任务",
                  command=self.start_background_task).pack()
        tk.Button(self.root, text="点击我(测试响应)",
                  command=self.show_response).pack()

    def long_running_task(self):
        """Simulate a slow task, advancing the progress value as it goes.

        NOTE(review): Tk is not thread-safe in general; setting a DoubleVar
        from a worker thread usually works in CPython, but scheduling the
        update on the main loop via root.after() would be strictly safer.
        """
        for i in range(1, 101):
            time.sleep(0.05)  # simulate one unit of work
            self.progress.set(i)
        print("任务完成!")

    def start_background_task(self):
        """Run the long task on a daemon thread so the GUI loop stays free."""
        thread = threading.Thread(target=self.long_running_task)
        thread.daemon = True  # do not block interpreter exit
        thread.start()

    def show_response(self):
        """Prove the main loop is still responsive while the task runs."""
        print("GUI仍然响应!")

    def run(self):
        """Enter the Tk main event loop (blocks until the window closes)."""
        self.root.mainloop()
# app = GUIApp()
# app.run()
import multiprocessing
import time
import math
def cpu_intensive_task(n):
    """Return the sum of the square roots of every integer in [0, n)."""
    return sum(math.sqrt(value) for value in range(n))
def multiprocess_calculation():
    """Run the CPU-bound workloads in parallel across a 4-process pool.

    Prints the results and the elapsed wall-clock time, then returns the
    result list in the same order as the workloads.
    """
    start = time.time()
    workloads = [1000000, 1500000, 2000000, 2500000]
    # map() blocks until every worker has returned its result.
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_intensive_task, workloads)
    elapsed = time.time() - start
    print(f"多进程计算结果: {results}")
    print(f"总耗时: {elapsed:.2f}秒")
    return results
# Sequential baseline for comparison with the multiprocess version.
def single_process_calculation():
    """Run the same four workloads one after another in this process."""
    start = time.time()
    results = [cpu_intensive_task(n) for n in [1000000, 1500000, 2000000, 2500000]]
    elapsed = time.time() - start
    print(f"单进程计算结果: {results}")
    print(f"总耗时: {elapsed:.2f}秒")
    return results
# Compare the sequential baseline against the multiprocess version.
# The guard is required: multiprocessing re-imports this module in its
# worker processes under the "spawn" start method.
if __name__ == "__main__":
    print("=== CPU密集型任务测试 ===")
    single_process_calculation()
    multiprocess_calculation()
import multiprocessing
import os
from multiprocessing import Process, Queue
def stage1(raw_data_queue, processed_queue):
    """Pipeline stage 1: double every value in each incoming batch.

    Consumes batches from raw_data_queue until a None sentinel arrives;
    the sentinel is forwarded downstream before the worker exits so the
    next stage also knows the stream has ended.
    """
    while True:
        batch = raw_data_queue.get()
        if batch is None:  # shutdown sentinel: forward it and stop
            processed_queue.put(None)
            break
        doubled = [item * 2 for item in batch]  # simulated processing
        processed_queue.put(doubled)
        print(f"Stage1 PID {os.getpid()}: processed {len(batch)} items")
def stage2(processed_queue, result_queue):
    """Pipeline stage 2: reduce each batch to its arithmetic mean.

    Consumes until the None sentinel, which is forwarded so readers of
    result_queue also see end-of-stream.
    """
    while True:
        batch = processed_queue.get()
        if batch is None:  # end-of-stream: pass it along and exit
            result_queue.put(None)
            break
        mean_value = sum(batch) / len(batch)  # simulated analysis
        result_queue.put(mean_value)
        print(f"Stage2 PID {os.getpid()}: analysis result {mean_value:.2f}")
def pipeline_processing():
    """Wire stage1 and stage2 into a two-process pipeline over queues.

    Feeds ten batches of 100 consecutive integers in, propagates a None
    sentinel through both stages, and returns the per-batch averages.
    """
    raw_queue = Queue()
    processed_queue = Queue()
    result_queue = Queue()
    workers = [
        Process(target=stage1, args=(raw_queue, processed_queue)),
        Process(target=stage2, args=(processed_queue, result_queue)),
    ]
    for worker in workers:
        worker.start()
    # Feed ten batches of 100 consecutive integers each.
    for batch_no in range(10):
        raw_queue.put(list(range(batch_no * 100, (batch_no + 1) * 100)))
    raw_queue.put(None)  # shutdown sentinel, forwarded stage to stage
    # Drain until the sentinel emerges from the far end of the pipeline.
    results = []
    while (item := result_queue.get()) is not None:
        results.append(item)
    for worker in workers:
        worker.join()
    print(f"处理完成,得到 {len(results)} 个结果")
    return results
import concurrent.futures
import time
import math
import requests
def io_bound_task(url):
    """I/O-bound work: fetch url and return the length of the response body."""
    body = requests.get(url, timeout=5).text
    return len(body)
def cpu_bound_task(n):
    """CPU-bound work: sum of math.sqrt(i) for every i in [0, n)."""
    total = 0
    for value in range(n):
        total += math.sqrt(value)
    return total
def hybrid_approach():
    """Overlap I/O-bound fetches (threads) with CPU-bound sums (processes).

    Returns (io_results, cpu_results): page sizes per URL and square-root
    sums per workload, both in submission order.
    """
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1"
    ]
    numbers = [100000, 200000, 300000, 400000]
    start = time.time()
    # Threads overlap the network waits; processes sidestep the GIL for CPU work.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as io_executor:
        io_futures = [io_executor.submit(io_bound_task, u) for u in urls]
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as cpu_executor:
            cpu_futures = [cpu_executor.submit(cpu_bound_task, n) for n in numbers]
            io_results = [future.result() for future in io_futures]
            cpu_results = [future.result() for future in cpu_futures]
    elapsed = time.time() - start
    print(f"I/O结果: {io_results}")
    print(f"CPU结果: {cpu_results[:2]}...")  # show only the first two
    print(f"总耗时: {elapsed:.2f}秒")
    return io_results, cpu_results
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import functools
def smart_executor(task_type='io', max_workers=None):
    """Pick an executor suited to the workload.

    'cpu' -> ProcessPoolExecutor (defaults to one worker per CPU core);
    anything else -> ThreadPoolExecutor (defaults to 10 workers).
    """
    if task_type != 'cpu':
        return ThreadPoolExecutor(max_workers if max_workers else 10)
    return ProcessPoolExecutor(max_workers if max_workers else multiprocessing.cpu_count())
# Usage example
def process_data_batch(data_batch, task_type='io', process_function=None):
    """Process data_batch in parallel with an executor chosen by task_type.

    Args:
        data_batch: iterable of items to process.
        task_type: 'cpu' for a process pool, anything else for a thread pool.
        process_function: callable applied to each item. BUG FIX: the
            original body referenced a global ``process_function`` that is
            defined nowhere in this file, so every call raised NameError;
            the callable is now an explicit (backward-compatible) parameter.

    Returns:
        List of per-item results in input order.

    Raises:
        TypeError: if no process_function callable is supplied.
    """
    if process_function is None:
        raise TypeError("process_data_batch() requires a process_function callable")
    with smart_executor(task_type) as executor:
        results = list(executor.map(process_function, data_batch))
    return results
关键点:理解GIL的影响是选择多进程还是多线程的关键。对于Python,I/O等待时GIL会释放,所以I/O密集型任务多线程效果很好;CPU密集型任务需要多进程来真正利用多核。