/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONFACTORY_H_ #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONFACTORY_H_ #include #include #include #include #include #include namespace activemq{ namespace core{ using decaf::lang::Pointer; class ActiveMQConnection; class FactorySettings; class PrefetchPolicy; class RedeliveryPolicy; class AMQCPP_API ActiveMQConnectionFactory : public cms::ConnectionFactory { public: // Default Broker URI if none specified 'failover:tcp://localhost:61616' static const std::string DEFAULT_URI; private: FactorySettings* settings; private: ActiveMQConnectionFactory(const ActiveMQConnectionFactory&); ActiveMQConnectionFactory& operator=(const ActiveMQConnectionFactory&); public: ActiveMQConnectionFactory(); /** * Constructor * * @param uri * The URI of the Broker we are connecting to. * @param username * The user name to authenticate with this connection. * @param password * The password to authenticate with this connection. */ ActiveMQConnectionFactory(const std::string& uri, const std::string& username = "", const std::string& password = ""); /** * Constructor * * @param uri * The URI of the Broker we are connecting to. * @param username * The user name to authenticate with this connection. * @param password * The password to authenticate with this connection. */ ActiveMQConnectionFactory(const decaf::net::URI& uri, const std::string& username = "", const std::string& password = ""); virtual ~ActiveMQConnectionFactory(); /** * Creates a connection with the default user identity. The * connection is created in stopped mode. No messages will be * delivered until the Connection.start method is explicitly * called. * * @returns a Connection Pointer * * @throws CMSException if an error occurs. */ virtual cms::Connection* createConnection(); /** * Creates a connection with the specified user identity. The * connection is created in stopped mode. No messages will be * delivered until the Connection.start method is explicitly * called. The user name and password values passed here do not * change the defaults, subsequent calls to the parameterless * createConnection will continue to use the default values that * were set in the Constructor. * * @param username * The user name to authenticate with this connection. * @param password * The password to authenticate with this connection. * * @returns a Connection Pointer * * @throws CMSSecurityException if the user credentials are invalid. * @throws CMSException if an error occurs. */ virtual cms::Connection* createConnection(const std::string& username, const std::string& password); /** * Creates a connection with the specified user identity. The * connection is created in stopped mode. No messages will be * delivered until the Connection.start method is explicitly * called. The username and password values passed here do not * change the defaults, subsequent calls to the parameterless * createConnection will continue to use the default values that * were set in the Constructor. * * @param username * The user name to authenticate with this connection. * @param password * The password to authenticate with this connection. * @param clientId * The client Id to assign to connection if "" then a random client * Id is created for this connection. * * @returns a Connection Pointer * * @throws CMSSecurityException if the user credentials are invalid. * @throws CMSException if an error occurs. */ virtual cms::Connection* createConnection(const std::string& username, const std::string& password, const std::string& clientId); public: // Configuration Options /** * Sets the username that should be used when creating a new connection * @param username string */ void setUsername(const std::string& username); /** * Gets the username that this factory will use when creating a new * connection instance. * @return username string, "" for default credentials */ const std::string& getUsername() const; /** * Sets the password that should be used when creating a new connection * @param password string */ void setPassword(const std::string& password); /** * Gets the password that this factory will use when creating a new * connection instance. * @return password string, "" for default credentials */ const std::string& getPassword() const; /** * Gets the Configured Client Id. * @return the clientId. */ std::string getClientId() const; /** * Sets the Client Id. * @param clientId - The new clientId value. */ void setClientId(const std::string& clientId); /** * Sets the Broker URI that should be used when creating a new connection instance. * * @param brokerURI * The string form of the Broker URI, this will be converted to a URI object. */ void setBrokerURI(const std::string& uri); /** * Sets the Broker URI that should be used when creating a new connection instance. * * @param brokerURI * The URI of the broker that this client will connect to. */ void setBrokerURI(const decaf::net::URI& uri); /** * Gets the Broker URI that this factory will use when creating a new * connection instance. * @return brokerURI string */ const decaf::net::URI& getBrokerURI() const; /** * Set an CMS ExceptionListener that will be set on eat connection once it has been * created. The factory does not take ownership of this pointer, the client must ensure * that its lifetime is scoped to the connection that it is applied to. * * @param listener * The listener to set on the connection or NULL for no listener. */ virtual void setExceptionListener(cms::ExceptionListener* listener); /** * Returns the currently set ExceptionListener that will be set on any new Connection * instance that is created by this factory. * * @return a pointer to a CMS ExceptionListener instance or NULL if not set. */ virtual cms::ExceptionListener* getExceptionListener() const; /** * Set an MessageTransformer instance that is passed on to all Connection objects created from * this ConnectionFactory * * @param transformer * Pointer to the cms::MessageTransformer to set on all newly created Connection objects. */ virtual void setMessageTransformer(cms::MessageTransformer* transformer); /** * Gets the currently configured MessageTransformer for this ConnectionFactory. * * @returns the pointer to the currently set cms::MessageTransformer. */ virtual cms::MessageTransformer* getMessageTransformer() const; /** * Sets the PrefetchPolicy instance that this factory should use when it creates * new Connection instances. The PrefetchPolicy passed becomes the property of the * factory and will be deleted when the factory is destroyed. * * @param policy * The new PrefetchPolicy that the ConnectionFactory should clone for Connections. */ void setPrefetchPolicy(PrefetchPolicy* policy); /** * Gets the pointer to the current PrefetchPolicy that is in use by this ConnectionFactory. * * @returns a pointer to this objects PrefetchPolicy. */ PrefetchPolicy* getPrefetchPolicy() const; /** * Sets the RedeliveryPolicy instance that this factory should use when it creates * new Connection instances. The RedeliveryPolicy passed becomes the property of the * factory and will be deleted when the factory is destroyed. * * @param policy * The new RedeliveryPolicy that the ConnectionFactory should clone for Connections. */ void setRedeliveryPolicy(RedeliveryPolicy* policy); /** * Gets the pointer to the current RedeliveryPolicy that is in use by this ConnectionFactory. * * @returns a pointer to this objects RedeliveryPolicy. */ RedeliveryPolicy* getRedeliveryPolicy() const; /** * @return The value of the dispatch asynchronously option sent to the broker. */ bool isDispatchAsync() const; /** * Should messages be dispatched synchronously or asynchronously from the producer * thread for non-durable topics in the broker? For fast consumers set this to false. * For slow consumers set it to true so that dispatching will not block fast consumers. . * * @param value * The value of the dispatch asynchronously option sent to the broker. */ void setDispatchAsync(bool value); /** * Gets if the Connection should always send things Synchronously. * * @return true if sends should always be Synchronous. */ bool isAlwaysSyncSend() const; /** * Sets if the Connection should always send things Synchronously. * @param value * true if sends should always be Synchronous. */ void setAlwaysSyncSend(bool value); /** * Gets if the useAsyncSend option is set * @returns true if on false if not. */ bool isUseAsyncSend() const; /** * Sets the useAsyncSend option * @param value - true to activate, false to disable. */ void setUseAsyncSend(bool value); /** * Returns whether Message acknowledgments are sent asynchronously meaning no * response is required from the broker before the ack completes. * * @return the sendAcksAsync configured value. (defaults to true) */ bool isSendAcksAsync() const; /** * Sets whether Message acknowledgments are sent asynchronously meaning no * response is required from the broker before the ack completes. * * @param sendAcksAsync * The sendAcksAsync configuration value to set. */ void setSendAcksAsync(bool sendAcksAsync); /** * Gets if the Connection is configured for Message body compression. * @returns if the Message body will be Compressed or not. */ bool isUseCompression() const; /** * Sets whether Message body compression is enabled. * * @param value * Boolean indicating if Message body compression is enabled. */ void setUseCompression(bool value); /** * Sets the Compression level used when Message body compression is enabled, a * value of -1 causes the Compression Library to use the default setting which * is a balance of speed and compression. The range of compression levels is * [0..9] where 0 indicates best speed and 9 indicates best compression. * * @param value * A signed int value that controls the compression level. */ void setCompressionLevel(int value); /** * Gets the currently configured Compression level for Message bodies. * * @return the int value of the current compression level. */ int getCompressionLevel() const; /** * Gets the assigned send timeout for this Connector * @return the send timeout configured in the connection uri */ unsigned int getSendTimeout() const; /** * Sets the send timeout to use when sending Message objects, this will * cause all messages to be sent using a Synchronous request is non-zero. * @param timeout - The time to wait for a response. */ void setSendTimeout(unsigned int timeout); /** * Gets the assigned close timeout for this Connector * @return the close timeout configured in the connection uri */ unsigned int getCloseTimeout() const; /** * Sets the close timeout to use when sending the disconnect request. * @param timeout - The time to wait for a close message. */ void setCloseTimeout(unsigned int timeout); /** * Gets the configured producer window size for Producers that are created * from this connector. This only applies if there is no send timeout and the * producer is able to send asynchronously. * @return size in bytes of messages that this producer can produce before * it must block and wait for ProducerAck messages to free resources. */ unsigned int getProducerWindowSize() const; /** * Sets the size in Bytes of messages that a producer can send before it is blocked * to await a ProducerAck from the broker that frees enough memory to allow another * message to be sent. * @param windowSize - The size in bytes of the Producers memory window. */ void setProducerWindowSize(unsigned int windowSize); /** * @returns true if the Connections that this factory creates should support the * message based priority settings. */ bool isMessagePrioritySupported() const; /** * Set whether or not this factory should create Connection objects with the Message * priority support function enabled. * * @param value * Boolean indicating if Message priority should be enabled. */ void setMessagePrioritySupported(bool value); /** * Should all created consumers be retroactive. * * @returns true if consumer will be created with the retroactive flag set. */ bool isUseRetroactiveConsumer() const; /** * Sets whether or not retroactive consumers are enabled. Retroactive * consumers allow non-durable topic subscribers to receive old messages * that were published before the non-durable subscriber started. * * @param useRetroactiveConsumer * The value of this configuration option. */ void setUseRetroactiveConsumer(bool useRetroactiveConsumer); /** * Should all created consumers be exclusive. * * @returns true if consumer will be created with the exclusive flag set. */ bool isExclusiveConsumer() const; /** * Enables or disables whether or not queue consumers should be exclusive or * not for example to preserve ordering when not using Message Groups. * * @param exclusiveConsumer * The value of this configuration option. */ void setExclusiveConsumer(bool exclusiveConsumer); /** * Is the Connection created by this factory configured to watch for advisory messages * that inform the Connection about temporary destination create / destroy. * * @return true if Connection's will listen for temporary destination advisory messages. */ bool isWatchTopicAdvisories() const; /** * Sets whether Connection's created by this factory will listen for advisory messages * regarding temporary destination creation and deletion. * * @param value * Boolean indicating if advisory message monitoring should be enabled. */ void setWatchTopicAdvisories(bool value); /** * Get the audit depth for Messages for consumers when using a fault * tolerant transport. The higher the value the more messages are checked * for duplication, and the larger the performance impact of duplicate * detection will be. * * @returns the configured audit depth. */ int getAuditDepth() const; /** * Set the audit depth for Messages for consumers when using a fault * tolerant transport. The higher the value the more messages are checked * for duplication, and the larger the performance impact of duplicate * detection will be. * * @param auditDepth * The configured audit depth. */ void setAuditDepth(int auditDepth); /** * The number of Producers that will be audited. * * @returns the configured number of producers to include in the audit. */ int getAuditMaximumProducerNumber() const; /** * The number of Producers that will be audited. * * @param auditMaximumProducerNumber * The configured number of producers to include in the audit. */ void setAuditMaximumProducerNumber(int auditMaximumProducerNumber); /** * Gets the value of the configured Duplicate Message detection feature. * * When enabled and a fault tolerant transport is used (think failover) then * this feature will help to detect and filter duplicate messages that might * otherwise be delivered to a consumer after a connection failure. * * Disabling this can increase performance since no Message auditing will * occur. * * @return the checkForDuplicates value currently set. */ bool isCheckForDuplicates() const; /** * Gets the value of the configured Duplicate Message detection feature. * * When enabled and a fault tolerant transport is used (think failover) then * this feature will help to detect and filter duplicate messages that might * otherwise be delivered to a consumer after a connection failure. * * Disabling this can increase performance since no Message auditing will * occur. * * @param checkForDuplicates * The checkForDuplicates value to be configured. */ void setCheckForDuplicates(bool checkForDuplicates); /** * when true, submit individual transacted acks immediately rather than with transaction * completion. This allows the acks to represent delivery status which can be persisted on * rollback Used in conjunction with KahaDB set to Rewrite On Redelivery. * * @returns true if this option is enabled. */ bool isTransactedIndividualAck() const; /** * when true, submit individual transacted acks immediately rather than with transaction * completion. This allows the acks to represent delivery status which can be persisted on * rollback Used in conjunction with KahaDB set to Rewrite On Redelivery. * * @param transactedIndividualAck * The value to set. */ void setTransactedIndividualAck(bool transactedIndividualAck); /** * Returns true if non-blocking redelivery of Messages is configured for Consumers * that are rolled back or recovered. * * @return true if non-blocking redelivery is enabled. */ bool isNonBlockingRedelivery() const; /** * When true a MessageConsumer will not stop Message delivery before re-delivering Messages * from a rolled back transaction. This implies that message order will not be preserved and * also will result in the TransactedIndividualAck option to be enabled. * * @param nonBlockingRedelivery * The value to configure for non-blocking redelivery. */ void setNonBlockingRedelivery(bool nonBlockingRedelivery); /** * Gets the delay period for a consumer redelivery. * * @returns configured time delay in milliseconds. */ long long getConsumerFailoverRedeliveryWaitPeriod() const; /** * Sets the delay period for a consumer redelivery. * * @param value * The configured time delay in milliseconds. */ void setConsumerFailoverRedeliveryWaitPeriod(long long value); /** * @return true if optimizeAcknowledge is enabled. */ bool isOptimizeAcknowledge() const; /** * Sets if Consumers are configured to use Optimized Acknowledge by default. * * @param optimizeAcknowledge * The optimizeAcknowledge mode to set. */ void setOptimizeAcknowledge(bool optimizeAcknowledge); /** * Gets the time between optimized ack batches in milliseconds. * * @returns time between optimized ack batches in Milliseconds. */ long long getOptimizeAcknowledgeTimeOut() const; /** * The max time in milliseconds between optimized ack batches. * * @param optimizeAcknowledgeTimeOut * The time in milliseconds for optimized ack batches. */ void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut); /** * Gets the configured time interval that is used to force all MessageConsumers that have * optimizedAcknowledge enabled to send an ack for any outstanding Message Acks. By default * this value is set to zero meaning that the consumers will not do any background Message * acknowledgment. * * @return the scheduledOptimizedAckInterval */ long long getOptimizedAckScheduledAckInterval() const; /** * Sets the amount of time between scheduled sends of any outstanding Message Acks for * consumers that have been configured with optimizeAcknowledge enabled. * * Time is given in Milliseconds. * * @param optimizedAckScheduledAckInterval * The scheduledOptimizedAckInterval to use for new Consumers. */ void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval); public: /** * Creates a connection with the specified user identity. The * connection is created in stopped mode. No messages will be * delivered until the Connection.start method is explicitly called. * * @param uri * The URI of the Broker we are connecting to. * @param username * The name of the user to authenticate with. * @param password * The password for the user to authenticate with. * @param clientId * The unique client id to assign to connection, defaults to "". * * @throw CMSException. */ static cms::Connection* createConnection(const std::string& uri, const std::string& username, const std::string& password, const std::string& clientId = ""); protected: /** * Create a new ActiveMQConnection instnace using the provided Transport and Properties. * Subclasses can override this to control the actual type of ActiveMQConnection that * is created. * * @param transport * The Transport that the Connection should use to communicate with the Broker. * @param properties * The Properties that are assigned to the new Connection instance. * * @returns a new ActiveMQConnection pointer instance. */ virtual ActiveMQConnection* createActiveMQConnection(const Pointer& transport, const Pointer& properties); private: cms::Connection* doCreateConnection(const decaf::net::URI& uri, const std::string& username, const std::string& password, const std::string& clientId); void configureConnection(ActiveMQConnection* connection); }; }} #endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTIONFACTORY_H_*/