/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_ #define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_ #include #include #include #include #include #include #include #include #include #include #include #include namespace decaf { namespace util { namespace concurrent { using decaf::lang::Pointer; /** * A BlockingQueue derivative that allows for a bound to be placed on the number of elements * that can be enqueued at any one time. Elements are inserted and removed in FIFO order. * The internal structure of the queue is based on a linked nodes which provides for better * performance over their array based versions but the performance is less predictable. * * The capacity bound of this class default to Integer::MAX_VALUE. * * @since 1.0 */ template class LinkedBlockingQueue : public BlockingQueue { private: template< typename U > class QueueNode { private: U value; bool unlinked; bool dequeued; public: Pointer< QueueNode > next; private: QueueNode(const QueueNode&); QueueNode& operator=(const QueueNode&); public: QueueNode() : value(), unlinked(false), dequeued(false), next() {} QueueNode(const U& value) : value(value), unlinked(false), dequeued(false), next() {} void set(Pointer< QueueNode > next, const U& value) { this->next = next; this->value = value; this->unlinked = false; this->dequeued = false; } E get() const { return this->value; } E getAndDequeue() { E result = this->value; this->value = E(); this->dequeued = true; return result; } void unlink() { this->value = E(); this->unlinked = true; } bool isUnlinked() const { return this->unlinked; } bool isDequeued() const { return this->dequeued; } }; class TotalLock { private: TotalLock(const TotalLock& src); TotalLock& operator=(const TotalLock& src); private: const LinkedBlockingQueue* parent; public: TotalLock(const LinkedBlockingQueue* parent) : parent(parent) { parent->putLock.lock(); parent->takeLock.lock(); } ~TotalLock() { parent->putLock.unlock(); parent->takeLock.unlock(); } }; private: int capacity; decaf::util::concurrent::atomic::AtomicInteger count; /** Lock held by take, poll, etc */ mutable locks::ReentrantLock takeLock; /** Wait queue for waiting takes */ Pointer notEmpty; // takeLock.newCondition(); /** Lock held by put, offer, etc */ mutable locks::ReentrantLock putLock; /** Wait queue for waiting puts */ Pointer notFull; // putLock.newCondition(); Pointer< QueueNode > head; Pointer< QueueNode > tail; public: /** * Create a new instance with a Capacity of Integer::MAX_VALUE */ LinkedBlockingQueue() : BlockingQueue(), capacity(lang::Integer::MAX_VALUE), count(), takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode()), tail() { this->tail = this->head; this->notEmpty.reset(this->takeLock.newCondition()); this->notFull.reset(this->putLock.newCondition()); } /** * Create a new instance with the given initial capacity value. * * @param capacity * The initial capacity value to assign to this Queue. * * @throws IllegalArgumentException if the specified capacity is not greater than zero. */ LinkedBlockingQueue(int capacity) : BlockingQueue(), capacity(capacity), count(), takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode()), tail() { if(capacity <= 0) { throw decaf::lang::exceptions::IllegalArgumentException( __FILE__, __LINE__, "Capacity value must be greater than zero."); } this->tail = this->head; this->notEmpty.reset(this->takeLock.newCondition()); this->notFull.reset(this->putLock.newCondition()); } /** * Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the * values contained in the specified collection to this Queue. * * @param collection * The Collection whose elements are to be copied to this Queue. * * @throws IllegalStateException if the number of elements in the collection exceeds * this Queue's capacity. */ LinkedBlockingQueue(const Collection& collection) : BlockingQueue(), capacity(lang::Integer::MAX_VALUE), count(), takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode()), tail() { this->tail = this->head; this->notEmpty.reset(this->takeLock.newCondition()); this->notFull.reset(this->putLock.newCondition()); Pointer< Iterator > iter(collection.iterator()); try { int count = 0; while(iter->hasNext()) { if(count == this->capacity) { throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__, "Number of elements in the Collection exceeds this Queue's Capacity."); } this->enqueue(iter->next()); ++count; } this->count.set(count); } DECAF_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException) DECAF_CATCH_RETHROW(decaf::lang::Exception) DECAF_CATCHALL_THROW(decaf::lang::Exception) } /** * Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the * values contained in the specified LinkedBlockingQueue to this Queue. * * @param queue * The LinkedBlockingQueue whose elements are to be copied to this Queue. * * @throws IllegalStateException if the number of elements in the collection exceeds * this Queue's capacity. */ LinkedBlockingQueue(const LinkedBlockingQueue& queue) : BlockingQueue(), capacity(lang::Integer::MAX_VALUE), count(), takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode()), tail() { this->tail = this->head; this->notEmpty.reset(this->takeLock.newCondition()); this->notFull.reset(this->putLock.newCondition()); Pointer< Iterator > iter(queue.iterator()); try { int count = 0; while(iter->hasNext()) { if(count == this->capacity) { throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__, "Number of elements in the Collection exceeds this Queue's Capacity."); } this->enqueue(iter->next()); ++count; } this->count.set(count); } DECAF_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException) DECAF_CATCH_RETHROW(decaf::lang::Exception) DECAF_CATCHALL_THROW(decaf::lang::Exception) } virtual ~LinkedBlockingQueue() { try{ this->purgeList(); } catch(...) {} } public: LinkedBlockingQueue& operator= ( const LinkedBlockingQueue& queue ) { this->clear(); this->addAll(queue); return *this; } LinkedBlockingQueue& operator= ( const Collection& collection ) { this->clear(); this->addAll(collection); return *this; } public: virtual int size() const { return this->count.get(); } virtual void clear() { TotalLock lock(this); this->purgeList(); this->tail = this->head; this->count.set(0); if(this->count.getAndSet(0) == this->capacity) { this->notFull->signal(); } } virtual int remainingCapacity() const { return this->capacity - this->count.get(); } virtual void put( const E& value ) { int c = -1; this->putLock.lockInterruptibly(); try { // Note that count is used in wait guard even though it is not // protected by lock. This works because count can only decrease at // this point (all other puts are shut out by lock), and we (or some // other waiting put) are signaled if it ever changes from capacity. // Similarly for all other uses of count in other wait guards. while (this->count.get() == this->capacity) { this->notFull->await(); } // This method now owns the putLock so we know we have at least // enough capacity for one put, if we enqueue an item and there's // still more room we should signal a waiting put to ensure that // threads don't wait forever. enqueue(value); c = this->count.getAndIncrement(); if(c + 1 < this->capacity) { this->notFull->signal(); } } catch(decaf::lang::Exception& ex) { this->putLock.unlock(); throw; } this->putLock.unlock(); // When c is zero it means we at least incremented once so there was // something in the Queue, another take could have already happened but // we don't know so wake up a waiting taker. if (c == 0) { this->signalNotEmpty(); } } virtual bool offer( const E& value, long long timeout, const TimeUnit& unit ) { int c = -1; long long nanos = unit.toNanos(timeout); this->putLock.lockInterruptibly(); try { while(this->count.get() == this->capacity) { if (nanos <= 0) { return false; } nanos = this->notFull->awaitNanos(nanos); } enqueue(value); c = this->count.getAndIncrement(); if(c + 1 < this->capacity) { this->notFull->signal(); } } catch(decaf::lang::Exception& ex) { this->putLock.unlock(); throw; } this->putLock.unlock(); if(c == 0) { this->signalNotEmpty(); } return true; } virtual bool offer(const E& value) { if (this->count.get() == this->capacity) { return false; } int c = -1; this->putLock.lockInterruptibly(); try { if (this->count.get() < this->capacity) { enqueue(value); c = this->count.getAndIncrement(); if (c + 1 < this->capacity) { this->notFull->signal(); } } } catch (decaf::lang::Exception& ex) { this->putLock.unlock(); throw; } this->putLock.unlock(); if (c == 0) { this->signalNotEmpty(); } return c >= 0; } virtual E take() { E value = E(); int c = -1; this->takeLock.lockInterruptibly(); try { while (this->count.get() == 0) { this->notEmpty->await(); } // Since this methods owns the takeLock and count != 0 we know that // its safe to take one element. if c is greater than one then there // is at least one more so we try to wake up another taker if any. value = dequeue(); c = this->count.getAndDecrement(); if (c > 1) { this->notEmpty->signal(); } } catch (decaf::lang::Exception& ex) { this->takeLock.unlock(); throw; } this->takeLock.unlock(); // When c equals capacity we have removed at least one element // from the Queue so we wake a blocked put operation if there is // one to prevent a deadlock. if (c == this->capacity) { this->signalNotFull(); } return value; } virtual bool poll(E& result, long long timeout, const TimeUnit& unit) { int c = -1; long long nanos = unit.toNanos(timeout); this->takeLock.lockInterruptibly(); try { while (this->count.get() == 0) { if (nanos <= 0) { return false; } nanos = this->notEmpty->awaitNanos(nanos); } result = dequeue(); c = this->count.getAndDecrement(); if (c > 1) { this->notEmpty->signal(); } } catch (decaf::lang::Exception& ex) { this->takeLock.unlock(); throw; } this->takeLock.unlock(); if(c == this->capacity) { this->signalNotFull(); } return true; } virtual bool poll(E& result) { if (this->count.get() == 0) { return false; } int c = -1; this->takeLock.lock(); try { if (this->count.get() > 0) { result = dequeue(); c = this->count.getAndDecrement(); if (c > 1) { this->notEmpty->signal(); } } } catch (decaf::lang::Exception& ex) { this->takeLock.unlock(); throw; } this->takeLock.unlock(); if (c == this->capacity) { this->signalNotFull(); } return true; } virtual bool peek(E& result) const { if(this->count.get() == 0) { return false; } this->takeLock.lock(); try { Pointer< QueueNode > front = this->head->next; if(front == NULL) { return false; } else { result = front->get(); } } catch (decaf::lang::Exception& ex) { this->takeLock.unlock(); throw; } this->takeLock.unlock(); return true; } using AbstractQueue::remove; virtual bool remove(const E& value) { TotalLock lock(this); for(Pointer< QueueNode > predicessor = this->head, p = predicessor->next; p != NULL; predicessor = p, p = p->next) { if(value == p->get()) { unlink(p, predicessor); return true; } } return false; } virtual std::vector toArray() const { TotalLock lock(this); int size = this->count.get(); std::vector array; array.reserve(size); for(Pointer< QueueNode > p = this->head->next; p != NULL; p = p->next) { array.push_back(p->get()); } return array; } virtual std::string toString() const { return std::string("LinkedBlockingQueue [ current size = ") + decaf::lang::Integer::toString(this->count.get()) + "]"; } virtual int drainTo( Collection& c ) { return this->drainTo(c, decaf::lang::Integer::MAX_VALUE); } virtual int drainTo( Collection& sink, int maxElements ) { if(&sink == this) { throw decaf::lang::exceptions::IllegalArgumentException(__FILE__, __LINE__, "Cannot drain this Collection to itself."); } bool signalNotFull = false; bool shouldThrow = false; decaf::lang::Exception delayed; int result = 0; this->takeLock.lock(); try { // We get the count of Nodes that exist now, any puts that are done // after this are not drained and since we hold the lock nothing can // get taken so state should remain consistent. result = decaf::lang::Math::min(maxElements, this->count.get()); Pointer< QueueNode > node = this->head; int i = 0; try { while(i < result) { Pointer< QueueNode > p = node->next; sink.add( p->getAndDequeue() ); node = p; ++i; } } catch(decaf::lang::Exception& e) { delayed = e; shouldThrow = true; } if (i > 0) { this->head = node; signalNotFull = (this->count.getAndAdd(-i) == this->capacity); } } catch(decaf::lang::Exception& ex) { this->takeLock.unlock(); throw; } this->takeLock.unlock(); if (signalNotFull) { this->signalNotFull(); } if (shouldThrow) { throw delayed; } return result; } private: class LinkedIterator : public Iterator { private: Pointer< QueueNode > current; Pointer< QueueNode > last; E currentElement; LinkedBlockingQueue* parent; private: LinkedIterator(const LinkedIterator&); LinkedIterator& operator= (const LinkedIterator&); public: LinkedIterator(LinkedBlockingQueue* parent) : current(), last(), currentElement(), parent(parent) { TotalLock lock(parent); this->current = parent->head->next; if(this->current != NULL) { this->currentElement = current->get(); } } virtual bool hasNext() const { return this->current != NULL; } virtual E next() { TotalLock lock(this->parent); if(this->current == NULL) { throw decaf::util::NoSuchElementException(__FILE__, __LINE__, "Iterator next called with no matching next element."); } E result = this->currentElement; this->last = this->current; this->current = this->nextNode(this->current); this->currentElement = (this->current == NULL) ? E() : this->current->get(); return result; } virtual void remove() { if(this->last == NULL) { throw decaf::lang::exceptions::IllegalStateException(__FILE__, __LINE__, "Iterator remove called without having called next()."); } TotalLock lock(this->parent); Pointer< QueueNode > node; node.swap(this->last); for(Pointer< QueueNode > trail = this->parent->head, p = trail->next; p != NULL; trail = p, p = p->next) { if(p == node) { this->parent->unlink(p, trail); break; } } } private: Pointer< QueueNode > nextNode(Pointer< QueueNode >& p) { // Handle the case of a dequeued Node, the new head of Queue // will be parent->head->next() even if the Queue is empty. if(p->isDequeued()) { return this->parent->head->next; } Pointer< QueueNode > s = p->next; // Handle Nodes that have been removed from the interior of the // Queue, these are tagged but still retain their next() value // in order to account for multiple removes. If all nodes were // removed from the last call then eventually we reach next() == NULL // which is the old tail. while(s != NULL && s->isUnlinked()) { s = s->next; } return s; } }; class ConstLinkedIterator : public Iterator { private: Pointer< QueueNode > current; Pointer< QueueNode > last; E currentElement; const LinkedBlockingQueue* parent; private: ConstLinkedIterator(const ConstLinkedIterator&); ConstLinkedIterator& operator= (const ConstLinkedIterator&); public: ConstLinkedIterator(const LinkedBlockingQueue* parent) : current(), last(), currentElement(), parent(parent) { TotalLock lock(parent); this->current = parent->head->next; if(this->current != NULL) { this->currentElement = current->get(); } } virtual bool hasNext() const { return this->current != NULL; } virtual E next() { TotalLock lock(this->parent); if(this->current == NULL) { throw decaf::util::NoSuchElementException(__FILE__, __LINE__, "Iterator next called with no matching next element."); } E result = this->currentElement; this->last = this->current; this->current = this->nextNode(this->current); this->currentElement = (this->current == NULL) ? E() : this->current->get(); return result; } virtual void remove() { throw lang::exceptions::UnsupportedOperationException( __FILE__, __LINE__, "Cannot write to a const ListIterator." ); } private: Pointer< QueueNode > nextNode(Pointer< QueueNode >& p) { // Handle the case of a dequeued Node, the new head of Queue // will be parent->head->next() even if the Queue is empty. if(p->isDequeued()) { return this->parent->head->next; } Pointer< QueueNode > s = p->next; // Handle Nodes that have been removed from the interior of the // Queue, these are tagged but still retain their next() value // in order to account for multiple removes. If all nodes were // removed from the last call then eventually we reach next() == NULL // which is the old tail. while(s != NULL && s->isUnlinked()) { s = s->next; } return s; } }; public: virtual decaf::util::Iterator* iterator() { return new LinkedIterator(this); } virtual decaf::util::Iterator* iterator() const { return new ConstLinkedIterator(this); } private: void unlink(Pointer< QueueNode >& p, Pointer< QueueNode >& predicessor) { // In order to prevent Iterators from losing their ability to provide // weakly consistent iteration the next value of p is left intact but // the node is marked as unlinked and it value is reset to default. p->unlink(); predicessor->next = p->next; if(this->tail == p) { this->tail = predicessor; } if(this->count.getAndDecrement() == capacity) { this->signalNotFull(); } } void signalNotEmpty() { this->takeLock.lock(); try { this->notEmpty->signal(); } catch(decaf::lang::Exception& ex) { this->takeLock.unlock(); throw; } this->takeLock.unlock(); } void signalNotFull() { this->putLock.lock(); try { this->notFull->signal(); } catch(decaf::lang::Exception& ex) { this->putLock.unlock(); throw; } this->putLock.unlock(); } // Must be called with the putLock locked. void enqueue(E value) { Pointer< QueueNode > newTail( new QueueNode(value) ); this->tail->next = newTail; this->tail = newTail; } // Must be called with the takeLock locked. E dequeue() { Pointer< QueueNode > temp = this->head; Pointer< QueueNode > newHead = temp->next; this->head = newHead; return newHead->getAndDequeue(); } void purgeList() { Pointer< QueueNode > current = this->head->next; Pointer< QueueNode > temp; while(current != NULL) { temp = current; current = current->next; temp->next.reset(NULL); temp.reset(NULL); } } }; }}} #endif /* _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_ */