c++ - smallest - mergesort algorithmus



并行合并C++中的性能问题 (2)

我已经尝试使用线程和模板编写mergesort的并行实现。 相关的代码如下所示。

我已经将性能与C ++ STL进行了比较。 当没有线程产生时,我的代码比std :: sort慢6倍。 玩变量maxthreads(和/或FACTOR)我只能使性能提高一倍,所以在最好的情况下,我比std :: sort慢3倍。 我已经尝试了16核心多处理器机器上的代码。

htop显示内核是按照预期使用的,但为什么性能不足,我不觉得整体运行时是并行的?

有错误吗?

谢谢你的回复。

#define FACTOR 1
static unsigned int maxthreads = FACTOR * std::thread::hardware_concurrency();

unsigned int workers=0;
std::mutex g_mutex;

template <typename T>
std::vector<T>* mergesort_inplace_multithreading(
    typename std::vector<T>::iterator* listbegin,
    typename std::vector<T>::iterator *listend,
    std::vector<T>* listarg)
{
    if (*listbegin == *listend)
    {
        return listarg;
    }
    else if (*listend == *listbegin + 1)
    {
        return listarg;
    }
    else
    {
        size_t offset = std::distance(*listbegin, *listend)/2;
        typename std::vector<T>::iterator listhalf = *listbegin + offset;
        g_mutex.lock();
        if (::workers <= maxthreads-2 and maxthreads >=2)
        {
            workers += 2;

            g_mutex.unlock();

            std::thread first_thread(mergesort_inplace_multithreading<T>, listbegin, &listhalf, listarg);
            std::thread second_thread(mergesort_inplace_multithreading<T>, &listhalf, listend, listarg);
            first_thread.join();
            second_thread.join();
            g_mutex.lock();
            workers -= 2;
            g_mutex.unlock();
        }
        else
        {
            g_mutex.unlock();
            mergesort_inplace_multithreading<T>(listbegin, &listhalf, listarg);
            mergesort_inplace_multithreading<T>(&listhalf, listend, listarg);
        }

        typename std::vector<T> result;
        typename std::vector<T>::iterator lo_sorted_it = *listbegin;
        typename std::vector<T>::iterator hi_sorted_it = listhalf;
        typename std::vector<T>::iterator lo_sortedend = listhalf;
        typename std::vector<T>::iterator hi_sortedend = *listend;
        while (lo_sorted_it != lo_sortedend and hi_sorted_it != hi_sortedend)
        {
            if (*lo_sorted_it <= *hi_sorted_it)
            {
                result.push_back(*lo_sorted_it);
                ++lo_sorted_it;
            }
            else
            {
                result.push_back(*hi_sorted_it);
                ++hi_sorted_it;
            }

        }//end while

        if (lo_sorted_it != lo_sortedend)
        {
            //assert(hi_sorted_it == hi_sortedend);
            result.insert(result.end(), lo_sorted_it, lo_sortedend);
        }
        else
        {
            //assert(lo_sorted_it == lo_sortedend);
            result.insert(result.end(), hi_sorted_it, hi_sortedend);
        }
        std::copy(result.begin(), result.end(), *listbegin);
        return listarg;
    }
}

int main()
{
    //some tests
}

Answer #1

感谢您的答复。

互斥体只保护无符号的int工作(一个全局变量),它跟踪有多少线程被产生。 如果达到最大值(由maxthreads给出),则不会生成更多的线程。 您使用mergesort_mt2中的参数N来完成此操作。

你的机器有多少核心?

尽管如此,表演似乎只是双倍...


Answer #2

并行mergesort不需要互斥体。 而且你当然不需要为每个分区分割启动两个线程。 你启动一个线程; 第二个分区在当前线程上处理; 一个线程资源的使用比一个线程所使用的线程资源要好得多,只能等待另外两个线程完成。

首先,简单的测试程序,排序20万个无符号整数。 注意:所有使用Apple LLVM版本5.1(clang-503.0.40)(基于LLVM 3.4svn),64位,posix线程和O2

测试程序

int main()
{
    using namespace std::chrono;

    std::random_device rd;
    std::mt19937 rng(rd());
    std::uniform_int_distribution<unsigned int> dist(0, std::numeric_limits<unsigned int>::max());

    std::vector<unsigned int> v, back(20*1000000);

    for (int i=0; i<5; ++i)
    {
        std::cout << "Generating...\n";
        std::generate_n(back.begin(), back.size(), [&](){return dist(rng);});

        time_point<system_clock> t0, t1;

        v = back;
        std::cout << "std::sort: ";
        t0 = system_clock::now();
        std::sort(v.begin(), v.end());
        t1 = system_clock::now();
        std::cout << duration_cast<milliseconds>(t1-t0).count() << "ms\n";

        v = back;
        std::cout << "mergesort_mt1: ";
        t0 = system_clock::now();
        mergesort_mt1(v.begin(), v.end());
        t1 = system_clock::now();
        std::cout << duration_cast<milliseconds>(t1-t0).count() << "ms\n";
    }

    return 0;
}

并行Mergesort

我们从超基本的东西开始。 我们将并发线程数限制为标准库中报告的硬件并发数。 一旦我们达到这个限度,我们就不再发布新的线程,而只是简单地递归现有的线程。 这个微不足道的算法一旦分布在硬件支持的线程中,就具有令人惊讶的正常行为。

template<typename Iter>
void mergesort_mt1(Iter begin, Iter end,
                  unsigned int N = std::thread::hardware_concurrency()/2)
{
    auto len = std::distance(begin, end);
    if (len < 2)
        return;

    Iter mid = std::next(begin, len/2);
    if (N > 1)
    {
        auto fn = std::async(mergesort_mt1<Iter>, begin, mid, N-2);
        mergesort_mt1(mid, end, N-2);
        fn.wait();
    }
    else
    {
        mergesort_mt1(begin, mid, 0);
        mergesort_mt1(mid, end, 0);
    }

    std::inplace_merge(begin, mid, end);
}

产量

Generating...
std::sort: 1902ms
mergesort_mt1: 1609ms
Generating...
std::sort: 1894ms
mergesort_mt1: 1584ms
Generating...
std::sort: 1881ms
mergesort_mt1: 1589ms
Generating...
std::sort: 1840ms
mergesort_mt1: 1580ms
Generating...
std::sort: 1841ms
mergesort_mt1: 1631ms

这看起来有希望,但肯定可以改善。

并行合并+标准库排序

在实现中, std::sort算法在供应商和供应商之间差别很大。 标准的主要限制是平均复杂度为O(NlogN)。 为了在性能方面达到这个目的,许多std::sort算法是标准库中最复杂的,nutcase优化的代码。 我已经细读了一些有几个内部排序特性的实现。 我见过的一个这样的实现使用introsort快速排序,直到递归深度有限,然后堆排序 )为较大的分区,一旦达到小分区,屈从于一个庞大的手动展开的16插槽插入排序

关键是,标准的图书馆作者明白,一个通用的排序算法根本不适合所有。 有几个人经常被用来做这个任务,经常和谐地合作。 不要天真地认为你能打败他们; 而是通过利用他们的辛勤工作加入他们

修改我们的代码很简单。 对于小于1025的所有分区,我们使用std::sort 。其余部分是相同的:

template<typename Iter>
void mergesort_mt2(Iter begin, Iter end,
                   unsigned int N = std::thread::hardware_concurrency())
{
    auto len = std::distance(begin, end);
    if (len <= 1024)
    {
        std::sort(begin,end);
        return;
    }

    Iter mid = std::next(begin, len/2);
    if (N > 1)
    {
        auto fn = std::async(mergesort_mt2<Iter>, begin, mid, N-2);
        mergesort_mt2(mid, end, N-2);
        fn.wait();
    }
    else
    {
        mergesort_mt2(begin, mid, 0);
        mergesort_mt2(mid, end, 0);
    }

    std::inplace_merge(begin, mid, end);
}

在将新的测试用例添加到测试程序后,我们得到:

产量

Generating...
std::sort: 1930ms
mergesort_mt1: 1695ms
mergesort_mt2: 998ms
Generating...
std::sort: 1854ms
mergesort_mt1: 1573ms
mergesort_mt2: 1030ms
Generating...
std::sort: 1867ms
mergesort_mt1: 1584ms
mergesort_mt2: 1005ms
Generating...
std::sort: 1862ms
mergesort_mt1: 1589ms
mergesort_mt2: 1001ms
Generating...
std::sort: 1847ms
mergesort_mt1: 1578ms
mergesort_mt2: 1009ms

好。 现在我们看到了一些令人印象深刻 但是我们能挤得更多吗?

并行合并+标准排序w /有限递归

如果你想一想,为了充分利用所有这些努力工作std::sort ,我们可以简单地停止递归,一旦我们达到完整的线程人口。 如果发生这种情况,只需要std::sort ,并在完成时将它们合并在一起。 难以相信,这实际上会降低代码的复杂性。 我们的算法变成了简单地在核心之间扩展分区的一个,每一个都由std::sort在时间到的时候处理:

template<typename Iter>
void mergesort_mt3(Iter begin, Iter end,
                   unsigned int N = std::thread::hardware_concurrency()/2)
{
    auto len = std::distance(begin, end);
    if (len <= 1024 || N < 2)
    {
        std::sort(begin,end);
        return;
    }

    Iter mid = std::next(begin, len/2);
    auto fn = std::async(mergesort_mt3<Iter>, begin, mid, N-2);
    mergesort_mt3(mid, end, N-2);
    fn.wait();
    std::inplace_merge(begin, mid, end);
}

再次加入我们的测试循环之后...

产量

Generating...
std::sort: 1911ms
mergesort_mt1: 1656ms
mergesort_mt2: 1006ms
mergesort_mt3: 802ms
Generating...
std::sort: 1854ms
mergesort_mt1: 1588ms
mergesort_mt2: 1008ms
mergesort_mt3: 806ms
Generating...
std::sort: 1836ms
mergesort_mt1: 1580ms
mergesort_mt2: 1017ms
mergesort_mt3: 806ms
Generating...
std::sort: 1843ms
mergesort_mt1: 1583ms
mergesort_mt2: 1006ms
mergesort_mt3: 853ms
Generating...
std::sort: 1855ms
mergesort_mt1: 1589ms
mergesort_mt2: 1012ms
mergesort_mt3: 798ms

正如所写的,对于任何1024个项目或更小的分区,我们只是委托给std::sort 。 如果分区较大 ,我们引入一个新线程来处理拆分分区的一端,使用当前线程来处理另一个分区。 一旦我们使线程的限制N达到饱和,我们就停止分割,并且简单地把所有的东西都委托给std::sort 。 总之,我们是一个多线程分发前端std::sort

概要

我们还可以发射更多的子弹(使用一些元编程并假定一个固定的并发池号),但是我会留给你。

如果您只专注于分区,分配到线程直到分配,为分区分区使用高度优化的分类算法,然后将所有内容缝合在一起以完成作业,则可以显着提高分类性能。 还有改进的余地吗? 当然。 但是在上面提到的最简单的形式中,没有锁定,没有互斥锁等。最后一个样本和裸std::sort之间的区别是,在一个微小的MacBook Air上,相同的数据集上的改善了58%,4GB RAM和一个双核心i7处理器。 这是令人印象深刻的,并考虑到它只需要很少的代码,只是简单的f'ing 真棒





mergesort