fgg blog

parallel_quicksort

Table of Contents

在并行运算中,使用原地分区不一定总是比使用额外空间更好。

  • 内存充足、数据量大:使用额外空间分区,便于并行化且编程简洁。
  • 内存有限、数据量中等:使用原地分区,减少内存开销。

# 使用额外空间的版本

from concurrent.futures import ProcessPoolExecutor


def quick_sort(arr, threshold=10000, max_depth=2):
    """Return the items of ``arr`` in ascending order, using extra space.

    The input is split into new lists around a pivot instead of being
    rearranged in place. Large inputs have their two sides sorted in
    separate worker processes; small inputs are sorted in the calling
    process, because starting processes and pickling the data costs far
    more than sorting a short list.

    Args:
        arr: Indexable, sized sequence of mutually comparable items.
        threshold: Inputs with at most this many items are sorted without
            starting any worker process.
        max_depth: How many recursion levels may open a process pool. Each
            such level starts two workers, so the number of processes is
            bounded by this value instead of growing with the input.

    Returns:
        ``arr`` itself when it has at most one item, otherwise a new sorted
        list.
    """
    if len(arr) <= 1:
        return arr

    # The middle element as pivot keeps the recursion shallow on input that
    # is already sorted; collecting the items equal to the pivot separately
    # does the same for input with many duplicates.
    pivot = arr[len(arr) // 2]
    smaller = [x for x in arr if x < pivot]
    equal = [x for x in arr if x == pivot]
    larger = [x for x in arr if x > pivot]

    if max_depth <= 0 or len(arr) <= threshold:
        # Sequential path: no further process pools below this point.
        return (
            quick_sort(smaller, threshold, 0)
            + equal
            + quick_sort(larger, threshold, 0)
        )

    # ProcessPoolExecutor is used rather than multiprocessing.Pool: with
    # Pool, the recursive call inside a worker failed with
    # "AssertionError: daemonic processes are not allowed to have children".
    with ProcessPoolExecutor(max_workers=2) as executor:
        sorted_smaller, sorted_larger = executor.map(
            quick_sort,
            [smaller, larger],
            [threshold, threshold],
            [max_depth - 1, max_depth - 1],
        )

    return sorted_smaller + equal + sorted_larger

# 原地分区的版本

from concurrent.futures import ThreadPoolExecutor
from typing import List


def partition(arr: List[int], low: int, high: int) -> int:
    """Partition ``arr[low..high]`` in place around its last element.

    After the call, every item not greater than the pivot sits to the left
    of the pivot's final position and every greater item sits to its right.

    Args:
        arr: List that is rearranged in place.
        low: First index of the range (inclusive).
        high: Last index of the range (inclusive); ``arr[high]`` is the pivot.

    Returns:
        The index where the pivot ends up.
    """
    pivot_value = arr[high]
    boundary = low  # next slot that receives an item <= pivot_value
    for scan in range(low, high):
        if arr[scan] <= pivot_value:
            arr[boundary], arr[scan] = arr[scan], arr[boundary]
            boundary += 1
    # Drop the pivot between the two groups.
    arr[boundary], arr[high] = arr[high], arr[boundary]
    return boundary


def quicksort(arr: List[int], low: int, high: int, executor, threshold=1000):
    """Sort ``arr[low..high]`` in place, handing large sub-ranges to ``executor``.

    After each partition step the smaller side is either submitted to the
    executor (when it spans more than ``threshold`` index steps) or sorted
    by a direct recursive call, and the loop continues with the larger
    side. Recursing only into the smaller side keeps the call stack
    logarithmic in the range size, even for input that is already sorted.

    A task never blocks on a future that has not started: before waiting,
    it tries to cancel the future and, if that succeeds, sorts the range
    itself. Waiting unconditionally on children queued in the same bounded
    pool can deadlock once every worker is waiting.

    Args:
        arr: List that is sorted in place.
        low: First index of the range (inclusive).
        high: Last index of the range (inclusive).
        executor: Executor providing ``submit``; submitted tasks call this
            function on disjoint index ranges of the same list.
        threshold: A sub-range is submitted to the executor only when
            ``high - low`` of that sub-range exceeds this value.
    """
    handed_off = []  # (future, low, high) for every sub-range submitted

    while low < high:
        # Perform the partition step.
        pi = partition(arr, low, high)

        # Pick the smaller side for hand-off / recursion and keep looping on
        # the larger one.
        if pi - low < high - pi:
            side_low, side_high = low, pi - 1
            low = pi + 1
        else:
            side_low, side_high = pi + 1, high
            high = pi - 1

        if (side_high - side_low) > threshold:
            # Large enough to be worth a separate task.
            future = executor.submit(
                quicksort, arr, side_low, side_high, executor, threshold
            )
            handed_off.append((future, side_low, side_high))
        else:
            # Below the threshold: sort directly in this thread.
            quicksort(arr, side_low, side_high, executor, threshold)

    for future, side_low, side_high in handed_off:
        if future.cancel():
            # The task never started; do its work here instead of waiting
            # for a free worker.
            quicksort(arr, side_low, side_high, executor, threshold)
        else:
            # Already running or finished: wait, re-raising any exception
            # the task raised.
            future.result()


def parallel_quicksort(arr: List[int]):
    """Sort ``arr`` in place with ``quicksort`` on a thread pool.

    A ``ThreadPoolExecutor`` with default settings is created for the
    duration of the call and shut down when sorting has finished.

    Args:
        arr: List that is sorted in place; nothing is returned.
    """
    with ThreadPoolExecutor() as pool:
        last_index = len(arr) - 1
        # Sort the whole list: indices 0 through last_index inclusive.
        quicksort(arr, 0, last_index, pool)