/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "AdvisoryConsumer.h" #include #include #include #include using namespace activemq; using namespace activemq::core; using namespace activemq::util; using namespace activemq::commands; using namespace activemq::exceptions; using namespace decaf; using namespace decaf::lang; using namespace decaf::lang::exceptions; using namespace decaf::util; using namespace decaf::util::concurrent; using namespace decaf::util::concurrent::atomic; //////////////////////////////////////////////////////////////////////////////// namespace activemq { namespace core { class AdvisoryConsumerConfig { public: int deliveredCounter; Pointer info; AtomicBoolean closed; int hashCode; AdvisoryConsumerConfig() : deliveredCounter(0), info(), closed(false), hashCode() { } }; }} //////////////////////////////////////////////////////////////////////////////// AdvisoryConsumer::AdvisoryConsumer(ActiveMQConnection* connection, Pointer consumerId) : Dispatcher(), config(new AdvisoryConsumerConfig()), connection(connection) { if (connection == NULL) { throw NullPointerException(__FILE__, __LINE__, "Parent Connection pointer was NULL"); } this->config->info.reset(new ConsumerInfo()); this->config->info->setConsumerId(consumerId); this->config->info->setDestination( Pointer(AdvisorySupport::getTempDestinationCompositeAdvisoryTopic())); this->config->info->setPrefetchSize(1000); this->config->info->setNoLocal(true); this->config->info->setDispatchAsync(true); this->config->hashCode = consumerId->getHashCode(); try { this->connection->addDispatcher(this->config->info->getConsumerId(), this); this->connection->syncRequest(this->config->info); } catch(...) { delete this->config; throw; } } //////////////////////////////////////////////////////////////////////////////// AdvisoryConsumer::~AdvisoryConsumer() { try { delete config; } AMQ_CATCHALL_NOTHROW() } //////////////////////////////////////////////////////////////////////////////// void AdvisoryConsumer::dispose() { if (!this->config->closed.compareAndSet(false, true)) { try { this->connection->oneway(this->config->info->createRemoveCommand()); } catch (cms::CMSException& e) { } this->connection->removeDispatcher(this->config->info->getConsumerId()); } } //////////////////////////////////////////////////////////////////////////////// void AdvisoryConsumer::dispatch(const Pointer& message) { // Auto ack messages when we reach 75% of the prefetch this->config->deliveredCounter++; if (this->config->deliveredCounter > (0.75 * this->config->info->getPrefetchSize())) { try { Pointer ack(new MessageAck()); ack->setAckType(ActiveMQConstants::ACK_TYPE_CONSUMED); ack->setConsumerId(this->config->info->getConsumerId()); ack->setDestination(message->getDestination()); ack->setMessageCount(this->config->deliveredCounter); ack->setLastMessageId(message->getMessage()->getMessageId()); this->connection->oneway(ack); this->config->deliveredCounter = 0; } catch (Exception& e) { this->connection->onClientInternalException(e); } } Pointer object = message->getMessage()->getDataStructure(); if (object != NULL) { try { Pointer info = object.dynamicCast(); processDestinationInfo(info); } catch (ClassCastException& ex) { } } } //////////////////////////////////////////////////////////////////////////////// void AdvisoryConsumer::processDestinationInfo(Pointer info) { Pointer dest = info->getDestination(); if (!dest->isTemporary()) { return; } Pointer tempDest = dest.dynamicCast(); if (info->getOperationType() == ActiveMQConstants::DESTINATION_ADD_OPERATION) { this->connection->addTempDestination(tempDest); } else if (info->getOperationType() == ActiveMQConstants::DESTINATION_REMOVE_OPERATION) { this->connection->removeTempDestination(tempDest); } } //////////////////////////////////////////////////////////////////////////////// int AdvisoryConsumer::getHashCode() const { return this->config->hashCode; }