/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "IOTransport.h"

// NOTE(review): the targets of the seven angle-bracket includes below, and the
// Pointer<...> template arguments used throughout this file, were missing from
// the copy that was reviewed. They have been reconstructed from the names the
// code references - confirm against the repository before merging.
#include <decaf/util/concurrent/Concurrent.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <activemq/wireformat/WireFormat.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/util/Config.h>
#include <typeinfo>

using namespace activemq;
using namespace activemq::transport;
using namespace activemq::exceptions;
using namespace activemq::commands;
using namespace activemq::wireformat;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;

////////////////////////////////////////////////////////////////////////////////
LOGDECAF_INITIALIZE( logger, IOTransport, "activemq.transport.IOTransport")

namespace activemq {
namespace transport {

    /**
     * Private state holder for IOTransport.  Keeps the data members out of
     * the public header; IOTransport owns exactly one instance and deletes
     * it in its destructor.
     */
    class IOTransportImpl {
    private:

        // Non-copyable: declared but never defined.
        IOTransportImpl(const IOTransportImpl&);
        IOTransportImpl& operator= (const IOTransportImpl&);

    public:

        // Marshals outgoing commands and unmarshals incoming ones.
        Pointer<WireFormat> wireFormat;

        // Receiver of inbound commands and exceptions; not deleted here.
        TransportListener* listener;

        // Streams supplied through setInputStream / setOutputStream.  This
        // class closes them in close() but never deletes them.
        decaf::io::DataInputStream* inputStream;
        decaf::io::DataOutputStream* outputStream;

        // Reader thread created by start(); NULL until then.
        Pointer<Thread> thread;

        // Set once by close(); never cleared.
        AtomicBoolean closed;

        // Toggled by start() / stop().
        AtomicBoolean started;

        IOTransportImpl() :
            wireFormat(),
            listener(NULL),
            inputStream(NULL),
            outputStream(NULL),
            thread(),
            closed(false),
            started(false) {
        }

        IOTransportImpl(const Pointer<WireFormat> wireFormat) :
            wireFormat(wireFormat),
            listener(NULL),
            inputStream(NULL),
            outputStream(NULL),
            thread(),
            closed(false),
            started(false) {
        }
    };

}}

////////////////////////////////////////////////////////////////////////////////
IOTransport::IOTransport() : impl(new IOTransportImpl()) {
}

////////////////////////////////////////////////////////////////////////////////
IOTransport::IOTransport(const Pointer<WireFormat> wireFormat) : impl(new IOTransportImpl(wireFormat)) {
}

////////////////////////////////////////////////////////////////////////////////
IOTransport::~IOTransport() {

    // Closing may throw; a destructor must not, so everything is swallowed.
    try {
        close();
    }
    AMQ_CATCHALL_NOTHROW()

    try {
        delete this->impl;
    }
    AMQ_CATCHALL_NOTHROW()
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Forwards an exception to the registered listener.
 *
 * Nothing is delivered unless a listener is set, the transport is started
 * and it has not been closed.  Anything the listener throws is discarded.
 */
void IOTransport::fire(decaf::lang::Exception& ex) {

    if (this->impl->listener != NULL && this->impl->started.get() && !this->impl->closed.get()) {
        try {
            this->impl->listener->onException(ex);
        } catch (...) {
            // Best effort: a failing listener must not propagate from here.
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Forwards an inbound command to the registered listener.
 *
 * Unlike the exception overload this does not consult the started flag.
 * Anything the listener throws is discarded.
 */
void IOTransport::fire(const Pointer<Command> command) {

    try {

        // If we have been closed then we don't deliver any messages that
        // might have sneaked in while we were closing.
        if (this->impl->listener == NULL || this->impl->closed.get()) {
            return;
        }

        this->impl->listener->onCommand(command);
    }
    AMQ_CATCHALL_NOTHROW()
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Marshals a command to the output stream and flushes it.
 *
 * @param command  the command to send; must not be NULL.
 *
 * @throws IOException if the transport is closed, was never started, the
 *         command is NULL, no output stream is set, or the write fails.
 */
void IOTransport::oneway(const Pointer<Command> command) {

    try {

        if (impl->closed.get()) {
            throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - transport is closed!");
        }

        // Make sure the thread has been started.
        if (impl->thread == NULL) {
            throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - transport is not started");
        }

        // Make sure the command object is valid.
        if (command == NULL) {
            throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - attempting to write NULL command");
        }

        // Make sure we have an output stream to write to.
        if (impl->outputStream == NULL) {
            throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - invalid output stream");
        }

        // Marshal and flush while holding the output stream's lock so the
        // two steps happen as one unit.
        synchronized(impl->outputStream) {
            // Write the command to the output stream.
            this->impl->wireFormat->marshal(command, this, this->impl->outputStream);
            this->impl->outputStream->flush();
        }
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Starts the reader thread.  Calling this while already started is a no-op.
 *
 * @throws IOException if the transport has been closed, or if the streams
 *         or the wire format have not been set.  On any failure the started
 *         flag is cleared again so that a later start() can retry.
 */
void IOTransport::start() {

    try {

        if (impl->started.compareAndSet(false, true)) {

            try {

                if (impl->closed.get()) {
                    throw IOException(__FILE__, __LINE__, "IOTransport::start() - transport is already closed - cannot restart");
                }

                // Make sure all variables that we need have been set.
                if (impl->inputStream == NULL || impl->outputStream == NULL || impl->wireFormat.get() == NULL) {
                    throw IOException(__FILE__, __LINE__, "IOTransport::start() - "
                        "IO streams and wireFormat instances must be set before calling start");
                }

                // Start the polling thread.
                impl->thread.reset(new Thread(this, "IOTransport reader Thread"));
                impl->thread->start();

            } catch (...) {
                // The flag was flipped before validation; undo that so the
                // transport does not claim to be started without a running
                // reader, which would turn every later start() into a no-op.
                impl->started.set(false);
                throw;
            }
        }
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Clears the started flag.  The read loop in run() checks the flag before
 * each read; this method does not touch the streams or the thread.
 */
void IOTransport::stop() {

    try {
        this->impl->started.set(false);
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Closes the transport: detaches the listener, closes both streams and joins
 * the reader thread.  Only the first call does any work.
 *
 * @throws IOException the first error raised while closing the streams; both
 *         streams are attempted and the thread is joined before it is thrown.
 */
void IOTransport::close() {

    // Joins the reader thread when it goes out of scope, i.e. after the
    // streams have been closed, on both the normal and the exception path.
    class Finalizer {
    private:

        Pointer<Thread> target;

    public:

        Finalizer(Pointer<Thread> target) : target(target) {}

        ~Finalizer() {
            try {
                // The transport may be closed without ever being started,
                // in which case there is no thread to wait for.
                if (target.get() != NULL) {
                    target->join();
                    target.reset(NULL);
                }
            }
            DECAF_CATCHALL_NOTHROW()
        }
    };

    try {

        // Mark this transport as closed.
        if (impl->closed.compareAndSet(false, true)) {

            Finalizer finalize(impl->thread);

            // No need to fire anymore async events now.
            this->impl->listener = NULL;

            IOException error;
            bool hasException = false;

            // We have to close the input stream before we stop the thread.  This
            // will force us to wake up the thread if it's stuck in a read (which
            // is likely).  Otherwise, the join that follows will block forever.
            try {
                if (impl->inputStream != NULL) {
                    impl->inputStream->close();
                }
            } catch (IOException& ex) {
                error = ex;
                error.setMark(__FILE__, __LINE__);
                hasException = true;
            }

            try {
                // Close the output stream.
                if (impl->outputStream != NULL) {
                    impl->outputStream->close();
                }
            } catch (IOException& ex) {
                // Keep the first failure; a second one is dropped.
                if (!hasException) {
                    error = ex;
                    error.setMark(__FILE__, __LINE__);
                    hasException = true;
                }
            }

            if (hasException) {
                throw error;
            }
        }
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Reader thread body: unmarshals commands from the input stream and hands
 * them to the listener until the transport is stopped or closed.  Any
 * exception ends the loop and is reported through fire(Exception&).
 */
void IOTransport::run() {

    try {

        while (this->impl->started.get() && !this->impl->closed.get()) {

            // Read the next command from the input stream.
            Pointer<Command> command(impl->wireFormat->unmarshal(this, this->impl->inputStream));

            // Notify the listener.
            fire(command);
        }
    } catch (exceptions::ActiveMQException& ex) {
        ex.setMark(__FILE__, __LINE__);
        fire(ex);
    } catch (decaf::lang::Exception& ex) {
        exceptions::ActiveMQException exl(ex);
        exl.setMark(__FILE__, __LINE__);
        fire(exl);
    } catch (...) {
        exceptions::ActiveMQException ex(__FILE__, __LINE__, "IOTransport::run - caught unknown exception");
        LOGDECAF_WARN(logger, ex.getStackTraceString());
        fire(ex);
    }
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Not supported by this transport.
 *
 * @throws UnsupportedOperationException always.
 */
Pointer<FutureResponse> IOTransport::asyncRequest(const Pointer<Command> command AMQCPP_UNUSED,
                                                  const Pointer<ResponseCallback> responseCallback AMQCPP_UNUSED) {
    throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::asyncRequest() - unsupported operation");
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Not supported by this transport.
 *
 * @throws UnsupportedOperationException always.
 */
Pointer<Response> IOTransport::request(const Pointer<Command> command AMQCPP_UNUSED) {
    throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Not supported by this transport.
 *
 * @throws UnsupportedOperationException always.
 */
Pointer<Response> IOTransport::request(const Pointer<Command> command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) {
    throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
}

////////////////////////////////////////////////////////////////////////////////
/** Sets the stream that run() reads commands from; must be set before start(). */
void IOTransport::setInputStream(decaf::io::DataInputStream* is) {
    this->impl->inputStream = is;
}

////////////////////////////////////////////////////////////////////////////////
/** Sets the stream that oneway() writes commands to; must be set before start(). */
void IOTransport::setOutputStream(decaf::io::DataOutputStream* os) {
    this->impl->outputStream = os;
}

////////////////////////////////////////////////////////////////////////////////
/** Returns the wire format currently in use (may be a NULL Pointer). */
Pointer<WireFormat> IOTransport::getWireFormat() const {
    return this->impl->wireFormat;
}

////////////////////////////////////////////////////////////////////////////////
/** Replaces the wire format used for marshalling; must be set before start(). */
void IOTransport::setWireFormat(const Pointer<WireFormat> wireFormat) {
    this->impl->wireFormat = wireFormat;
}

////////////////////////////////////////////////////////////////////////////////
/** Registers the listener that receives commands and exceptions; NULL disables delivery. */
void IOTransport::setTransportListener(TransportListener* listener) {
    this->impl->listener = listener;
}
//////////////////////////////////////////////////////////////////////////////// TransportListener* IOTransport::getTransportListener() const { return this->impl->listener; } //////////////////////////////////////////////////////////////////////////////// bool IOTransport::isConnected() const { return !this->impl->closed.get(); } //////////////////////////////////////////////////////////////////////////////// bool IOTransport::isClosed() const { return this->impl->closed.get(); }