/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef _ACTIVEMQ_WIREFORMAT_STOMP_STOMPWIREFORMAT_H_ #define _ACTIVEMQ_WIREFORMAT_STOMP_STOMPWIREFORMAT_H_ #include #include #include #include #include #include namespace activemq { namespace wireformat { namespace stomp { using decaf::lang::Pointer; using activemq::commands::Command; class StompHelper; class StompWireformatProperties; class AMQCPP_API StompWireFormat : public WireFormat { private: /** * Performs conversions for STOMP types and canonical types. */ StompHelper* helper; // Stored after we connect to use when validating that a durable subscribe // and unsubscribe are set to use the client Id. std::string clientId; // Indicates when we are in the doUnmarshal call decaf::util::concurrent::atomic::AtomicBoolean receiving; // Internal structure for holding class internal data that can change without // affecting binary compatibility. StompWireformatProperties* properties; private: StompWireFormat(const StompWireFormat&); StompWireFormat& operator=(const StompWireFormat&); public: StompWireFormat(); virtual ~StompWireFormat(); public: /** * Stream based marshaling of a Command, this method blocks until the entire * Command has been written out to the output stream. * * @param command * The Command to Marshal to the output stream. * @param transport * The Transport that initiated this marshal call. * @param out * The output stream to write the command to. * * @throws IOException */ virtual void marshal(const Pointer command, const activemq::transport::Transport* transport, decaf::io::DataOutputStream* out); /** * Stream based un-marshaling, blocks on reads on the input stream until a complete * command has been read and unmarshaled into the correct form. Returns a Pointer * to the newly unmarshaled Command. * * @param transport - Pointer to the transport that is making this request. * @param in - the input stream to read the command from. * @returns the newly marshaled Command, caller owns the pointer * @throws IOException */ virtual Pointer unmarshal(const activemq::transport::Transport* transport, decaf::io::DataInputStream* in); /** * Set the Version * @param the version of the wire format */ virtual void setVersion(int version AMQCPP_UNUSED) {} /** * Get the Version * @return the version of the wire format */ virtual int getVersion() const { return 1; } /** * Gets the prefix used to address Topics * * @return the string prefix used to address Topics. */ std::string getTopicPrefix() const; /** * Sets the prefix used to address Topics. * * @param prefix * The prefix to use. */ void setTopicPrefix(const std::string& prefix); /** * Gets the prefix used to address Queues * * @return the string prefix used to address Queues. */ std::string getQueuePrefix() const; /** * Sets the prefix used to address Queues. * * @param prefix * The prefix to use. */ void setQueuePrefix(const std::string& prefix); /** * Gets the prefix used to address Temporary Topics * * @return the string prefix used to address Temporary Topics. */ std::string getTempTopicPrefix() const; /** * Sets the prefix used to address Temporary Topics. * * @param prefix * The prefix to use. */ void setTempTopicPrefix(const std::string& prefix); /** * Gets the prefix used to address Temporary Queues * * @return the string prefix used to address Temporary Queues. */ std::string getTempQueuePrefix() const; /** * Sets the prefix used to address Temporary Queues. * * @param prefix * The prefix to use. */ void setTempQueuePrefix(const std::string& prefix); /** * Is there a Message being unmarshaled? * * @return true while in the doUnmarshal method. */ virtual bool inReceive() const { return this->receiving.get(); } /** * Returns true if this WireFormat has a Negotiator that needs to wrap the * Transport that uses it. * @returns true if the WireFormat provides a Negotiator. */ virtual bool hasNegotiator() const { return false; } /** * If the Transport Provides a Negotiator this method will create and return * a news instance of the Negotiator. * @returns new instance of a WireFormatNegotiator. * @throws UnsupportedOperationException if the WireFormat doesn't have a Negotiator. */ virtual Pointer createNegotiator( const Pointer transport); private: Pointer unmarshalMessage(const Pointer frame); Pointer unmarshalReceipt(const Pointer frame); Pointer unmarshalConnected(const Pointer frame); Pointer unmarshalError(const Pointer frame); Pointer marshalMessage(const Pointer command); Pointer marshalAck(const Pointer command); Pointer marshalConnectionInfo(const Pointer command); Pointer marshalTransactionInfo(const Pointer command); Pointer marshalShutdownInfo(const Pointer command); Pointer marshalRemoveInfo(const Pointer command); Pointer marshalConsumerInfo(const Pointer command); Pointer marshalRemoveSubscriptionInfo(const Pointer command); }; }}} #endif /* _ACTIVEMQ_WIREFORMAT_STOMP_STOMPWIREFORMAT_H_ */