////////////////////////////////////////////////////////////////////////////// // // (C) Copyright Ion Gaztanaga 2005-2011. Distributed under the Boost // Software License, Version 1.0. (See accompanying file // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // // See http://www.boost.org/libs/interprocess for documentation. // ////////////////////////////////////////////////////////////////////////////// #ifndef BOOST_INTERPROCESS_DETAIL_CONDITION_ALGORITHM_8A_HPP #define BOOST_INTERPROCESS_DETAIL_CONDITION_ALGORITHM_8A_HPP #include #include #include #include namespace boost { namespace interprocess { namespace ipcdetail { //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// // // Condition variable algorithm taken from pthreads-win32 discussion. // // The algorithm was developed by Alexander Terekhov in colaboration with // Louis Thomas. // // Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL // // semBlockLock - bin.semaphore // semBlockQueue - semaphore // mtxExternal - mutex or CS // mtxUnblockLock - mutex or CS // nWaitersGone - int // nWaitersBlocked - int // nWaitersToUnblock - int // // wait( timeout ) { // // [auto: register int result ] // error checking omitted // [auto: register int nSignalsWasLeft ] // [auto: register int nWaitersWasGone ] // // sem_wait( semBlockLock ); // nWaitersBlocked++; // sem_post( semBlockLock ); // // unlock( mtxExternal ); // bTimedOut = sem_wait( semBlockQueue,timeout ); // // lock( mtxUnblockLock ); // if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) { // if ( bTimedOut ) { // timeout (or canceled) // if ( 0 != nWaitersBlocked ) { // nWaitersBlocked--; // } // else { // nWaitersGone++; // count spurious wakeups. // } // } // if ( 0 == --nWaitersToUnblock ) { // if ( 0 != nWaitersBlocked ) { // sem_post( semBlockLock ); // open the gate. // nSignalsWasLeft = 0; // do not open the gate // // below again. // } // else if ( 0 != (nWaitersWasGone = nWaitersGone) ) { // nWaitersGone = 0; // } // } // } // else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or // // spurious semaphore :-) // sem_wait( semBlockLock ); // nWaitersBlocked -= nWaitersGone; // something is going on here // // - test of timeouts? :-) // sem_post( semBlockLock ); // nWaitersGone = 0; // } // unlock( mtxUnblockLock ); // // if ( 1 == nSignalsWasLeft ) { // if ( 0 != nWaitersWasGone ) { // // sem_adjust( semBlockQueue,-nWaitersWasGone ); // while ( nWaitersWasGone-- ) { // sem_wait( semBlockQueue ); // better now than spurious later // } // } sem_post( semBlockLock ); // open the gate // } // // lock( mtxExternal ); // // return ( bTimedOut ) ? ETIMEOUT : 0; // } // // signal(bAll) { // // [auto: register int result ] // [auto: register int nSignalsToIssue] // // lock( mtxUnblockLock ); // // if ( 0 != nWaitersToUnblock ) { // the gate is closed!!! // if ( 0 == nWaitersBlocked ) { // NO-OP // return unlock( mtxUnblockLock ); // } // if (bAll) { // nWaitersToUnblock += nSignalsToIssue=nWaitersBlocked; // nWaitersBlocked = 0; // } // else { // nSignalsToIssue = 1; // nWaitersToUnblock++; // nWaitersBlocked--; // } // } // else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION! // sem_wait( semBlockLock ); // close the gate // if ( 0 != nWaitersGone ) { // nWaitersBlocked -= nWaitersGone; // nWaitersGone = 0; // } // if (bAll) { // nSignalsToIssue = nWaitersToUnblock = nWaitersBlocked; // nWaitersBlocked = 0; // } // else { // nSignalsToIssue = nWaitersToUnblock = 1; // nWaitersBlocked--; // } // } // else { // NO-OP // return unlock( mtxUnblockLock ); // } // // unlock( mtxUnblockLock ); // sem_post( semBlockQueue,nSignalsToIssue ); // return result; // } //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// // Required interface for ConditionMembers // class ConditionMembers // { // typedef implementation_defined semaphore_type; // typedef implementation_defined mutex_type; // typedef implementation_defined integer_type; // // integer_type &get_nwaiters_blocked() // integer_type &get_nwaiters_gone() // integer_type &get_nwaiters_to_unblock() // semaphore_type &get_sem_block_queue() // semaphore_type &get_sem_block_lock() // mutex_type &get_mtx_unblock_lock() // }; // template class condition_algorithm_8a { private: condition_algorithm_8a(); ~condition_algorithm_8a(); condition_algorithm_8a(const condition_algorithm_8a &); condition_algorithm_8a &operator=(const condition_algorithm_8a &); typedef typename ConditionMembers::semaphore_type semaphore_type; typedef typename ConditionMembers::mutex_type mutex_type; typedef typename ConditionMembers::integer_type integer_type; // nwaiters_blocked == 0 // nwaiters_gone() == 0 // nwaiters_to_unblock == 0 // sem_block_queue() == initial count 0 // sem_block_lock() == initial count 1 // mtx_unblock_lock (unlocked) public: template static bool wait (ConditionMembers &data, bool timeout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); static void signal(ConditionMembers &data, bool broadcast); }; template inline void condition_algorithm_8a::signal(ConditionMembers &data, bool broadcast) { integer_type nsignals_to_issue; { scoped_lock locker(data.get_mtx_unblock_lock()); if ( 0 != data.get_nwaiters_to_unblock() ) { // the gate is closed!!! if ( 0 == data.get_nwaiters_blocked() ) { // NO-OP //locker's destructor triggers data.get_mtx_unblock_lock().unlock() return; } if (broadcast) { data.get_nwaiters_to_unblock() += nsignals_to_issue = data.get_nwaiters_blocked(); data.get_nwaiters_blocked() = 0; } else { nsignals_to_issue = 1; data.get_nwaiters_to_unblock()++; data.get_nwaiters_blocked()--; } } else if ( data.get_nwaiters_blocked() > data.get_nwaiters_gone() ) { // HARMLESS RACE CONDITION! data.get_sem_block_lock().wait(); // close the gate if ( 0 != data.get_nwaiters_gone() ) { data.get_nwaiters_blocked() -= data.get_nwaiters_gone(); data.get_nwaiters_gone() = 0; } if (broadcast) { nsignals_to_issue = data.get_nwaiters_to_unblock() = data.get_nwaiters_blocked(); data.get_nwaiters_blocked() = 0; } else { nsignals_to_issue = data.get_nwaiters_to_unblock() = 1; data.get_nwaiters_blocked()--; } } else { // NO-OP //locker's destructor triggers data.get_mtx_unblock_lock().unlock() return; } //locker's destructor triggers data.get_mtx_unblock_lock().unlock() } data.get_sem_block_queue().post(nsignals_to_issue); } template template inline bool condition_algorithm_8a::wait (ConditionMembers &data, bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mtxExternal) { //Initialize to avoid warnings integer_type nsignals_was_left = 0; integer_type nwaiters_was_gone = 0; data.get_sem_block_lock().wait(); ++data.get_nwaiters_blocked(); data.get_sem_block_lock().post(); struct scoped_unlock { InterprocessMutex & mut; scoped_unlock(InterprocessMutex & m) : mut(m) { m.unlock(); } ~scoped_unlock() { mut.lock(); } } unlocker(mtxExternal); bool bTimedOut = tout_enabled ? !data.get_sem_block_queue().timed_wait(abs_time) : (data.get_sem_block_queue().wait(), false); { scoped_lock locker(data.get_mtx_unblock_lock()); if ( 0 != (nsignals_was_left = data.get_nwaiters_to_unblock()) ) { if ( bTimedOut ) { // timeout (or canceled) if ( 0 != data.get_nwaiters_blocked() ) { data.get_nwaiters_blocked()--; } else { data.get_nwaiters_gone()++; // count spurious wakeups. } } if ( 0 == --data.get_nwaiters_to_unblock() ) { if ( 0 != data.get_nwaiters_blocked() ) { data.get_sem_block_lock().post(); // open the gate. nsignals_was_left = 0; // do not open the gate below again. } else if ( 0 != (nwaiters_was_gone = data.get_nwaiters_gone()) ) { data.get_nwaiters_gone() = 0; } } } else if ( (std::numeric_limits::max)()/2 == ++data.get_nwaiters_gone() ) { // timeout/canceled or spurious semaphore :-) data.get_sem_block_lock().wait(); data.get_nwaiters_blocked() -= data.get_nwaiters_gone(); // something is going on here - test of timeouts? :-) data.get_sem_block_lock().post(); data.get_nwaiters_gone() = 0; } //locker's destructor triggers data.get_mtx_unblock_lock().unlock() } if ( 1 == nsignals_was_left ) { if ( 0 != nwaiters_was_gone ) { // sem_adjust( data.get_sem_block_queue(),-nwaiters_was_gone ); while ( nwaiters_was_gone-- ) { data.get_sem_block_queue().wait(); // better now than spurious later } } data.get_sem_block_lock().post(); // open the gate } //mtxExternal.lock(); called from unlocker return ( bTimedOut ) ? false : true; } } //namespace ipcdetail } //namespace interprocess } //namespace boost #include #endif //BOOST_INTERPROCESS_DETAIL_CONDITION_ALGORITHM_8A_HPP