共享内存 IPC 同步(无锁)

Shared-memory IPC synchronization (lock-free)(共享内存 IPC 同步(无锁))
本文介绍了共享内存 IPC 同步(无锁)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Consider the following scenario:

Requirements:

  • Intel x64 Server (multiple CPU-sockets => NUMA)
  • Ubuntu 12, GCC 4.6
  • Two processes sharing large amounts of data over (named) shared-memory
  • Classical producer-consumer scenario
  • Memory is arranged in a circular buffer (with M elements)

Program sequence (pseudo code):

Process A (Producer):

int bufferPos = 0;
while( true )
{
    if( isBufferEmpty( bufferPos ) )
    {
        writeData( bufferPos );
        setBufferFull( bufferPos );

        bufferPos = ( bufferPos + 1 ) % M;
    }
}

Process B (Consumer):

int bufferPos = 0;
while( true )
{
    if( isBufferFull( bufferPos ) )
    {
        readData( bufferPos );
        setBufferEmpty( bufferPos );

        bufferPos = ( bufferPos + 1 ) % M;
    }
}

Now the age-old question: How to synchronize them effectively!?

  1. Protect every read/write access with mutexes
  2. Introduce a "grace period", to allow writes to complete: Read data in buffer N, when buffer(N+3) has been marked as full (dangerous, but seems to work...)
  3. ?!?

Ideally I would like something along the lines of a memory-barrier, that guarantees that all previous reads/writes are visible across all CPUs, along the lines of:

writeData( i );
MemoryBarrier();

//All data written and visible, set flag
setBufferFull( i );

This way, I would only have to monitor the buffer flags and then could read the large data chunks safely.

Generally I'm looking for something along the lines of acquire/release fences as described by Preshing here:

http://preshing.com/20130922/acquire-and-release-fences/

(if I understand it correctly the C++11 atomics only work for threads of a single process and not along multiple processes.)

However the GCC-own memory barriers (__sync_synchronize in combination with the compiler barrier asm volatile( "" ::: "memory" ) to be sure) don't seem to work as expected, as writes become visible after the barrier, when I expected them to be completed.

Any help would be appreciated...

BTW: Under windows this just works fine using volatile variables (a Microsoft specific behaviour)...

解决方案

Boost Interprocess has support for Shared Memory.

Boost Lockfree has a Single-Producer Single-Consumer queue type (spsc_queue). This is basically what you refer to as a circular buffer.

Here's a demonstration that passes IPC messages (in this case, of type string) using this queue, in a lock-free fashion.

Defining the types

First, let's define our types:

namespace bip = boost::interprocess;
namespace shm
{
    template <typename T>
        using alloc = bip::allocator<T, bip::managed_shared_memory::segment_manager>;

    using char_alloc    =  alloc<char>;
    using shared_string =  bip::basic_string<char, std::char_traits<char>, char_alloc >;
    using string_alloc  =  alloc<shared_string>;

    using ring_buffer = boost::lockfree::spsc_queue<
        shared_string, 
        boost::lockfree::capacity<200> 
        // alternatively, pass
        // boost::lockfree::allocator<string_alloc>
    >;
}

For simplicity I chose to demo the runtime-size spsc_queue implementation, randomly requesting a capacity of 200 elements.

The shared_string typedef defines a string that will transparently allocate from the shared memory segment, so they are also "magically" shared with the other process.

The consumer side

This is the simplest, so:

int main()
{
    // create segment and corresponding allocator
    bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536);
    shm::string_alloc char_alloc(segment.get_segment_manager());

    shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();

This opens the shared memory area, locates the shared queue if it exists. NOTE This should be synchronized in real life.

Now for the actual demonstration:

while (true)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    shm::shared_string v(char_alloc);
    if (queue->pop(v))
        std::cout << "Processed: '" << v << "'
";
}

The consumer just infinitely monitors the queue for pending jobs and processes one each ~10ms.

The Producer side

The producer side is very similar:

int main()
{
    bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536);
    shm::char_alloc char_alloc(segment.get_segment_manager());

    shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();

Again, add proper synchronization to the initialization phase. Also, you would probably make the producer in charge of freeing the shared memory segment in due time. In this demonstration, I just "let it hang". This is nice for testing, see below.

So, what does the producer do?

    for (const char* s : { "hello world", "the answer is 42", "where is your towel" })
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
        queue->push({s, char_alloc});
    }
}

Right, the producer produces precisely 3 messages in ~750ms and then exits.

Note that consequently if we do (assume a POSIX shell with job control):

./producer& ./producer& ./producer&
wait

./consumer&

Will print 3x3 messages "immediately", while leaving the consumer running. Doing

./producer& ./producer& ./producer&

again after this, will show the messages "trickle in" in realtime (in burst of 3 at ~250ms intervals) because the consumer is still running in the background

See the full code online in this gist: https://gist.github.com/sehe/9376856

这篇关于共享内存 IPC 同步(无锁)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Rising edge interrupt triggering multiple times on STM32 Nucleo(在STM32 Nucleo上多次触发上升沿中断)
How to use va_list correctly in a sequence of wrapper functions calls?(如何在一系列包装函数调用中正确使用 va_list?)
OpenGL Perspective Projection Clipping Polygon with Vertex Outside Frustum = Wrong texture mapping?(OpenGL透视投影裁剪多边形,顶点在视锥外=错误的纹理映射?)
How does one properly deserialize a byte array back into an object in C++?(如何正确地将字节数组反序列化回 C++ 中的对象?)
What free tiniest flash file system could you advice for embedded system?(您可以为嵌入式系统推荐什么免费的最小闪存文件系统?)
Volatile member variables vs. volatile object?(易失性成员变量与易失性对象?)