c# - لماذا ينتج Monitor.PulseAll نمط زمن انتقال "خطوة خطوة" في مؤشرات الترابط المشار إليها؟



.net multithreading (2)

في مكتبة تستخدم Monitor.PulseAll () لمزامنة مؤشر الترابط ، لاحظت أن زمن الانتقال من وقت استدعاء PulseAll (...) إلى الوقت الذي يتم فيه تنشيط مؤشر الترابط يبدو أنه يتبع توزيع "درج خطوة" - للغاية خطوات كبيرة. المواضيع المستيقظة لا تقوم بأي عمل تقريبًا ؛ وعلى الفور تقريبا العودة إلى الانتظار على الشاشة. على سبيل المثال ، في مربع به 12 مركزًا مع 24 مؤشرًا ينتظر على الشاشة (2 × Xeon5680 / Gulftown ؛ 6 مراكز أساسية لكل معالج ؛ HT Disabled) ، يكون زمن الانتقال بين النبضة والاستيقاظ على النحو التالي:

أول 12 سلسلة (لاحظ أن لدينا 12 نوى) تأخذ ما بين 30 و 60 ميكروثانية للرد. ثم نبدأ في الحصول على القفزات الكبيرة جدًا ؛ مع الهضاب حوالي 700 ، 1300 ، 1900 ، و 2600 ميكروثانية.

تمكنت من إعادة إنشاء هذا السلوك بنجاح بشكل مستقل عن مكتبة الطرف الثالث باستخدام الكود أدناه. ما تفعله هذه الشفرة هو إطلاق عدد كبير من مؤشرات الترابط (تغيير معلمة numThreads) التي فقط انتظر على شاشة ، وقراءة الطابع الزمني ، تسجيل الدخول إلى ConcurrentSet ، ثم العودة على الفور إلى الانتظار. مرة واحدة يستيقظ PulseAll () الثاني كافة مؤشرات الترابط. يتم ذلك 20 مرة ، ويبلغ الكمون للتكرار العاشر لوحدة التحكم.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace PulseAllTest
{
    class Program
    {
        static long LastTimestamp;
        static long Iteration;
        static object SyncObj = new object();
        static Stopwatch s = new Stopwatch();
        static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>();

        static void Main(string[] args)
        {
            long numThreads = 32;

            for (int i = 0; i < numThreads; ++i)
            {
                Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning);
            }

            s.Start();
            for (int i = 0; i < 20; ++i)
            {
                lock (SyncObj)
                {
                    ++Iteration;
                    LastTimestamp = s.Elapsed.Ticks;
                    Monitor.PulseAll(SyncObj);
                }
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }

            Console.WriteLine(String.Join("\n",
                from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 
                    select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond));
            Console.Read();
        }

        static void ReadLastTimestampAndPublish()
        {
            while(true)
            {
                lock(SyncObj)
                {
                    Monitor.Wait(SyncObj);
                }
                IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
            }
        }
    }
}

باستخدام الكود أعلاه ، إليك مثال على حالات الاستتار الموجودة في مربع مزود بفرط تشعبي 8 نوى / ث ممكّن (أي 16 نوى في إدارة المهام) و 32 سلسلة (* 2x Xeon5550 / Gainestown ؛ 4 نوى فعلية لكل معالج ؛ HT ممكّن):

EDIT: لمحاولة إخراج NUMA من المعادلة ، يوجد أدناه رسم بياني يشغّل برنامج العينة مع 16 مؤشر ترابط على Core i7-3770 (Ivy Bridge) ؛ 4 النوى المادية ؛ تم تمكين HT:

هل يستطيع أحد أن يشرح لماذا يتصرف Monitor.PulseAll () بهذه الطريقة؟

EDIT2:

لمحاولة إظهار أن هذا السلوك ليس متأصلاً لاستيقاظ مجموعة من الخيوط في نفس الوقت ، قمت بتكرار سلوك برنامج الاختبار باستخدام الأحداث ؛ وبدلاً من قياس زمن انتقال PulseAll () أقوم بقياس زمن انتقال ManualResetEvent.Set (). تقوم التعليمة البرمجية بإنشاء عدد من مؤشرات ترابط العامل ثم انتظار حدث ManualResetEvent.Set () على نفس كائن ManualResetEvent. عندما يتم تشغيل الحدث ، فإنهم يأخذون مقياس زمن الوصول ثم ينتظرون فورًا على AutoResetEvent الفردي لكل مؤشر ترابط. قبل التكرار التالي (500 مللي ثانية من قبل) ، يتم إعادة تعيين ManualResetEvent () ثم يتم ضبط كل AutoResetEvent () بحيث يمكن العودة إلى الانتظار على ManualResetEvent المشتركة.

لقد ترددت في نشر هذا لأنه يمكن أن يكون جلسة استماع حمراء عملاقة (لا أدعي أي أحداث وأن المراقبين يتصرفون بشكل مشابه) بالإضافة إلى أنه يستخدم بعض الممارسات الفظيعة للغاية للحصول على حدث يتصرف وكأنه مراقب (أحب / أكره أن أرى ما زملاء العمل سيفعلون إذا قدمت هذا لمراجعة الكود) ؛ لكنني أعتقد أن النتائج مفيدة.

تم هذا الاختبار على نفس جهاز الاختبار الأصلي ؛ 2xXeon5680 / Gulftown؛ 6 نوى لكل معالج (إجمالي 12 نوى) ؛ Hyperthreading معطل.

إذا لم يكن واضحًا كيف يختلف هذا اختلافًا جذريًا عن Monitor.PulseAll ؛ هنا هو أول رسم بياني متراكب على الرسم البياني الأخير:

الكود المستخدم لإنشاء هذه القياسات هو أدناه:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace MRETest
{
    class Program
    {
        static long LastTimestamp;
        static long Iteration;
        static ManualResetEventSlim MRES = new ManualResetEventSlim(false);
        static List<ReadLastTimestampAndPublish> Publishers = 
            new List<ReadLastTimestampAndPublish>();
        static Stopwatch s = new Stopwatch();
        static ConcurrentBag<Tuple<long, long>> IterationToTicks = 
            new ConcurrentBag<Tuple<long, long>>();

        static void Main(string[] args)
        {
            long numThreads = 24;
            s.Start();

            for (int i = 0; i < numThreads; ++i)
            {
                AutoResetEvent ares = new AutoResetEvent(false);
                ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish(
                    new AutoResetEvent(false));
                Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning);
                Publishers.Add(spinner);
            }

            for (int i = 0; i < 20; ++i)
            {
                ++Iteration;
                LastTimestamp = s.Elapsed.Ticks;
                MRES.Set();
                Thread.Sleep(500);
                MRES.Reset();
                foreach (ReadLastTimestampAndPublish publisher in Publishers)
                {
                    publisher.ARES.Set();
                }
                Thread.Sleep(500);
            }

            Console.WriteLine(String.Join("\n",
                from n in IterationToTicks where n.Item1 == 10 orderby n.Item2
                    select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond));
            Console.Read();
        }

        class ReadLastTimestampAndPublish
        {
            public AutoResetEvent ARES { get; private set; }

            public ReadLastTimestampAndPublish(AutoResetEvent ares)
            {
                this.ARES = ares;
            }

            public void Spin()
            {
                while (true)
                {
                    MRES.Wait();
                    IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
                    ARES.WaitOne();
                }
            }
        }
    }
}

https://src-bin.com


Answer #1

أحد الاختلافات بين هذه النسخة هو أنه في حالة PulseAll - تكرر مؤشرات الترابط الحلقة فورًا وتغلق الكائن مرة أخرى.

لديك 12 مركزًا ، لذا يتم تشغيل 12 مؤشر ترابط وتنفيذ الحلقة وإدخال الحلقة مرة أخرى وتأمين الكائن (واحد تلو الآخر) ثم إدخال حالة الانتظار. كل ذلك الوقت المواضيع الأخرى الانتظار. في حالة ManualEvent لديك حدثان ، لذلك لا تكرر مؤشرات الترابط الحلقة على الفور ، ولكن يتم حظرها في أحداث ARES بدلاً من ذلك - وهذا يتيح لمؤشرات الترابط الأخرى الحصول على ملكية القفل بشكل أسرع.

لقد قمت بمحاكاة سلوك مشابه في PulseAll عن طريق إضافة السكون في نهاية الحلقة في ReadLastTimestampAndPublish. يتيح هذا لمؤشر الترابط الآخر قفل syncObj بشكل أسرع ويبدو أنه يحسن الأرقام التي أحصل عليها من البرنامج.

static void ReadLastTimestampAndPublish()
{
    while(true)
    {
        lock(SyncObj)
        {
            Monitor.Wait(SyncObj);
        }
        IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
        Thread.Sleep(TimeSpan.FromMilliseconds(100));   // <===
    }
}

Answer #2

لتبدأ ، هذه ليست إجابة ، مجرد ملاحظاتي من النظر إلى SSCLI لمعرفة بالضبط ما يجري. معظم هذا أعلى بكثير من رأسي ، لكنه مثير للاهتمام رغم ذلك.

تبدأ الرحلة إلى أسفل فتحة الأرانب بدعوة إلى Monitor.PulseAll ، والتي يتم تنفيذها في C #:

clr\src\bcl\system\threading\monitor.cs :

namespace System.Threading
{
    public static class Monitor 
    {
        // other methods omitted

        [MethodImplAttribute(MethodImplOptions.InternalCall)]
        private static extern void ObjPulseAll(Object obj);

        public static void PulseAll(Object obj)
        {
            if (obj==null) {
                throw new ArgumentNullException("obj");
            }

            ObjPulseAll(obj);
        } 
    }
}

الحصول على clr\src\vm\ecall.cpp أساليب InternalCall في clr\src\vm\ecall.cpp :

FCFuncStart(gMonitorFuncs)
    FCFuncElement("Enter", JIT_MonEnter)
    FCFuncElement("Exit", JIT_MonExit)
    FCFuncElement("TryEnterTimeout", JIT_MonTryEnter)
    FCFuncElement("ObjWait", ObjectNative::WaitTimeout)
    FCFuncElement("ObjPulse", ObjectNative::Pulse)
    FCFuncElement("ObjPulseAll", ObjectNative::PulseAll)
    FCFuncElement("ReliableEnter", JIT_MonReliableEnter)
FCFuncEnd()

ObjectNative في clr\src\vm\comobject.cpp :

FCIMPL1(void, ObjectNative::PulseAll, Object* pThisUNSAFE)
{
    CONTRACTL
    {
        MODE_COOPERATIVE;
        DISABLED(GC_TRIGGERS);  // can't use this in an FCALL because we're in forbid gc mode until we setup a H_M_F.
        THROWS;
        SO_TOLERANT;
    }
    CONTRACTL_END;

    OBJECTREF pThis = (OBJECTREF) pThisUNSAFE;
    HELPER_METHOD_FRAME_BEGIN_1(pThis);
    //-[autocvtpro]-------------------------------------------------------

    if (pThis == NULL)
        COMPlusThrow(kNullReferenceException, L"NullReference_This");

    pThis->PulseAll();

    //-[autocvtepi]-------------------------------------------------------
    HELPER_METHOD_FRAME_END();
}
FCIMPLEND

OBJECTREF هو بعض السحر الذي تم رشه أعلى Object (المشغل -> OBJECTREF->PulseAll() بشكل زائد) ، لذلك OBJECTREF->PulseAll() هو في الواقع Object->PulseAll() الذي يتم تنفيذه في clr\src\vm\object.h اتصل بـ ObjHeader->PulseAll :

class Object
{
  // snip   
  public:
  // snip
    ObjHeader   *GetHeader()
    {
        LEAF_CONTRACT;
        return PTR_ObjHeader(PTR_HOST_TO_TADDR(this) - sizeof(ObjHeader));
    }
  // snip
    void PulseAll()
    {
        WRAPPER_CONTRACT;
        GetHeader()->PulseAll();
    }
  // snip
}

ObjHeader::PulseAll باسترداد SyncBlock ، والذي يستخدم AwareLock لإدخال ing AwareLock القفل على الكائن. AwareLock ( clr\src\vm\syncblk.cpp ) CLREvent ( clr\src\vm\synch.cpp ) تم إنشاؤه كـ CLREvent::CreateMonitorEvent(SIZE_T) ( CLREvent::CreateMonitorEvent(SIZE_T) ) ، والذي يستدعي UnsafeCreateEvent ( clr\src\inc\unsafe.h UnsafeCreateEvent clr\src\inc\unsafe.h ) أو أساليب التزامن بيئة الاستضافة.

clr\src\vm\syncblk.cpp :

void ObjHeader::PulseAll()
{
    CONTRACTL
    {
        INSTANCE_CHECK;
        THROWS;
        GC_TRIGGERS;
        MODE_ANY;
        INJECT_FAULT(COMPlusThrowOM(););
    }
    CONTRACTL_END;

    //  The following code may cause GC, so we must fetch the sync block from
    //  the object now in case it moves.
    SyncBlock *pSB = GetBaseObject()->GetSyncBlock();

    // GetSyncBlock throws on failure
    _ASSERTE(pSB != NULL);

    // make sure we own the crst
    if (!pSB->DoesCurrentThreadOwnMonitor())
        COMPlusThrow(kSynchronizationLockException);

    pSB->PulseAll();
}

void SyncBlock::PulseAll()
{
    CONTRACTL
    {
        INSTANCE_CHECK;
        NOTHROW;
        GC_NOTRIGGER;
        MODE_ANY;
    }
    CONTRACTL_END;

    WaitEventLink  *pWaitEventLink;

    while ((pWaitEventLink = ThreadQueue::DequeueThread(this)) != NULL)
        pWaitEventLink->m_EventWait->Set();
}

يستخدم crst ( clr\src\vm\crst.cpp ) وهو عبارة عن غلاف حول المقاطع الحرجة. m_EventWait هو دليل CLREvent .

لذلك ، كل هذا يستخدم بدائل OS ما لم يقم موفر الاستضافة الافتراضي بتجاوز الأشياء.





latency