parallel_quicksort
Table of Contents
在并行运算中,使用原地分区不一定总是比使用额外空间更好。
- 内存充足、数据量大:使用额外空间分区,便于并行化且编程简洁。
- 内存有限、数据量中等:使用原地分区,减少内存开销。
#
使用额外空间的版本
from concurrent.futures import ProcessPoolExecutor
def quick_sort(arr):
if len(arr) <= 1:
return arr
pivot = arr[-1]
# 分区操作
left = [x for x in arr[:-1] if x <= pivot]
right = [x for x in arr[:-1] if x > pivot]
# 定义两个进程池,用于并行排序左右子数组
# with multiprocessing.Pool(processes=2) as pool:
# with multiprocessing.get_context("spawn").Pool(processes=2) as pool:
# sorted_left, sorted_right = pool.map(quick_sort, [left, right])
# AssertionError: daemonic processes are not allowed to have children
with ProcessPoolExecutor(max_workers=2) as executor:
sorted_left, sorted_right = executor.map(quick_sort, [left, right])
return sorted_left + [pivot] + sorted_right
#
原地分区的版本
from concurrent.futures import ThreadPoolExecutor
from typing import List
def partition(arr: List[int], low: int, high: int) -> int:
pivot = arr[high]
i = low - 1 # i是较小元素的索引
for j in range(low, high):
if arr[j] <= pivot:
i += 1
arr[i], arr[j] = arr[j], arr[i]
arr[i + 1], arr[high] = arr[high], arr[i + 1]
return i + 1
def quicksort(arr: List[int], low: int, high: int, executor, threshold=1000):
if low < high:
# 执行分区操作
pi = partition(arr, low, high)
# 如果分区大小大于阈值则并行处理
if (high - low) > threshold:
# 提交左右两部分的并行任务
left_future = executor.submit(
quicksort, arr, low, pi - 1, executor, threshold
)
right_future = executor.submit(
quicksort, arr, pi + 1, high, executor, threshold
)
# 等待任务完成
left_future.result()
right_future.result()
else:
# 小于阈值时直接递归处理
quicksort(arr, low, pi - 1, executor, threshold)
quicksort(arr, pi + 1, high, executor, threshold)
def parallel_quicksort(arr: List[int]):
# 使用ThreadPoolExecutor进行并行化
with ThreadPoolExecutor() as executor:
quicksort(arr, 0, len(arr) - 1, executor)