/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "OpenWireFormat.h"

// NOTE(review): the include targets and all template / cast arguments were
// stripped from the copy of this file that was reviewed.  The list below and
// the <...> arguments throughout the file are reconstructed from the symbols
// the code uses -- confirm them against the project tree before merging.
#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <decaf/lang/Integer.h>
#include <decaf/lang/Math.h>
#include <decaf/util/UUID.h>
#include <decaf/io/ByteArrayOutputStream.h>
#include <decaf/io/DataOutputStream.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <activemq/wireformat/openwire/OpenWireFormatNegotiator.h>
#include <activemq/wireformat/openwire/utils/BooleanStream.h>
#include <activemq/wireformat/openwire/marshal/generated/MarshallerFactory.h>
#include <activemq/wireformat/MarshalAware.h>
#include <activemq/commands/WireFormatInfo.h>
#include <activemq/commands/DataStructure.h>
#include <activemq/exceptions/ActiveMQException.h>

using namespace std;
using namespace activemq;
using namespace activemq::util;
using namespace activemq::commands;
using namespace activemq::transport;
using namespace activemq::exceptions;
using namespace activemq::wireformat;
using namespace activemq::wireformat::openwire;
using namespace activemq::wireformat::openwire::marshal;
using namespace activemq::wireformat::openwire::utils;
using namespace decaf::io;
using namespace decaf::util;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;

////////////////////////////////////////////////////////////////////////////////
const unsigned char OpenWireFormat::NULL_TYPE = 0;
const int OpenWireFormat::DEFAULT_VERSION = 1;
const int OpenWireFormat::MAX_SUPPORTED_VERSION = 9;

////////////////////////////////////////////////////////////////////////////////
// Builds a wire format with the built-in defaults, registers the marshallers
// through the generated MarshallerFactory and selects DEFAULT_VERSION.
OpenWireFormat::OpenWireFormat(const decaf::util::Properties& properties) :
    properties(properties),
    preferedWireFormatInfo(),
    dataMarshallers(256),
    id(UUID::randomUUID().toString()),
    receiving(),
    version(0),
    stackTraceEnabled(true),
    tcpNoDelayEnabled(true),
    cacheEnabled(true),
    cacheSize(1024),
    tightEncodingEnabled(false),
    sizePrefixDisabled(false),
    maxInactivityDuration(30000),
    maxInactivityDurationInitialDelay(10000) {

    // initialize the universal marshalers, don't need to reset them again
    // after this so its safe to do this here.
    generated::MarshallerFactory().configure(this);

    // Set to Default as lowest common denominator, then we will try
    // and move up to the preferred when the wireformat is negotiated.
    this->setVersion(DEFAULT_VERSION);
}

////////////////////////////////////////////////////////////////////////////////
// Releases every registered marshaller; never lets an exception escape.
OpenWireFormat::~OpenWireFormat() {
    try {
        this->destroyMarshalers();
    }
    AMQ_CATCHALL_NOTHROW()
}

////////////////////////////////////////////////////////////////////////////////
// Creates an OpenWireFormatNegotiator bound to this format and the given
// transport.  Any failure is reported as UnsupportedOperationException.
Pointer<WireFormatNegotiator> OpenWireFormat::createNegotiator(const Pointer<Transport> transport) {

    try {
        return Pointer<WireFormatNegotiator>(new OpenWireFormatNegotiator(this, transport));
    }
    AMQ_CATCH_RETHROW(UnsupportedOperationException)
    AMQ_CATCHALL_THROW(UnsupportedOperationException)
}

////////////////////////////////////////////////////////////////////////////////
// Deletes every entry of the marshaller table and resets the slot to NULL so
// that a second call is harmless.
void OpenWireFormat::destroyMarshalers() {

    try {
        for (size_t i = 0; i < dataMarshallers.size(); ++i) {
            delete dataMarshallers[i];
            dataMarshallers[i] = NULL;
        }
    }
    AMQ_CATCH_NOTHROW(ActiveMQException)
    AMQ_CATCHALL_NOTHROW()
}

////////////////////////////////////////////////////////////////////////////////
// Selects the protocol version to use.  A request for the current version is
// a no-op; a version above MAX_SUPPORTED_VERSION raises
// IllegalArgumentException.
void OpenWireFormat::setVersion(int version) {

    try {

        if (version == this->getVersion()) {
            return;
        }

        if (version > MAX_SUPPORTED_VERSION) {
            throw IllegalArgumentException(__FILE__, __LINE__,
                "OpenWireFormat::setVersion - "
                "Given Version: %d , is not yet supported", version);
        }

        // The marshaller table is left untouched here; only the version
        // number is recorded.
        this->version = version;
    }
    AMQ_CATCH_RETHROW(IllegalArgumentException)
    AMQ_CATCHALL_THROW(IllegalArgumentException)
}

////////////////////////////////////////////////////////////////////////////////
// Registers a marshaller in the table slot indexed by the data structure type
// it reports.  The stored pointer is later deleted by destroyMarshalers().
void OpenWireFormat::addMarshaller(DataStreamMarshaller* marshaller) {
    unsigned char type = marshaller->getDataStructureType();
    dataMarshallers[type & 0xFF] = marshaller;
}

////////////////////////////////////////////////////////////////////////////////
// Stores the WireFormatInfo that renegotiateWireFormat() combines with the
// peer's settings.
void OpenWireFormat::setPreferedWireFormatInfo(const Pointer<commands::WireFormatInfo> info) {
    this->preferedWireFormatInfo = info;
}

////////////////////////////////////////////////////////////////////////////////
// Writes one command to dataOut.
//
// Layout: an optional int size prefix (omitted when sizePrefixDisabled), the
// type byte, then the body in tight or loose encoding depending on
// tightEncodingEnabled.  A NULL command is written as size 1 followed by
// NULL_TYPE.
//
// Throws IOException when transport or dataOut is NULL, when no marshaller is
// registered for the command's type, or when anything below fails.
void OpenWireFormat::marshal(const Pointer<commands::Command> command,
                             const activemq::transport::Transport* transport,
                             decaf::io::DataOutputStream* dataOut) {

    if (transport == NULL) {
        throw decaf::io::IOException(__FILE__, __LINE__, "Transport passed is NULL");
    }

    if (dataOut == NULL) {
        throw decaf::io::IOException(__FILE__, __LINE__, "DataOutputStream passed is NULL");
    }

    try {

        // The type byte is always part of the frame.
        int size = 1;

        if (command != NULL) {

            DataStructure* dataStructure = dynamic_cast<DataStructure*>(command.get());
            unsigned char type = dataStructure->getDataStructureType();

            DataStreamMarshaller* dsm = dataMarshallers[type & 0xFF];
            if (dsm == NULL) {
                throw IOException(__FILE__, __LINE__,
                    (string("OpenWireFormat::marshal - Unknown data type: ") + Integer::toString(type)).c_str());
            }

            if (tightEncodingEnabled) {

                // Pass 1 computes the size and fills the boolean stream,
                // pass 2 writes the payload.
                BooleanStream bs;
                size += dsm->tightMarshal1(this, dataStructure, &bs);
                size += bs.marshalledSize();

                if (!sizePrefixDisabled) {
                    dataOut->writeInt(size);
                }

                dataOut->writeByte(type);
                bs.marshal(dataOut);
                dsm->tightMarshal2(this, dataStructure, dataOut, &bs);

            } else {

                if (sizePrefixDisabled) {

                    dataOut->writeByte(type);
                    dsm->looseMarshal(this, dataStructure, dataOut);

                } else {

                    // The size is not known up front for loose encoding, so
                    // marshal into a memory buffer first.
                    // NOTE(review): the 'true' argument presumably hands
                    // ownership of baos to looseOut -- confirm.
                    ByteArrayOutputStream* baos = new ByteArrayOutputStream();
                    std::auto_ptr<DataOutputStream> looseOut(new DataOutputStream(baos, true));

                    looseOut->writeByte(type);
                    dsm->looseMarshal(this, dataStructure, looseOut.get());
                    looseOut->close();

                    // Now the data goes to the transport from our byte buffer.
                    dataOut->writeInt((int) baos->size());

                    if (baos->size() > 0) {
                        std::pair<unsigned char*, int> array = baos->toByteArray();
                        try {
                            dataOut->write(array.first, array.second);
                        } catch (...) {
                            // Free the copy whatever was thrown, not only for
                            // decaf exceptions.
                            delete[] array.first;
                            throw;
                        }
                        delete[] array.first;
                    }
                }
            }

        } else {

            // Honour sizePrefixDisabled here as well: unmarshal() only
            // consumes a size prefix when the prefix is enabled.
            if (!sizePrefixDisabled) {
                dataOut->writeInt(size);
            }
            dataOut->writeByte(NULL_TYPE);
        }
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
// Reads one command from dis.  The size prefix, when enabled, is read and
// discarded.  Throws IOException when dis is NULL or when nothing could be
// unmarshalled (including a NULL_TYPE frame).
Pointer<commands::Command> OpenWireFormat::unmarshal(const activemq::transport::Transport* transport AMQCPP_UNUSED,
                                                     decaf::io::DataInputStream* dis) {

    try {

        if (dis == NULL) {
            throw decaf::io::IOException(__FILE__, __LINE__, "DataInputStream passed is NULL");
        }

        if (!sizePrefixDisabled) {
            dis->readInt();
        }

        // Get the unmarshalled DataStructure
        Pointer<DataStructure> data(doUnmarshal(dis));

        if (data == NULL) {
            throw IOException(__FILE__, __LINE__,
                "OpenWireFormat::doUnmarshal - "
                "Failed to unmarshal an Object");
        }

        // Now all unmarshals from this level should result in an object
        // that is a commands::Command type, if its not then the cast will
        // throw an ClassCastException.
        Pointer<Command> command = data.dynamicCast<Command>();

        return command;
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
// Reads the type byte and the body of one data structure from dis.  Returns a
// newly created object the caller must own, or NULL for a NULL_TYPE frame.
// The 'receiving' flag is set for the duration of the call.
commands::DataStructure* OpenWireFormat::doUnmarshal(DataInputStream* dis) {

    try {

        // Sets the flag on entry and clears it on every exit path,
        // exceptions included.
        class Finally {
        private:

            decaf::util::concurrent::atomic::AtomicBoolean* state;

        private:

            Finally(const Finally&);
            Finally& operator=(const Finally&);

        public:

            Finally(decaf::util::concurrent::atomic::AtomicBoolean* state) : state(state) {
                state->set(true);
            }

            ~Finally() {
                state->set(false);
            }
        } finalizer(&(this->receiving));

        unsigned char dataType = dis->readByte();

        if (dataType != NULL_TYPE) {

            DataStreamMarshaller* dsm = dataMarshallers[dataType & 0xFF];
            if (dsm == NULL) {
                throw IOException(__FILE__, __LINE__,
                    (string("OpenWireFormat::doUnmarshal - Unknown data type: ") + Integer::toString(dataType)).c_str());
            }

            // Ask the DataStreamMarshaller to create a new instance of its
            // command so that we can fill in its data.
            std::auto_ptr<DataStructure> data(dsm->createObject());

            if (this->tightEncodingEnabled) {
                BooleanStream bs;
                bs.unmarshal(dis);
                dsm->tightUnmarshal(this, data.get(), dis, &bs);
            } else {
                dsm->looseUnmarshal(this, data.get(), dis);
            }

            return data.release();
        }

        return NULL;
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
// First pass of tight marshalling for a nested object.  Records in bs whether
// the object is present and, for marshal-aware objects, whether a cached
// marshalled form is available.  Returns the number of bytes the second pass
// will write (0 for a NULL object).
int OpenWireFormat::tightMarshalNestedObject1(commands::DataStructure* object, utils::BooleanStream* bs) {

    try {

        bs->writeBoolean(object != NULL);
        if (object == NULL) {
            return 0;
        }

        if (object->isMarshalAware()) {

            std::vector<unsigned char> sequence = object->getMarshaledForm(this);
            bs->writeBoolean(!sequence.empty());
            if (!sequence.empty()) {
                // Type byte plus the cached bytes.
                return (int) (1 + sequence.size());
            }
        }

        unsigned char type = object->getDataStructureType();
        if (type == 0) {
            throw IOException(__FILE__, __LINE__,
                "No valid data structure type for object of this type");
        }

        DataStreamMarshaller* dsm = dataMarshallers[type & 0xFF];
        if (dsm == NULL) {
            throw IOException(__FILE__, __LINE__,
                (string("OpenWireFormat::tightMarshalNestedObject1 - Unknown data type: ") + Integer::toString(type)).c_str());
        }

        return 1 + dsm->tightMarshal1(this, object, bs);
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
// Second pass of tight marshalling for a nested object.  Replays the booleans
// recorded by tightMarshalNestedObject1 and writes the type byte followed by
// either the cached marshalled form or the marshaller's output.
void OpenWireFormat::tightMarshalNestedObject2(DataStructure* o, DataOutputStream* ds, BooleanStream* bs) {

    try {

        // First boolean: object present.  It was written as (object != NULL),
        // so 'o' is only dereferenced when it is non-NULL.
        if (!bs->readBoolean()) {
            return;
        }

        unsigned char type = o->getDataStructureType();

        ds->writeByte(type);

        if (o->isMarshalAware() && bs->readBoolean()) {

            // Same call as in pass 1; avoids dereferencing an unchecked
            // dynamic_cast result.
            vector<unsigned char> sequence = o->getMarshaledForm(this);
            if (!sequence.empty()) {
                ds->write(&sequence[0], (int) sequence.size());
            }

        } else {

            DataStreamMarshaller* dsm = dataMarshallers[type & 0xFF];
            if (dsm == NULL) {
                throw IOException(__FILE__, __LINE__,
                    (string("OpenWireFormat::tightMarshalNestedObject2 - Unknown data type: ") + Integer::toString(type)).c_str());
            }

            dsm->tightMarshal2(this, o, ds, bs);
        }
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
// Reads a tightly encoded nested object.  Returns a newly created object the
// caller must own, or NULL when the boolean stream marks the object as absent.
DataStructure* OpenWireFormat::tightUnmarshalNestedObject(DataInputStream* dis, BooleanStream* bs) {

    try {

        if (bs->readBoolean()) {

            const unsigned char dataType = dis->readByte();

            DataStreamMarshaller* dsm = dataMarshallers[dataType & 0xFF];
            if (dsm == NULL) {
                throw IOException(__FILE__, __LINE__,
                    (string("OpenWireFormat::tightUnmarshalNestedObject - Unknown data type: ") + Integer::toString(dataType)).c_str());
            }

            std::auto_ptr<DataStructure> data(dsm->createObject());

            if (data->isMarshalAware() && bs->readBoolean()) {

                // An embedded pre-marshalled form: an int and a byte are read
                // and discarded, then the object is decoded with its own
                // boolean stream.
                dis->readInt();
                dis->readByte();

                BooleanStream bs2;
                bs2.unmarshal(dis);
                dsm->tightUnmarshal(this, data.get(), dis, &bs2);

            } else {
                dsm->tightUnmarshal(this, data.get(), dis, bs);
            }

            return data.release();

        } else {
            return NULL;
        }
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
// Reads a loosely encoded nested object: a presence boolean, the type byte and
// the body.  Returns a newly created object the caller must own, or NULL when
// the presence boolean is false.
DataStructure* OpenWireFormat::looseUnmarshalNestedObject(decaf::io::DataInputStream* dis) {

    try {

        if (dis->readBoolean()) {

            unsigned char dataType = dis->readByte();

            DataStreamMarshaller* dsm = dataMarshallers[dataType & 0xFF];
            if (dsm == NULL) {
                throw IOException(__FILE__, __LINE__,
                    (string("OpenWireFormat::looseUnmarshalNestedObject - Unknown data type: ") + Integer::toString(dataType)).c_str());
            }

            std::auto_ptr<DataStructure> data(dsm->createObject());
            dsm->looseUnmarshal(this, data.get(), dis);

            return data.release();

        } else {
            return NULL;
        }
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
// Writes a loosely encoded nested object: a presence boolean and, when the
// object is non-NULL, its type byte followed by the marshaller's output.
void OpenWireFormat::looseMarshalNestedObject(commands::DataStructure* o, decaf::io::DataOutputStream* dataOut) {

    try {

        dataOut->writeBoolean(o != NULL);
        if (o != NULL) {

            unsigned char dataType = o->getDataStructureType();
            dataOut->writeByte(dataType);

            DataStreamMarshaller* dsm = dataMarshallers[dataType & 0xFF];
            if (dsm == NULL) {
                throw IOException(__FILE__, __LINE__,
                    (string("OpenWireFormat::looseMarshalNestedObject - Unknown data type: ") + Integer::toString(dataType)).c_str());
            }

            dsm->looseMarshal(this, o, dataOut);
        }
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
// Combines the peer's WireFormatInfo with the preferred one: the lower of the
// two versions, the logical AND of every boolean option and the smaller of the
// cache size and inactivity settings.  Throws IllegalStateException when no
// preferred WireFormatInfo has been set.
void OpenWireFormat::renegotiateWireFormat(const WireFormatInfo& info) {

    if (preferedWireFormatInfo == NULL) {
        throw IllegalStateException(__FILE__, __LINE__,
            "OpenWireFormat::renegotiateWireFormat - Wireformat cannot be renegotiated.");
    }

    this->setVersion(Math::min(preferedWireFormatInfo->getVersion(), info.getVersion()));

    this->stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo->isStackTraceEnabled();
    this->tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo->isTcpNoDelayEnabled();
    this->cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo->isCacheEnabled();
    this->tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo->isTightEncodingEnabled();
    this->sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo->isSizePrefixDisabled();

    this->cacheSize = min(info.getCacheSize(), preferedWireFormatInfo->getCacheSize());
    this->maxInactivityDuration =
        min(info.getMaxInactivityDuration(), preferedWireFormatInfo->getMaxInactivityDuration());
    this->maxInactivityDurationInitialDelay =
        min(info.getMaxInactivityDurationInitalDelay(), preferedWireFormatInfo->getMaxInactivityDurationInitalDelay());
}