/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQPRODUCERKERNEL_H_ #define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQPRODUCERKERNEL_H_ #include #include #include #include #include #include #include #include #include #include #include #include namespace activemq { namespace core { namespace kernels { using decaf::lang::Pointer; class ActiveMQSessionKernel; class AMQCPP_API ActiveMQProducerKernel : public cms::MessageProducer { private: // Disable sending timestamps bool disableTimestamps; // Disable adding a Message Id bool disableMessageId; // The default delivery Mode of this Producer int defaultDeliveryMode; // The default priority Level to send at int defaultPriority; // The default time to live value for messages in milliseconds long long defaultTimeToLive; // The default Send Timeout for this Producer. long long sendTimeout; // Session that this producer sends to. ActiveMQSessionKernel* session; // This Producers protocol specific info object Pointer producerInfo; // Boolean that indicates if the consumer has been closed bool closed; // Memory Usage Class, created only if the Producer is tracking its usage. std::auto_ptr memoryUsage; // The Destination assigned at creation, NULL if not assigned. Pointer destination; // Generator of Message Sequence Id numbers for this producer. util::LongSequenceGenerator messageSequence; // Used to tranform Message before sending them to the CMS bus. cms::MessageTransformer* transformer; private: ActiveMQProducerKernel(const ActiveMQProducerKernel&); ActiveMQProducerKernel& operator=(const ActiveMQProducerKernel&); public: /** * Constructor, creates an instance of an ActiveMQProducerKernel * * @param session * The Session which is the parent of this Producer. * @param parent * Pointer to the cms::MessageProducer that will wrap this kernel object. * @param producerId * Pointer to a ProducerId object which identifies this producer. * @param destination * The assigned Destination this Producer sends to, or null if not set. * The Producer does not own the Pointer passed. * @param sendTimeout * The configured send timeout for this Producer. */ ActiveMQProducerKernel(ActiveMQSessionKernel* session, const Pointer& producerId, const Pointer& destination, long long sendTimeout); virtual ~ActiveMQProducerKernel(); public: // cms::MessageProducer methods. virtual void close(); virtual void send(cms::Message* message); virtual void send(cms::Message* message, cms::AsyncCallback* callback); virtual void send(cms::Message* message, int deliveryMode, int priority, long long timeToLive); virtual void send(cms::Message* message, int deliveryMode, int priority, long long timeToLive, cms::AsyncCallback* callback); virtual void send(const cms::Destination* destination, cms::Message* message); virtual void send(const cms::Destination* destination, cms::Message* message, cms::AsyncCallback* callback); virtual void send(const cms::Destination* destination, cms::Message* message, int deliveryMode, int priority, long long timeToLive); virtual void send(const cms::Destination* destination, cms::Message* message, int deliveryMode, int priority, long long timeToLive, cms::AsyncCallback* callback); /** * Set an MessageTransformer instance that is applied to all cms::Message objects before they * are sent on to the CMS bus. * * @param transformer * Pointer to the cms::MessageTransformer to apply on each cms:;MessageSend. */ virtual void setMessageTransformer(cms::MessageTransformer* transformer) { this->transformer = transformer; } /** * Gets the currently configured MessageTransformer for this MessageProducer. * * @returns the pointer to the currently set cms::MessageTransformer. */ virtual cms::MessageTransformer* getMessageTransformer() const { return this->transformer; } /** * Sets the delivery mode for this Producer * @param mode - The DeliveryMode to use for Message sends. */ virtual void setDeliveryMode(int mode) { this->defaultDeliveryMode = mode; } /** * Gets the delivery mode for this Producer * @return The DeliveryMode */ virtual int getDeliveryMode() const { return this->defaultDeliveryMode; } /** * Sets if Message Ids are disabled for this Producer * @param value - boolean indicating enable / disable (true / false) */ virtual void setDisableMessageID(bool value) { this->disableMessageId = value; } /** * Gets if Message Ids are disabled for this Producer * @return a boolean indicating state enable / disable (true / false) for MessageIds. */ virtual bool getDisableMessageID() const { return this->disableMessageId; } /** * Sets if Message Time Stamps are disabled for this Producer * @param value - boolean indicating enable / disable (true / false) */ virtual void setDisableMessageTimeStamp(bool value) { this->disableTimestamps = value; } /** * Gets if Message Time Stamps are disabled for this Producer * @returns boolean indicating state of enable / disable (true / false) */ virtual bool getDisableMessageTimeStamp() const { return this->disableTimestamps; } /** * Sets the Priority that this Producers sends messages at * @param priority int value for Priority level */ virtual void setPriority(int priority) { this->defaultPriority = priority; } /** * Gets the Priority level that this producer sends messages at * @return int based priority level */ virtual int getPriority() const { return this->defaultPriority; } /** * Sets the Time to Live that this Producers sends messages with * @param time The new default time to live value in milliseconds. */ virtual void setTimeToLive(long long time) { this->defaultTimeToLive = time; } /** * Gets the Time to Live that this producer sends messages with * @return The default time to live value in milliseconds. */ virtual long long getTimeToLive() const { return this->defaultTimeToLive; } /** * Sets the Send Timeout that this Producers sends messages with * @param time The new default send timeout value in milliseconds. */ virtual void setSendTimeout(long long time) { this->sendTimeout = time; } /** * Gets the Send Timeout that this producer sends messages with * @return The default send timeout value in milliseconds. */ virtual long long getSendTimeout() const { return this->sendTimeout; } /** * @returns true if this Producer has been closed. */ bool isClosed() const { return this->closed; } /** * Retries this object ProducerInfo pointer * @return ProducerInfo Reference */ const Pointer& getProducerInfo() const { this->checkClosed(); return this->producerInfo; } /** * Retries this object ProducerId or NULL if closed. * @return ProducerId Reference */ const Pointer& getProducerId() const { this->checkClosed(); return this->producerInfo->getProducerId(); } /** * Handles the work of Processing a ProducerAck Command from the Broker. * @param ack - The ProducerAck message received from the Broker. */ virtual void onProducerAck(const commands::ProducerAck& ack); /** * Performs Producer object cleanup but doesn't attempt to send the Remove command * to the broker. Called when the parent resource if closed first to avoid the message * send and avoid any exceptions that might be thrown from an attempt to send a remove * command to a failed transport. */ void dispose(); /** * @returns the next sequence number for a Message sent from this Producer. */ long long getNextMessageSequence() { return this->messageSequence.getNextSequenceId(); } private: // Checks for the closed state and throws if so. void checkClosed() const; }; }}} #endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQPRODUCERKERNEL_H_ */