dds::core::GuardCondition Memory Growth When Using Multiple Threads

Offline
Last seen: 5 years 6 months ago
Joined: 10/29/2015
Posts: 12

Hi all,

Consider the following test program, compiled against RTI Connext 5.2.0 on an x86-64 platform:

#include <dds/core/cond/GuardCondition.hpp>
#include <future>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

struct Test
{
  dds::core::cond::GuardCondition m_guard_condition;
  mutable std::mutex m_mutex;

  void on()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_guard_condition.trigger_value(true);
  }

  void off()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_guard_condition.trigger_value(false);
  }
};

int main(int argc, char **argv)
{
  int loops = 10000;
  if (argc > 1)
  {
    loops = std::stoi(std::string(argv[1]));
  }
  std::cout << "Executing " << loops << " loops" << std::endl;
  Test t;
  for (int i = 0; i < loops; ++i)
  {
    t.on();
    auto r = std::async(std::launch::async, std::bind(&Test::off, &t)).share();
    r.get();
  }
}

In the loop body, the program sets the GuardCondition's trigger value, then spawns a new thread which clears the trigger value again and terminates. Every time the trigger value is accessed from a new thread, RTI allocates more memory internally and never frees it. Running the test program with different loop counts under Valgrind shows:

$ valgrind guard-condition-test 10000
==3219== Memcheck, a memory error detector
==3219== Copyright (C) 2002-2013, and GNU GPL'd, by Julian Seward et al.
==3219== Using Valgrind-3.10.0 and LibVEX; rerun with -h for copyright info
==3219== Command: guard-condition-test
==3219== 
Executing 10000 loops
==3219== 
==3219== HEAP SUMMARY:
==3219==     in use at exit: 83,208,868 bytes in 30,425 blocks
==3219==   total heap usage: 70,462 allocs, 40,037 frees, 86,019,620 bytes allocated
==3219== 
==3219== LEAK SUMMARY:
==3219==    definitely lost: 0 bytes in 0 blocks
==3219==    indirectly lost: 0 bytes in 0 blocks
==3219==      possibly lost: 23,286 bytes in 121 blocks
==3219==    still reachable: 83,185,582 bytes in 30,304 blocks
==3219==         suppressed: 0 bytes in 0 blocks
==3219== Rerun with --leak-check=full to see details of leaked memory
==3219== 
==3219== For counts of detected and suppressed errors, rerun with: -v
==3219== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

$ valgrind guard-condition-test 20000
==13790== Memcheck, a memory error detector
==13790== Copyright (C) 2002-2013, and GNU GPL'd, by Julian Seward et al.
==13790== Using Valgrind-3.10.0 and LibVEX; rerun with -h for copyright info
==13790== Command: guard-condition-test 20000
==13790== 
Executing 20000 loops
==13790== 
==13790== HEAP SUMMARY:
==13790==     in use at exit: 166,348,870 bytes in 60,425 blocks
==13790==   total heap usage: 140,463 allocs, 80,038 frees, 171,959,652 bytes allocated
==13790== 
==13790== LEAK SUMMARY:
==13790==    definitely lost: 0 bytes in 0 blocks
==13790==    indirectly lost: 0 bytes in 0 blocks
==13790==      possibly lost: 23,286 bytes in 121 blocks
==13790==    still reachable: 166,325,584 bytes in 60,304 blocks
==13790==         suppressed: 0 bytes in 0 blocks
==13790== Rerun with --leak-check=full to see details of leaked memory
==13790== 
==13790== For counts of detected and suppressed errors, rerun with: -v
==13790== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

As you can see, running twice as many iterations doubles the memory usage of the test program. Looking into the details (--leak-check=full --show-leak-kinds=all), the main culprit is somewhere in here (this record is from the run with 10,000 loop iterations):

==2299== 81,920,000 bytes in 10,000 blocks are still reachable in loss record 175 of 175
==2299==    at 0x4C2AD10: calloc (vg_replace_malloc.c:623)
==2299==    by 0x615B573: RTIOsapiHeap_reallocateMemoryInternal (in /opt/rti_connext_dds-5.2.0/lib/x64Linux3.xgcc4.6.3/libnddscore.so)
==2299==    by 0x6149BF1: REDAWorkerFactory_createWorker (in /opt/rti_connext_dds-5.2.0/lib/x64Linux3.xgcc4.6.3/libnddscore.so)
==2299==    by 0x57F473A: DDS_DomainParticipantGlobals_get_worker_per_threadI (in /opt/rti_connext_dds-5.2.0/lib/x64Linux3.xgcc4.6.3/libnddsc.so)
==2299==    by 0x57EF27C: DDS_DomainParticipantFactory_get_workerI (in /opt/rti_connext_dds-5.2.0/lib/x64Linux3.xgcc4.6.3/libnddsc.so)
==2299==    by 0x5743BAF: DDS_Condition_get_workerI (in /opt/rti_connext_dds-5.2.0/lib/x64Linux3.xgcc4.6.3/libnddsc.so)
==2299==    by 0x5765BAD: DDS_GuardCondition_set_trigger_value (in /opt/rti_connext_dds-5.2.0/lib/x64Linux3.xgcc4.6.3/libnddsc.so)
==2299==    by 0x50D6BBC: rti::core::cond::GuardCondition::trigger_value(bool) (in /opt/rti_connext_dds-5.2.0/lib/x64Linux3.xgcc4.6.3/libnddscpp2.so)
==2299==    by 0x407C61: trigger_value (TGuardCondition.hpp:110)
==2299==    by 0x407C61: Test::off() (guard-condition-test.cpp:22)
==2299==    by 0x406779: operator() (functional:2439)
==2299==    by 0x406779: operator() (future:1264)
==2299==    by 0x406779: std::_Function_handler<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> (), std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<void>, std::__future_base::_Result_base::_Deleter>, void> >::_M_invoke(std::_Any_data const&) (functional:2025)
==2299==    by 0x406AE1: operator() (functional:2439)
==2299==    by 0x406AE1: std::__future_base::_State_baseV2::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&) (future:485)
==2299==    by 0x4E4244F: pthread_once (pthread_once.S:103)

My questions:

  • Is this a bug, or am I using dds::core::GuardCondition in a way that I shouldn't be using it?
  • If it's a bug, how can I work around it?
  • If I'm using the GuardCondition wrong, how do I have to use it instead? Do I need to confine all access to the trigger_value into a single thread, and if so, why?
  • I am using the GuardConditions in some application-level wrappers I wrote which simplify the interaction with some of my DDS Topics. The GuardConditions allow me to use my wrappers alongside other ReadConditions inside a single WaitSet. Maybe I should be using something other than GuardConditions?

Thanks very much for your help.

Offline
Last seen: 2 years 8 months ago
Joined: 08/09/2017
Posts: 25

This may be an issue of thread resources not getting released. Please take a look at https://community.rti.com/kb/resources-are-not-cleaned-when-write-called-spawned-thread. Does adding a call to unregister_thread() resolve the issue?

Offline
Last seen: 10 hours 18 sec ago
Joined: 04/02/2013
Posts: 195

Note that the modern C++ API doesn't include the unregister_thread() function yet (it will be available in the next release), so you need to call the C API directly, before leaving the thread, as follows:

DDS_DomainParticipantFactory_unregister_thread(DDS_DomainParticipantFactory_get_instance());

Offline
Last seen: 5 years 6 months ago
Joined: 10/29/2015
Posts: 12

Alejandro,

thanks very much! I found out about unregister_thread() recently, but since I couldn't find it in the modern C++ API, I assumed it simply wasn't needed there anymore.

Changing my std::async call to the following fixed the issue:

    auto r = std::async(std::launch::async,
                        [&]() {
                          t.off();
                          DDS_DomainParticipantFactory_unregister_thread(
                            DDS_DomainParticipantFactory_get_instance());
                        })
               .share();
    r.get();

Now for the ugly part: finding all threads in my software that may be suffering from this, and developing a foolproof way to ensure they all unregister. It appears to be safe to call unregister_thread() on a thread that never actually uses the DDS API, so I should be able to write a small RAII-style wrapper that calls this function in its destructor.

Thanks very much for your help!