/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "ConnectionStateTracker.h" #include #include #include #include #include #include #include #include #include #include #include #include using namespace activemq; using namespace activemq::core; using namespace activemq::state; using namespace activemq::commands; using namespace activemq::exceptions; using namespace decaf; using namespace decaf::lang; using namespace decaf::io; using namespace decaf::lang::exceptions; //////////////////////////////////////////////////////////////////////////////// namespace activemq { namespace state { class MessageCache : public LinkedHashMap, Pointer > { protected: ConnectionStateTracker* parent; public: int currentCacheSize; public: MessageCache(ConnectionStateTracker* parent) : /* amd275 - Patched for gcc v4.4.6, please see: https://issues.apache.org/jira/browse/AMQCPP-483 */ /* LinkedHashMap(), parent(parent), currentCacheSize(0) { */ LinkedHashMap, Pointer >(), parent(parent), currentCacheSize(0) { } virtual ~MessageCache() {} virtual bool removeEldestEntry(const MapEntry, Pointer >& eldest) { bool result = currentCacheSize > parent->getMaxMessageCacheSize(); if (result) { Pointer message = eldest.getValue().dynamicCast(); currentCacheSize -= message->getSize(); } return result; } }; class MessagePullCache : public LinkedHashMap > { protected: ConnectionStateTracker* parent; public: MessagePullCache(ConnectionStateTracker* parent) : LinkedHashMap >(), parent(parent) { } virtual ~MessagePullCache() {} virtual bool removeEldestEntry(const MapEntry >& eldest AMQCPP_UNUSED) { return size() > parent->getMaxMessagePullCacheSize(); } }; class StateTrackerImpl { private: StateTrackerImpl(const StateTrackerImpl&); StateTrackerImpl& operator= (const StateTrackerImpl&); public: /** Parent ConnectionStateTracker */ ConnectionStateTracker* parent; /** Creates a unique marker for this state tracker */ const Pointer TRACKED_RESPONSE_MARKER; /** Map holding the ConnectionStates, indexed by the ConnectionId */ ConcurrentStlMap, Pointer, ConnectionId::COMPARATOR> connectionStates; /** Store Messages if trackMessages == true */ MessageCache messageCache; /** Store MessagePull commands for replay */ MessagePullCache messagePullCache; StateTrackerImpl(ConnectionStateTracker * parent) : parent(parent), TRACKED_RESPONSE_MARKER(new Tracked()), connectionStates(), messageCache(parent), messagePullCache(parent) { } ~StateTrackerImpl() { try { connectionStates.clear(); messageCache.clear(); messagePullCache.clear(); } AMQ_CATCHALL_NOTHROW() } }; class RemoveTransactionAction : public Runnable { private: Pointer info; ConnectionStateTracker* stateTracker; private: RemoveTransactionAction(const RemoveTransactionAction&); RemoveTransactionAction& operator=(const RemoveTransactionAction&); public: RemoveTransactionAction(ConnectionStateTracker* stateTracker, Pointer info) : info(info), stateTracker(stateTracker) { } virtual ~RemoveTransactionAction() {} virtual void run() { Pointer connectionId = info->getConnectionId(); Pointer cs = stateTracker->impl->connectionStates.get(connectionId); Pointer txState = cs->removeTransactionState(info->getTransactionId()); if (txState != NULL) { txState->clear(); } } }; }} //////////////////////////////////////////////////////////////////////////////// ConnectionStateTracker::ConnectionStateTracker() : impl(new StateTrackerImpl(this)), trackTransactions(false), restoreSessions(true), restoreConsumers(true), restoreProducers(true), restoreTransaction(true), trackMessages(true), trackTransactionProducers(true), maxMessageCacheSize(128 * 1024), maxMessagePullCacheSize(10) { } //////////////////////////////////////////////////////////////////////////////// ConnectionStateTracker::~ConnectionStateTracker() { try { delete impl; } AMQ_CATCHALL_NOTHROW() } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::track(Pointer command) { try{ Pointer result = command->visit(this); if (result == NULL) { return Pointer(); } else { return result.dynamicCast(); } } AMQ_CATCH_RETHROW(IOException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) AMQ_CATCHALL_THROW(IOException) } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::trackBack(Pointer command) { try { if (command != NULL) { if (trackMessages && command->isMessage()) { Pointer message = command.dynamicCast(); if (message->getTransactionId() == NULL) { this->impl->messageCache.currentCacheSize += message->getSize(); } } } } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::restore(Pointer transport) { try { Pointer > > iterator( this->impl->connectionStates.values().iterator()); while (iterator->hasNext()) { Pointer state = iterator->next(); Pointer info = state->getInfo(); info->setFailoverReconnect(true); transport->oneway(info); doRestoreTempDestinations(transport, state); if (restoreSessions) { doRestoreSessions(transport, state); } if (restoreTransaction) { doRestoreTransactions(transport, state); } } // Now we flush messages Pointer > > messages(this->impl->messageCache.values().iterator()); while (messages->hasNext()) { transport->oneway(messages->next()); } Pointer > > messagePullIter(this->impl->messagePullCache.values().iterator()); while (messagePullIter->hasNext()) { transport->oneway(messagePullIter->next()); } } AMQ_CATCH_RETHROW(IOException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) AMQ_CATCHALL_THROW(IOException) } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::doRestoreTransactions(Pointer transport, Pointer connectionState) { try { std::vector > toRollback; // For any completed transactions we don't know if the commit actually made it to the broker // or was lost along the way, so they need to be rolled back. Pointer > > iter(connectionState->getTransactionStates().iterator()); while (iter->hasNext()) { Pointer txState = iter->next(); Pointer lastCommand = txState->getCommands().getLast(); if (lastCommand->isTransactionInfo()) { Pointer transactionInfo = lastCommand.dynamicCast(); if (transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE) { toRollback.push_back(transactionInfo); continue; } } // replay short lived producers that may have been involved in the transaction Pointer > > state(txState->getProducerStates().iterator()); while (state->hasNext()) { transport->oneway(state->next()->getInfo()); } std::auto_ptr > > commands(txState->getCommands().iterator()); while (commands->hasNext()) { transport->oneway(commands->next()); } state.reset(txState->getProducerStates().iterator()); while (state->hasNext()) { transport->oneway(state->next()->getInfo()->createRemoveCommand()); } } // Trigger failure of commit for all outstanding completed but in doubt transactions. std::vector >::const_iterator command = toRollback.begin(); for (; command != toRollback.end(); ++command) { Pointer response(new ExceptionResponse()); Pointer exception(new BrokerError()); exception->setExceptionClass("TransactionRolledBackException"); exception->setMessage( std::string("Transaction completion in doubt due to failover. Forcing rollback of ") + (*command)->getTransactionId()->toString()); response->setException(exception); response->setCorrelationId((*command)->getCommandId()); transport->getTransportListener()->onCommand(response); } } AMQ_CATCH_RETHROW(IOException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) AMQ_CATCHALL_THROW(IOException) } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::doRestoreSessions(Pointer transport, Pointer connectionState) { try { Pointer > > iter(connectionState->getSessionStates().iterator()); while (iter->hasNext()) { Pointer state = iter->next(); transport->oneway(state->getInfo()); if (restoreProducers) { doRestoreProducers(transport, state); } if (restoreConsumers) { doRestoreConsumers(transport, state); } } } AMQ_CATCH_RETHROW(IOException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) AMQ_CATCHALL_THROW(IOException) } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::doRestoreConsumers(Pointer transport, Pointer sessionState) { try { // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete Pointer connectionState = this->impl->connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId()); bool connectionInterruptionProcessingComplete = connectionState->isConnectionInterruptProcessingComplete(); Pointer > > state(sessionState->getConsumerStates().iterator()); while (state->hasNext()) { Pointer infoToSend = state->next()->getInfo(); Pointer wireFormat = transport->getWireFormat(); if (!connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0 && wireFormat->getVersion() > 5) { Pointer oldInfoToSend = infoToSend; infoToSend.reset(oldInfoToSend->cloneDataStructure()); connectionState->getRecoveringPullConsumers().put(infoToSend->getConsumerId(), oldInfoToSend); infoToSend->setPrefetchSize(0); } transport->oneway(infoToSend); } } AMQ_CATCH_RETHROW(IOException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) AMQ_CATCHALL_THROW(IOException) } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::doRestoreProducers(Pointer transport, Pointer sessionState) { try { // Restore the session's producers Pointer > > iter(sessionState->getProducerStates().iterator()); while (iter->hasNext()) { Pointer state = iter->next(); transport->oneway(state->getInfo()); } } AMQ_CATCH_RETHROW(IOException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) AMQ_CATCHALL_THROW(IOException) } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::doRestoreTempDestinations(Pointer transport, Pointer connectionState) { try { std::auto_ptr > > iter(connectionState->getTempDesinations().iterator()); while (iter->hasNext()) { transport->oneway(iter->next()); } } AMQ_CATCH_RETHROW(IOException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) AMQ_CATCHALL_THROW(IOException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processDestinationInfo(DestinationInfo* info) { try { if (info != NULL) { Pointer cs = this->impl->connectionStates.get(info->getConnectionId()); if (cs != NULL && info->getDestination()->isTemporary()) { cs->addTempDestination(Pointer(info->cloneDataStructure())); } } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processRemoveDestination(DestinationInfo* info) { try { if (info != NULL) { Pointer cs = this->impl->connectionStates.get(info->getConnectionId()); if (cs != NULL && info->getDestination()->isTemporary()) { cs->removeTempDestination(info->getDestination()); } } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processProducerInfo(ProducerInfo* info) { try { if (info != NULL && info->getProducerId() != NULL) { Pointer sessionId = info->getProducerId()->getParentId(); if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer ss = cs->getSessionState(sessionId); if (ss != NULL) { ss->addProducer(Pointer(info->cloneDataStructure())); } } } } } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processRemoveProducer(ProducerId* id) { try { if (id != NULL) { Pointer sessionId = id->getParentId(); if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer ss = cs->getSessionState(sessionId); if (ss != NULL) { ss->removeProducer(Pointer(id->cloneDataStructure())); } } } } } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processConsumerInfo(ConsumerInfo* info) { try { if (info != NULL) { Pointer sessionId = info->getConsumerId()->getParentId(); if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer ss = cs->getSessionState(sessionId); if (ss != NULL) { ss->addConsumer(Pointer(info->cloneDataStructure())); } } } } } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processRemoveConsumer(ConsumerId* id) { try { if (id != NULL) { Pointer sessionId = id->getParentId(); if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer ss = cs->getSessionState(sessionId); if (ss != NULL) { ss->removeConsumer(Pointer(id->cloneDataStructure())); } } } } } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processSessionInfo(SessionInfo* info) { try { if (info != NULL) { Pointer connectionId = info->getSessionId()->getParentId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { cs->addSession(Pointer(info->cloneDataStructure())); } } } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processRemoveSession(SessionId* id) { try { if (id != NULL) { Pointer connectionId = id->getParentId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { cs->removeSession(Pointer(id->cloneDataStructure())); } } } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processConnectionInfo(ConnectionInfo* info) { try { if (info != NULL) { Pointer infoCopy(info->cloneDataStructure()); this->impl->connectionStates.put( info->getConnectionId(), Pointer(new ConnectionState(infoCopy))); } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processRemoveConnection(ConnectionId* id) { try { if (id != NULL) { this->impl->connectionStates.remove(Pointer(id->cloneDataStructure())); } return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processMessage(Message* message) { try { if (message != NULL) { if (trackTransactions && message->getTransactionId() != NULL) { Pointer producerId = message->getProducerId(); Pointer connectionId = producerId->getParentId()->getParentId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(message->getTransactionId()); if (transactionState != NULL) { transactionState->addCommand(Pointer(message->cloneDataStructure())); if (trackTransactionProducers) { // Track the producer in case it is closed before a commit Pointer sessionState = cs->getSessionState(producerId->getParentId()); Pointer producerState = sessionState->getProducerState(producerId); producerState->setTransactionState(transactionState); } } } } return this->impl->TRACKED_RESPONSE_MARKER; } else if (trackMessages) { this->impl->messageCache.put( message->getMessageId(), Pointer(message->cloneDataStructure())); } } return Pointer(); } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processBeginTransaction(TransactionInfo* info) { try { if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { cs->addTransactionState(info->getTransactionId()); Pointer transactionState = cs->getTransactionState(info->getTransactionId()); transactionState->addCommand(Pointer(info->cloneDataStructure())); } } return this->impl->TRACKED_RESPONSE_MARKER; } return Pointer(); } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processPrepareTransaction(TransactionInfo* info) { try { if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { transactionState->addCommand(Pointer(info->cloneDataStructure())); } } } return this->impl->TRACKED_RESPONSE_MARKER; } return Pointer(); } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processCommitTransactionOnePhase(TransactionInfo* info) { try { if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { Pointer infoCopy(info->cloneDataStructure()); transactionState->addCommand(infoCopy); return Pointer(new Tracked(Pointer(new RemoveTransactionAction(this, infoCopy)))); } } } } return Pointer(); } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processCommitTransactionTwoPhase(TransactionInfo* info) { try { if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { Pointer infoCopy(info->cloneDataStructure()); transactionState->addCommand(infoCopy); return Pointer(new Tracked(Pointer(new RemoveTransactionAction(this, infoCopy)))); } } } } return Pointer(); } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processRollbackTransaction(TransactionInfo* info) { try { if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { Pointer infoCopy(info->cloneDataStructure()); transactionState->addCommand(infoCopy); return Pointer(new Tracked(Pointer(new RemoveTransactionAction(this, infoCopy)))); } } } } return Pointer(); } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processEndTransaction(TransactionInfo* info) { try { if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { transactionState->addCommand(Pointer(info->cloneDataStructure())); } } } return this->impl->TRACKED_RESPONSE_MARKER; } return Pointer(); } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processMessagePull(MessagePull* pull) { try { if (pull != NULL && pull->getDestination() != NULL && pull->getConsumerId() != NULL) { std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString(); this->impl->messagePullCache.put(id, Pointer(pull->cloneDataStructure())); } return Pointer(); } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::connectionInterruptProcessingComplete(transport::Transport* transport, Pointer connectionId) { Pointer connectionState = this->impl->connectionStates.get(connectionId); if (connectionState != NULL) { connectionState->setConnectionInterruptProcessingComplete(true); StlMap, Pointer, ConsumerId::COMPARATOR> stalledConsumers = connectionState->getRecoveringPullConsumers(); Pointer > > key(stalledConsumers.keySet().iterator()); while (key->hasNext()) { Pointer control(new ConsumerControl()); Pointer theKey = key->next(); control->setConsumerId(theKey); control->setPrefetch(stalledConsumers.get(theKey)->getPrefetchSize()); control->setDestination(stalledConsumers.get(theKey)->getDestination()); try { transport->oneway(control); } catch (Exception& ex) { } } stalledConsumers.clear(); } } //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::transportInterrupted() { Pointer > > state(this->impl->connectionStates.values().iterator()); while (state->hasNext()) { state->next()->setConnectionInterruptProcessingComplete(false); } }