What is the most efficient way signal to multiple consumer threads that there is data available from a single producer thread?

I have a data source which will provide data regularly for N listeners.

I can control access to the data using a ReaderWriterLock but what should be used to signal all listeners that they can simultaneously acquire a copy of that data for their own processing needs?

I was initially thinking an AutoResetEvent might do it but from what I've read that would only signal a single thread to go ahead and read?

Edit - I tried the following but without a Sleep in the main thread nothing gets triggered. Why is that?

// TriggeringMultipleThreads.cpp : main project file.

#include "stdafx.h"

using namespace System;
using namespace System::Threading;

static void Report(String^ caller, String^ message)
{
    DateTime^ now = DateTime::UtcNow;
    Console::WriteLine(String::Format("{0}: {1}: {2}", now->Ticks, caller, message));
}

static void ThreadProc(Object^ args)
{
    try
    {
        Collections::Generic::List<Object^>^ params = dynamic_cast<Collections::Generic::List<Object^>^>(args);
        String^ threadName = dynamic_cast<String^>(params[0]);
        int pauseBetweenLocks = static_cast<int>(params[1]);
        ReaderWriterLock^ masterLock = dynamic_cast<ReaderWriterLock^>(params[2]);
        EventWaitHandle^ newDataSignal = dynamic_cast<EventWaitHandle^>(params[3]);
        while (true)
        {
            Report(threadName, "waiting for new data signal");
            bool newDataAvailable = newDataSignal->WaitOne();
            if (newDataAvailable == true)
            {
                Report(threadName, "acquiring reader lock");
                masterLock->AcquireReaderLock(1000);
                Report(threadName, "got data lock");
                masterLock->ReleaseReaderLock();
                Report(threadName, "released data lock");
                if (pauseBetweenLocks > 0)
                {
                    Report(threadName, String::Format("sleeping for {0}ms", pauseBetweenLocks/2));
                    Thread::Sleep(pauseBetweenLocks/2);
                    Report(threadName, String::Format("sleeping for {0}ms", pauseBetweenLocks/2));
                    Thread::Sleep(pauseBetweenLocks/2);
                }
            }
            else
            {
                Report(threadName, String::Format("failed to find new data"));
            }
        }
    }
    catch(Exception^ e)
    {
        Report("Thread", String::Format("Thread died: {0}", e->Message));
    }
}

Thread^ CreateAndStartThread(String^ threadName, int delay, ReaderWriterLock^ dataLock, EventWaitHandle^ newDataSignal)
{
    Thread^ t1 = gcnew Thread(gcnew ParameterizedThreadStart(&ThreadProc));
    Collections::Generic::List<Object^>^ t1Params = gcnew Collections::Generic::List<Object^>(2);
    t1Params->Add(threadName);
    t1Params->Add(delay);
    t1Params->Add(dataLock);
    t1Params->Add(newDataSignal);
    t1->Start(t1Params);
    return t1;
}

int main(array<System::String ^> ^args)
{
    ReaderWriterLock^ masterLock = gcnew ReaderWriterLock();
    EventWaitHandle^ newDataSignal = gcnew ManualResetEvent(false);

    Thread^ t1 = CreateAndStartThread("Fast thread1",   0, masterLock, newDataSignal);
    Thread^ t2 = CreateAndStartThread("Slow thread", 100, masterLock, newDataSignal);
    Thread^ t3 = CreateAndStartThread("Fast thread2",   0, masterLock, newDataSignal);
    String^ masterThreadName = "Master";

    while (true)
    {
        Report(masterThreadName, "Ready");
        Console::ReadLine();

        Console::WriteLine();
        Report(masterThreadName, "Master thread acquiring lock");
        masterLock->AcquireWriterLock(1000);
        Report(masterThreadName, "signalling listeners");
        bool signalStarted = newDataSignal->Set();

        Thread::Sleep(1); // <---- Why is this required?

        bool signalReset = newDataSignal->Reset();
        Report(masterThreadName, String::Format("signalling listeners ({0}, {1})", signalStarted, signalReset));
        Report(masterThreadName, "releasing lock");
        masterLock->ReleaseWriterLock();
        Report(masterThreadName, "released lock");
    }

    return 0;
}

Running that gives me (for example):

635364403105485943: Master: Ready
635364403105485943: Fast thread1: waiting for new data signal
635364403105485943: Fast thread2: waiting for new data signal
635364403105485943: Slow thread: waiting for new data signal

635364403132598654: Master: Master thread acquiring lock
635364403132598654: Master: signalling listeners
635364403132598654: Fast thread1: acquiring reader lock
635364403132598654: Slow thread: acquiring reader lock
635364403132608655: Master: signalling listeners (True, True)
635364403132608655: Master: releasing lock
635364403132618656: Master: released lock
635364403132618656: Fast thread1: got data lock
635364403132618656: Slow thread: got data lock
635364403132618656: Slow thread: released data lock
635364403132618656: Slow thread: sleeping for 50ms
635364403132618656: Master: Ready
635364403132598654: Fast thread2: acquiring reader lock
635364403132618656: Fast thread2: got data lock
635364403132628657: Fast thread2: released data lock
635364403132628657: Fast thread2: waiting for new data signal
635364403132618656: Fast thread1: released data lock
635364403132638658: Fast thread1: waiting for new data signal
635364403133118706: Slow thread: sleeping for 50ms
635364403133618756: Slow thread: waiting for new data signal

If I comment out Thread::Sleep(1); in the master thread none of the other threads fire.

Jon Skeet
people
quotationmark

You could use a ManualResetEvent instead... but I would instead encourage you to use a higher level abstraction. Consider whether your task may be appropriate for the Dataflow API - or possibly use a BlockingCollection for each listener, and add the item into each collection when the producer creates one. (You might have one "tee-ing" listener to sit between a single BlockingCollection that the producer knows about, and the listeners. At that point you're really reinventing the wheel though...)

Oh, and it's possible that Reactive Extensions might be a good fit for you too.

Basically, do everything you can to avoid going as low level as *ResetEvent and ReaderWriterLock :)

people

See more on this question at Stackoverflow