/*************************************************************************\ * Copyright (c) 2002 The University of Chicago, as Operator of Argonne * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ /* * L O S A L A M O S * Los Alamos National Laboratory * Los Alamos, New Mexico 87545 * * Copyright, 1986, The Regents of the University of California. * * Author: Jeff Hill * */ #ifdef _MSC_VER # pragma warning(disable:4355) #endif #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" #include #include #include "errlog.h" #define epicsExportSharedSymbols #include "localHostName.h" #include "iocinf.h" #include "virtualCircuit.h" #include "inetAddrID.h" #include "cac.h" #include "netiiu.h" #include "hostNameCache.h" #include "net_convert.h" #include "bhe.h" #include "epicsSignal.h" #include "caerr.h" #include "udpiiu.h" using namespace std; const unsigned mSecPerSec = 1000u; const unsigned uSecPerSec = 1000u * mSecPerSec; tcpSendThread::tcpSendThread ( class tcpiiu & iiuIn, const char * pName, unsigned stackSize, unsigned priority ) : thread ( *this, pName, stackSize, priority ), iiu ( iiuIn ) { } tcpSendThread::~tcpSendThread () { } void tcpSendThread::start () { this->thread.start (); } void tcpSendThread::show ( unsigned /* level */ ) const { } void tcpSendThread::exitWait () { this->thread.exitWait (); } void tcpSendThread::run () { try { epicsGuard < epicsMutex > guard ( this->iiu.mutex ); bool laborPending = false; while ( true ) { // dont wait if there is still labor to be done below if ( ! laborPending ) { epicsGuardRelease < epicsMutex > unguard ( guard ); this->iiu.sendThreadFlushEvent.wait (); } if ( this->iiu.state != tcpiiu::iiucs_connected ) { break; } laborPending = false; bool flowControlLaborNeeded = this->iiu.busyStateDetected != this->iiu.flowControlActive; bool echoLaborNeeded = this->iiu.echoRequestPending; this->iiu.echoRequestPending = false; if ( flowControlLaborNeeded ) { if ( this->iiu.flowControlActive ) { this->iiu.disableFlowControlRequest ( guard ); this->iiu.flowControlActive = false; debugPrintf ( ( "fc off\n" ) ); } else { this->iiu.enableFlowControlRequest ( guard ); this->iiu.flowControlActive = true; debugPrintf ( ( "fc on\n" ) ); } } if ( echoLaborNeeded ) { this->iiu.echoRequest ( guard ); } while ( nciu * pChan = this->iiu.createReqPend.get () ) { this->iiu.createChannelRequest ( *pChan, guard ); if ( CA_V42 ( this->iiu.minorProtocolVersion ) ) { this->iiu.createRespPend.add ( *pChan ); pChan->channelNode::listMember = channelNode::cs_createRespPend; } else { // This wakes up the resp thread so that it can call // the connect callback. This isnt maximally efficent // but it has the excellent side effect of not requiring // that the UDP thread take the callback lock. There are // almost no V42 servers left at this point. this->iiu.v42ConnCallbackPend.add ( *pChan ); pChan->channelNode::listMember = channelNode::cs_v42ConnCallbackPend; this->iiu.echoRequestPending = true; laborPending = true; } if ( this->iiu.sendQue.flushBlockThreshold () ) { laborPending = true; break; } } while ( nciu * pChan = this->iiu.subscripReqPend.get () ) { // this installs any subscriptions as needed pChan->resubscribe ( guard ); this->iiu.connectedList.add ( *pChan ); pChan->channelNode::listMember = channelNode::cs_connected; if ( this->iiu.sendQue.flushBlockThreshold () ) { laborPending = true; break; } } while ( nciu * pChan = this->iiu.subscripUpdateReqPend.get () ) { // this updates any subscriptions as needed pChan->sendSubscriptionUpdateRequests ( guard ); this->iiu.connectedList.add ( *pChan ); pChan->channelNode::listMember = channelNode::cs_connected; if ( this->iiu.sendQue.flushBlockThreshold () ) { laborPending = true; break; } } if ( ! this->iiu.sendThreadFlush ( guard ) ) { break; } } if ( this->iiu.state == tcpiiu::iiucs_clean_shutdown ) { this->iiu.sendThreadFlush ( guard ); // this should cause the server to disconnect from // the client int status = ::shutdown ( this->iiu.sock, SHUT_WR ); if ( status ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ("CAC TCP clean socket shutdown error was %s\n", sockErrBuf ); } } } catch ( ... ) { errlogPrintf ( "cac: tcp send thread received an unexpected exception " "- disconnecting\n"); // this should cause the server to disconnect from // the client int status = ::shutdown ( this->iiu.sock, SHUT_WR ); if ( status ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ("CAC TCP clean socket shutdown error was %s\n", sockErrBuf ); } } this->iiu.sendDog.cancel (); this->iiu.recvDog.shutdown (); while ( ! this->iiu.recvThread.exitWait ( 30.0 ) ) { // it is possible to get stuck here if the user calls // ca_context_destroy() when a circuit isnt known to // be unresponsive, but is. That situation is probably // rare, and the IP kernel might have a timeout for // such situations, nevertheless we will attempt to deal // with it here after waiting a reasonable amount of time // for a clean shutdown to finish. epicsGuard < epicsMutex > guard ( this->iiu.mutex ); this->iiu.initiateAbortShutdown ( guard ); } // user threads blocking for send backlog to be reduced // will abort their attempt to get space if // the state of the tcpiiu changes from connected to a // disconnecting state. Nevertheless, we need to wait // for them to finish prior to destroying the IIU. { epicsGuard < epicsMutex > guard ( this->iiu.mutex ); while ( this->iiu.blockingForFlush ) { epicsGuardRelease < epicsMutex > unguard ( guard ); epicsThreadSleep ( 0.1 ); } } this->iiu.cacRef.destroyIIU ( this->iiu ); } unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned nBytesInBuf, const epicsTime & currentTime ) { unsigned nBytes = 0u; assert ( nBytesInBuf <= INT_MAX ); this->sendDog.start ( currentTime ); while ( true ) { int status = ::send ( this->sock, static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 ); if ( status > 0 ) { nBytes = static_cast ( status ); // printf("SEND: %u\n", nBytes ); break; } else { epicsGuard < epicsMutex > guard ( this->mutex ); if ( this->state != iiucs_connected && this->state != iiucs_clean_shutdown ) { break; } // winsock indicates disconnect by returning zero here if ( status == 0 ) { this->disconnectNotify ( guard ); break; } int localError = SOCKERRNO; if ( localError == SOCK_EINTR ) { continue; } if ( localError == SOCK_ENOBUFS ) { errlogPrintf ( "CAC: system low on network buffers " "- send retry in 15 seconds\n" ); { epicsGuardRelease < epicsMutex > unguard ( guard ); epicsThreadSleep ( 15.0 ); } continue; } if ( localError != SOCK_EPIPE && localError != SOCK_ECONNRESET && localError != SOCK_ETIMEDOUT && localError != SOCK_ECONNABORTED && localError != SOCK_SHUTDOWN ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "CAC: unexpected TCP send error: %s\n", sockErrBuf ); } this->disconnectNotify ( guard ); break; } } this->sendDog.cancel (); return nBytes; } void tcpiiu::recvBytes ( void * pBuf, unsigned nBytesInBuf, statusWireIO & stat ) { assert ( nBytesInBuf <= INT_MAX ); while ( true ) { int status = ::recv ( this->sock, static_cast ( pBuf ), static_cast ( nBytesInBuf ), 0 ); if ( status > 0 ) { stat.bytesCopied = static_cast ( status ); assert ( stat.bytesCopied <= nBytesInBuf ); stat.circuitState = swioConnected; return; } else { epicsGuard < epicsMutex > guard ( this->mutex ); if ( status == 0 ) { this->disconnectNotify ( guard ); stat.bytesCopied = 0u; stat.circuitState = swioPeerHangup; return; } // if the circuit was locally aborted then supress // warning messages about bad file descriptor etc if ( this->state != iiucs_connected && this->state != iiucs_clean_shutdown ) { stat.bytesCopied = 0u; stat.circuitState = swioLocalAbort; return; } int localErrno = SOCKERRNO; if ( localErrno == SOCK_SHUTDOWN ) { stat.bytesCopied = 0u; stat.circuitState = swioPeerHangup; return; } if ( localErrno == SOCK_EINTR ) { continue; } if ( localErrno == SOCK_ENOBUFS ) { errlogPrintf ( "CAC: system low on network buffers " "- receive retry in 15 seconds\n" ); { epicsGuardRelease < epicsMutex > unguard ( guard ); epicsThreadSleep ( 15.0 ); } continue; } char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); // the replacable printf handler isnt called here // because it reqires a callback lock which probably // isnt appropriate here char name[64]; this->hostNameCacheInstance.getName ( name, sizeof ( name ) ); errlogPrintf ( "Unexpected problem with CA circuit to" " server \"%s\" was \"%s\" - disconnecting\n", name, sockErrBuf ); stat.bytesCopied = 0u; stat.circuitState = swioPeerAbort; return; } } } tcpRecvThread::tcpRecvThread ( class tcpiiu & iiuIn, class epicsMutex & cbMutexIn, cacContextNotify & ctxNotifyIn, const char * pName, unsigned int stackSize, unsigned int priority ) : thread ( *this, pName, stackSize, priority ), iiu ( iiuIn ), cbMutex ( cbMutexIn ), ctxNotify ( ctxNotifyIn ) {} tcpRecvThread::~tcpRecvThread () { } void tcpRecvThread::start () { this->thread.start (); } void tcpRecvThread::show ( unsigned /* level */ ) const { } bool tcpRecvThread::exitWait ( double delay ) { return this->thread.exitWait ( delay ); } void tcpRecvThread::exitWait () { this->thread.exitWait (); } bool tcpRecvThread::validFillStatus ( epicsGuard < epicsMutex > & guard, const statusWireIO & stat ) { if ( this->iiu.state != tcpiiu::iiucs_connected && this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { return false; } if ( stat.circuitState == swioConnected ) { return true; } if ( stat.circuitState == swioPeerHangup || stat.circuitState == swioPeerAbort ) { this->iiu.disconnectNotify ( guard ); } else if ( stat.circuitState == swioLinkFailure ) { this->iiu.initiateAbortShutdown ( guard ); } else if ( stat.circuitState == swioLocalAbort ) { // state change already occurred } else { errlogMessage ( "cac: invalid fill status - disconnecting" ); this->iiu.disconnectNotify ( guard ); } return false; } void tcpRecvThread::run () { try { { bool connectSuccess = false; { epicsGuard < epicsMutex > guard ( this->iiu.mutex ); this->connect ( guard ); connectSuccess = this->iiu.state == tcpiiu::iiucs_connected; } if ( ! connectSuccess ) { this->iiu.recvDog.shutdown (); this->iiu.cacRef.destroyIIU ( this->iiu ); return; } if ( this->iiu.isNameService () ) { this->iiu.pSearchDest->setCircuit ( &this->iiu ); this->iiu.pSearchDest->enable (); } } this->iiu.sendThread.start (); epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); this->iiu.cacRef.attachToClientCtx (); comBuf * pComBuf = 0; while ( true ) { // // We leave the bytes pending and fetch them after // callbacks are enabled when running in the old preemptive // call back disabled mode so that asynchronous wakeup via // file manager call backs works correctly. This does not // appear to impact performance. // if ( ! pComBuf ) { pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; } statusWireIO stat; pComBuf->fillFromWire ( this->iiu, stat ); epicsTime currentTime = epicsTime::getCurrent (); { epicsGuard < epicsMutex > guard ( this->iiu.mutex ); if ( ! this->validFillStatus ( guard, stat ) ) { break; } if ( stat.bytesCopied == 0u ) { continue; } this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); pComBuf = 0; this->iiu._receiveThreadIsBusy = true; } bool sendWakeupNeeded = false; { // only one recv thread at a time may call callbacks // - pendEvent() blocks until threads waiting for // this lock get a chance to run callbackManager mgr ( this->ctxNotify, this->cbMutex ); epicsGuard < epicsMutex > guard ( this->iiu.mutex ); // route legacy V42 channel connect through the recv thread - // the only thread that should be taking the callback lock while ( nciu * pChan = this->iiu.v42ConnCallbackPend.first () ) { this->iiu.connectNotify ( guard, *pChan ); pChan->connect ( mgr.cbGuard, guard ); } this->iiu.unacknowledgedSendBytes = 0u; bool protocolOK = false; { epicsGuardRelease < epicsMutex > unguard ( guard ); // execute receive labor protocolOK = this->iiu.processIncoming ( currentTime, mgr ); } if ( ! protocolOK ) { this->iiu.initiateAbortShutdown ( guard ); break; } this->iiu._receiveThreadIsBusy = false; // reschedule connection activity watchdog this->iiu.recvDog.messageArrivalNotify ( guard ); // // if this thread has connected channels with subscriptions // that need to be sent then wakeup the send thread if ( this->iiu.subscripReqPend.count() ) { sendWakeupNeeded = true; } } // // we dont feel comfortable calling this with a lock applied // (it might block for longer than we like) // // we would prefer to improve efficency by trying, first, a // recv with the new MSG_DONTWAIT flag set, but there isnt // universal support // bool bytesArePending = this->iiu.bytesArePendingInOS (); { epicsGuard < epicsMutex > guard ( this->iiu.mutex ); if ( bytesArePending ) { if ( ! this->iiu.busyStateDetected ) { this->iiu.contigRecvMsgCount++; if ( this->iiu.contigRecvMsgCount >= this->iiu.cacRef.maxContiguousFrames ( guard ) ) { this->iiu.busyStateDetected = true; sendWakeupNeeded = true; } } } else { // if no bytes are pending then we must immediately // switch off flow control w/o waiting for more // data to arrive this->iiu.contigRecvMsgCount = 0u; if ( this->iiu.busyStateDetected ) { sendWakeupNeeded = true; this->iiu.busyStateDetected = false; } } } if ( sendWakeupNeeded ) { this->iiu.sendThreadFlushEvent.signal (); } } if ( pComBuf ) { pComBuf->~comBuf (); this->iiu.comBufMemMgr.release ( pComBuf ); } } catch ( std::bad_alloc & ) { errlogPrintf ( "CA client library tcp receive thread " "terminating due to no space in pool " "C++ exception\n" ); epicsGuard < epicsMutex > guard ( this->iiu.mutex ); this->iiu.initiateCleanShutdown ( guard ); } catch ( std::exception & except ) { errlogPrintf ( "CA client library tcp receive thread " "terminating due to C++ exception \"%s\"\n", except.what () ); epicsGuard < epicsMutex > guard ( this->iiu.mutex ); this->iiu.initiateCleanShutdown ( guard ); } catch ( ... ) { errlogPrintf ( "CA client library tcp receive thread " "terminating due to a non-standard C++ exception\n" ); epicsGuard < epicsMutex > guard ( this->iiu.mutex ); this->iiu.initiateCleanShutdown ( guard ); } } /* * tcpRecvThread::connect () */ void tcpRecvThread::connect ( epicsGuard < epicsMutex > & guard ) { // attempt to connect to a CA server while ( true ) { int status; { epicsGuardRelease < epicsMutex > unguard ( guard ); osiSockAddr tmp = this->iiu.address (); status = ::connect ( this->iiu.sock, & tmp.sa, sizeof ( tmp.sa ) ); } if ( this->iiu.state != tcpiiu::iiucs_connecting ) { break; } if ( status >= 0 ) { // put the iiu into the connected state this->iiu.state = tcpiiu::iiucs_connected; this->iiu.recvDog.connectNotify ( guard ); break; } else { int errnoCpy = SOCKERRNO; if ( errnoCpy == SOCK_EINTR ) { continue; } else if ( errnoCpy == SOCK_SHUTDOWN ) { if ( ! this->iiu.isNameService () ) { break; } } else { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "CAC: Unable to connect because \"%s\"\n", sockErrBuf ); if ( ! this->iiu.isNameService () ) { this->iiu.disconnectNotify ( guard ); break; } } { double sleepTime = this->iiu.cacRef.connectionTimeout ( guard ); epicsGuardRelease < epicsMutex > unguard ( guard ); epicsThreadSleep ( sleepTime ); } continue; } } return; } // // tcpiiu::tcpiiu () // tcpiiu::tcpiiu ( cac & cac, epicsMutex & mutexIn, epicsMutex & cbMutexIn, cacContextNotify & ctxNotifyIn, double connectionTimeout, epicsTimerQueue & timerQueue, const osiSockAddr & addrIn, comBufMemoryManager & comBufMemMgrIn, unsigned minorVersion, ipAddrToAsciiEngine & engineIn, const cacChannel::priLev & priorityIn, SearchDestTCP * pSearchDestIn ) : caServerID ( addrIn.ia, priorityIn ), hostNameCacheInstance ( addrIn, engineIn ), recvThread ( *this, cbMutexIn, ctxNotifyIn, "CAC-TCP-recv", epicsThreadGetStackSize ( epicsThreadStackBig ), cac::highestPriorityLevelBelow ( cac.getInitializingThreadsPriority() ) ), sendThread ( *this, "CAC-TCP-send", epicsThreadGetStackSize ( epicsThreadStackMedium ), cac::lowestPriorityLevelAbove ( cac.getInitializingThreadsPriority() ) ), recvDog ( cbMutexIn, ctxNotifyIn, mutexIn, *this, connectionTimeout, timerQueue ), sendDog ( cbMutexIn, ctxNotifyIn, mutexIn, *this, connectionTimeout, timerQueue ), sendQue ( *this, comBufMemMgrIn ), recvQue ( comBufMemMgrIn ), curDataMax ( MAX_TCP ), curDataBytes ( 0ul ), comBufMemMgr ( comBufMemMgrIn ), cacRef ( cac ), pCurData ( cac.allocateSmallBufferTCP () ), pSearchDest ( pSearchDestIn ), mutex ( mutexIn ), cbMutex ( cbMutexIn ), minorProtocolVersion ( minorVersion ), state ( iiucs_connecting ), sock ( INVALID_SOCKET ), contigRecvMsgCount ( 0u ), blockingForFlush ( 0u ), socketLibrarySendBufferSize ( 0x1000 ), unacknowledgedSendBytes ( 0u ), channelCountTot ( 0u ), _receiveThreadIsBusy ( false ), busyStateDetected ( false ), flowControlActive ( false ), echoRequestPending ( false ), oldMsgHeaderAvailable ( false ), msgHeaderAvailable ( false ), earlyFlush ( false ), recvProcessPostponedFlush ( false ), discardingPendingData ( false ), socketHasBeenClosed ( false ), unresponsiveCircuit ( false ) { this->sock = epicsSocketCreate ( AF_INET, SOCK_STREAM, IPPROTO_TCP ); if ( this->sock == INVALID_SOCKET ) { cac.releaseSmallBufferTCP ( this->pCurData ); char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); std :: string reason = "CAC: TCP circuit creation failure because \""; reason += sockErrBuf; reason += "\""; throw runtime_error ( reason ); } int flag = true; int status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof ( flag ) ); if ( status < 0 ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "CAC: problems setting socket option TCP_NODELAY = \"%s\"\n", sockErrBuf ); } flag = true; status = setsockopt ( this->sock , SOL_SOCKET, SO_KEEPALIVE, ( char * ) &flag, sizeof ( flag ) ); if ( status < 0 ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n", sockErrBuf ); } // load message queue with messages informing server // of version, user, and host name of client { epicsGuard < epicsMutex > guard ( this->mutex ); this->versionMessage ( guard, this->priority() ); this->userNameSetRequest ( guard ); this->hostNameSetRequest ( guard ); } # if 0 { int i; /* * some concern that vxWorks will run out of mBuf's * if this change is made joh 11-10-98 */ i = MAX_MSG_SIZE; status = setsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF, ( char * ) &i, sizeof ( i ) ); if (status < 0) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "CAC: problems setting socket option SO_SNDBUF = \"%s\"\n", sockErrBuf ); } i = MAX_MSG_SIZE; status = setsockopt ( this->sock, SOL_SOCKET, SO_RCVBUF, ( char * ) &i, sizeof ( i ) ); if ( status < 0 ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "CAC: problems setting socket option SO_RCVBUF = \"%s\"\n", sockErrBuf ); } } # endif { int nBytes; osiSocklen_t sizeOfParameter = static_cast < int > ( sizeof ( nBytes ) ); status = getsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF, ( char * ) &nBytes, &sizeOfParameter ); if ( status < 0 || nBytes < 0 || sizeOfParameter != static_cast < int > ( sizeof ( nBytes ) ) ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ("CAC: problems getting socket option SO_SNDBUF = \"%s\"\n", sockErrBuf ); } else { this->socketLibrarySendBufferSize = static_cast < unsigned > ( nBytes ); } } # if 0 // // windows has a really strange implementation of thess options // and we can avoid the need for this by using pthread_kill on unix // { struct timeval timeout; double pollInterval = connectionTimeout / 8.0; timeout.tv_sec = static_cast < long > ( pollInterval ); timeout.tv_usec = static_cast < long > ( ( pollInterval - timeout.tv_sec ) * uSecPerSec ); // intentionally ignore status as we dont expect that all systems // will accept this request setsockopt ( this->sock, SOL_SOCKET, SO_SNDTIMEO, ( char * ) & timeout, sizeof ( timeout ) ); // intentionally ignore status as we dont expect that all systems // will accept this request setsockopt ( this->sock, SOL_SOCKET, SO_RCVTIMEO, ( char * ) & timeout, sizeof ( timeout ) ); } # endif if ( isNameService() ) { pSearchDest->setCircuit ( this ); } memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) ); } // this must always be called by the udp thread when it holds // the callback lock. void tcpiiu::start ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); this->recvThread.start (); } void tcpiiu::initiateCleanShutdown ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); if ( this->state == iiucs_connected ) { if ( this->unresponsiveCircuit ) { this->initiateAbortShutdown ( guard ); } else { this->state = iiucs_clean_shutdown; this->sendThreadFlushEvent.signal (); this->flushBlockEvent.signal (); } } else if ( this->state == iiucs_clean_shutdown ) { if ( this->unresponsiveCircuit ) { this->initiateAbortShutdown ( guard ); } } else if ( this->state == iiucs_connecting ) { this->initiateAbortShutdown ( guard ); } } void tcpiiu::disconnectNotify ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); this->state = iiucs_disconnected; this->sendThreadFlushEvent.signal (); this->flushBlockEvent.signal (); } void tcpiiu::responsiveCircuitNotify ( epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard ) { cbGuard.assertIdenticalMutex ( this->cbMutex ); guard.assertIdenticalMutex ( this->mutex ); if ( this->unresponsiveCircuit ) { this->unresponsiveCircuit = false; while ( nciu * pChan = this->unrespCircuit.get() ) { this->subscripUpdateReqPend.add ( *pChan ); pChan->channelNode::listMember = channelNode::cs_subscripUpdateReqPend; pChan->connect ( cbGuard, guard ); } this->sendThreadFlushEvent.signal (); } } void tcpiiu::sendTimeoutNotify ( callbackManager & mgr, epicsGuard < epicsMutex > & guard ) { mgr.cbGuard.assertIdenticalMutex ( this-> cbMutex ); guard.assertIdenticalMutex ( this->mutex ); this->unresponsiveCircuitNotify ( mgr.cbGuard, guard ); // setup circuit probe sequence this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard ); } void tcpiiu::receiveTimeoutNotify ( callbackManager & mgr, epicsGuard < epicsMutex > & guard ) { mgr.cbGuard.assertIdenticalMutex ( this->cbMutex ); guard.assertIdenticalMutex ( this->mutex ); this->unresponsiveCircuitNotify ( mgr.cbGuard, guard ); } void tcpiiu::unresponsiveCircuitNotify ( epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard ) { cbGuard.assertIdenticalMutex ( this->cbMutex ); guard.assertIdenticalMutex ( this->mutex ); if ( ! this->unresponsiveCircuit ) { this->unresponsiveCircuit = true; this->echoRequestPending = true; this->sendThreadFlushEvent.signal (); this->flushBlockEvent.signal (); // must not hold lock when canceling timer { epicsGuardRelease < epicsMutex > unguard ( guard ); { epicsGuardRelease < epicsMutex > cbUnguard ( cbGuard ); this->recvDog.cancel (); this->sendDog.cancel (); } } if ( this->connectedList.count() ) { char hostNameTmp[128]; this->getHostName ( guard, hostNameTmp, sizeof ( hostNameTmp ) ); genLocalExcep ( cbGuard, guard, this->cacRef, ECA_UNRESPTMO, hostNameTmp ); while ( nciu * pChan = this->connectedList.get () ) { // The cac lock is released herein so there is concern that // the list could be changed while we are traversing it. // However, this occurs only if a circuit disconnects, // a user deletes a channel, or a server disconnects a // channel. The callback lock must be taken in all of // these situations so this code is protected. this->unrespCircuit.add ( *pChan ); pChan->channelNode::listMember = channelNode::cs_unrespCircuit; pChan->unresponsiveCircuitNotify ( cbGuard, guard ); } } } } void tcpiiu::initiateAbortShutdown ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); if ( ! this->discardingPendingData ) { // force abortive shutdown sequence // (discard outstanding sends and receives) struct linger tmpLinger; tmpLinger.l_onoff = true; tmpLinger.l_linger = 0u; int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER, reinterpret_cast ( &tmpLinger ), sizeof (tmpLinger) ); if ( status != 0 ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "CAC TCP socket linger set error was %s\n", sockErrBuf ); } this->discardingPendingData = true; } iiu_conn_state oldState = this->state; if ( oldState != iiucs_abort_shutdown && oldState != iiucs_disconnected ) { this->state = iiucs_abort_shutdown; epicsSocketSystemCallInterruptMechanismQueryInfo info = epicsSocketSystemCallInterruptMechanismQuery (); switch ( info ) { case esscimqi_socketCloseRequired: // // on winsock and probably vxWorks shutdown() does not // unblock a thread in recv() so we use close() and introduce // some complexity because we must unregister the fd early // if ( ! this->socketHasBeenClosed ) { epicsSocketDestroy ( this->sock ); this->socketHasBeenClosed = true; } break; case esscimqi_socketBothShutdownRequired: { int status = ::shutdown ( this->sock, SHUT_RDWR ); if ( status ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ("CAC TCP socket shutdown error was %s\n", sockErrBuf ); } } break; case esscimqi_socketSigAlarmRequired: this->recvThread.interruptSocketRecv (); this->sendThread.interruptSocketSend (); break; default: break; }; // // wake up the send thread if it isnt blocking in send() // this->sendThreadFlushEvent.signal (); this->flushBlockEvent.signal (); } } // // tcpiiu::~tcpiiu () // tcpiiu :: ~tcpiiu () { if ( this->pSearchDest ) { this->pSearchDest->disable (); } this->sendThread.exitWait (); this->recvThread.exitWait (); this->sendDog.cancel (); this->recvDog.shutdown (); if ( ! this->socketHasBeenClosed ) { epicsSocketDestroy ( this->sock ); } // free message body cache if ( this->pCurData ) { if ( this->curDataMax == MAX_TCP ) { this->cacRef.releaseSmallBufferTCP ( this->pCurData ); } else { this->cacRef.releaseLargeBufferTCP ( this->pCurData ); } } } void tcpiiu::show ( unsigned level ) const { epicsGuard < epicsMutex > locker ( this->mutex ); char buf[256]; this->hostNameCacheInstance.getName ( buf, sizeof ( buf ) ); ::printf ( "Virtual circuit to \"%s\" at version V%u.%u state %u\n", buf, CA_MAJOR_PROTOCOL_REVISION, this->minorProtocolVersion, this->state ); if ( level > 1u ) { ::printf ( "\tcurrent data cache pointer = %p current data cache size = %lu\n", static_cast < void * > ( this->pCurData ), this->curDataMax ); ::printf ( "\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n", this->contigRecvMsgCount, this->busyStateDetected, this->flowControlActive ); ::printf ( "\receive thread is busy=%u\n", this->_receiveThreadIsBusy ); } if ( level > 2u ) { ::printf ( "\tvirtual circuit socket identifier %d\n", this->sock ); ::printf ( "\tsend thread flush signal:\n" ); this->sendThreadFlushEvent.show ( level-2u ); ::printf ( "\tsend thread:\n" ); this->sendThread.show ( level-2u ); ::printf ( "\trecv thread:\n" ); this->recvThread.show ( level-2u ); ::printf ("\techo pending bool = %u\n", this->echoRequestPending ); ::printf ( "IO identifier hash table:\n" ); if ( this->createReqPend.count () ) { ::printf ( "Create request pending channels\n" ); tsDLIterConst < nciu > pChan = this->createReqPend.firstIter (); while ( pChan.valid () ) { pChan->show ( level - 2u ); pChan++; } } if ( this->createRespPend.count () ) { ::printf ( "Create response pending channels\n" ); tsDLIterConst < nciu > pChan = this->createRespPend.firstIter (); while ( pChan.valid () ) { pChan->show ( level - 2u ); pChan++; } } if ( this->v42ConnCallbackPend.count () ) { ::printf ( "V42 Conn Callback pending channels\n" ); tsDLIterConst < nciu > pChan = this->v42ConnCallbackPend.firstIter (); while ( pChan.valid () ) { pChan->show ( level - 2u ); pChan++; } } if ( this->subscripReqPend.count () ) { ::printf ( "Subscription request pending channels\n" ); tsDLIterConst < nciu > pChan = this->subscripReqPend.firstIter (); while ( pChan.valid () ) { pChan->show ( level - 2u ); pChan++; } } if ( this->connectedList.count () ) { ::printf ( "Connected channels\n" ); tsDLIterConst < nciu > pChan = this->connectedList.firstIter (); while ( pChan.valid () ) { pChan->show ( level - 2u ); pChan++; } } if ( this->unrespCircuit.count () ) { ::printf ( "Unresponsive circuit channels\n" ); tsDLIterConst < nciu > pChan = this->unrespCircuit.firstIter (); while ( pChan.valid () ) { pChan->show ( level - 2u ); pChan++; } } } } bool tcpiiu::setEchoRequestPending ( epicsGuard < epicsMutex > & guard ) // X aCC 361 { guard.assertIdenticalMutex ( this->mutex ); this->echoRequestPending = true; this->sendThreadFlushEvent.signal (); if ( CA_V43 ( this->minorProtocolVersion ) ) { // we send an echo return true; } else { // we send a NOOP return false; } } void tcpiiu::flushIfRecvProcessRequested ( epicsGuard < epicsMutex > & guard ) { if ( this->recvProcessPostponedFlush ) { this->flushRequest ( guard ); this->recvProcessPostponedFlush = false; } } bool tcpiiu::processIncoming ( const epicsTime & currentTime, callbackManager & mgr ) { mgr.cbGuard.assertIdenticalMutex ( this->cbMutex ); while ( true ) { // // fetch a complete message header // if ( ! this->msgHeaderAvailable ) { if ( ! this->oldMsgHeaderAvailable ) { this->oldMsgHeaderAvailable = this->recvQue.popOldMsgHeader ( this->curMsg ); if ( ! this->oldMsgHeaderAvailable ) { epicsGuard < epicsMutex > guard ( this->mutex ); this->flushIfRecvProcessRequested ( guard ); return true; } } if ( this->curMsg.m_postsize == 0xffff ) { static const unsigned annexSize = sizeof ( this->curMsg.m_postsize ) + sizeof ( this->curMsg.m_count ); if ( this->recvQue.occupiedBytes () < annexSize ) { epicsGuard < epicsMutex > guard ( this->mutex ); this->flushIfRecvProcessRequested ( guard ); return true; } this->curMsg.m_postsize = this->recvQue.popUInt32 (); this->curMsg.m_count = this->recvQue.popUInt32 (); } this->msgHeaderAvailable = true; # ifdef DEBUG epicsGuard < epicsMutex > guard ( this->mutex ); debugPrintf ( ( "%s Cmd=%3u Type=%3u Count=%8u Size=%8u", this->pHostName ( guard ), this->curMsg.m_cmmd, this->curMsg.m_dataType, this->curMsg.m_count, this->curMsg.m_postsize) ); debugPrintf ( ( " Avail=%8u Cid=%8u\n", this->curMsg.m_available, this->curMsg.m_cid) ); # endif } // check for 8 byte aligned protocol if ( this->curMsg.m_postsize & 0x7 ) { this->printFormated ( mgr.cbGuard, "CAC: server sent missaligned payload 0x%x\n", this->curMsg.m_postsize ); return false; } // // make sure we have a large enough message body cache // if ( this->curMsg.m_postsize > this->curDataMax ) { if ( this->curDataMax == MAX_TCP && this->cacRef.largeBufferSizeTCP() >= this->curMsg.m_postsize ) { char * pBuf = this->cacRef.allocateLargeBufferTCP (); if ( pBuf ) { this->cacRef.releaseSmallBufferTCP ( this->pCurData ); this->pCurData = pBuf; this->curDataMax = this->cacRef.largeBufferSizeTCP (); } else { this->printFormated ( mgr.cbGuard, "CAC: not enough memory for message body cache (ignoring response message)\n"); } } } if ( this->curMsg.m_postsize <= this->curDataMax ) { if ( this->curMsg.m_postsize > 0u ) { this->curDataBytes += this->recvQue.copyOutBytes ( &this->pCurData[this->curDataBytes], this->curMsg.m_postsize - this->curDataBytes ); if ( this->curDataBytes < this->curMsg.m_postsize ) { epicsGuard < epicsMutex > guard ( this->mutex ); this->flushIfRecvProcessRequested ( guard ); return true; } } bool msgOK = this->cacRef.executeResponse ( mgr, *this, currentTime, this->curMsg, this->pCurData ); if ( ! msgOK ) { return false; } } else { static bool once = false; if ( ! once ) { this->printFormated ( mgr.cbGuard, "CAC: response with payload size=%u > EPICS_CA_MAX_ARRAY_BYTES ignored\n", this->curMsg.m_postsize ); once = true; } this->curDataBytes += this->recvQue.removeBytes ( this->curMsg.m_postsize - this->curDataBytes ); if ( this->curDataBytes < this->curMsg.m_postsize ) { epicsGuard < epicsMutex > guard ( this->mutex ); this->flushIfRecvProcessRequested ( guard ); return true; } } this->oldMsgHeaderAvailable = false; this->msgHeaderAvailable = false; this->curDataBytes = 0u; } # if defined ( __HP_aCC ) && _HP_aCC <= 033300 return false; // to make hpux compiler happy... # endif } void tcpiiu::hostNameSetRequest ( epicsGuard < epicsMutex > & guard ) // X aCC 431 { guard.assertIdenticalMutex ( this->mutex ); if ( ! CA_V41 ( this->minorProtocolVersion ) ) { return; } const char * pName = this->cacRef.pLocalHostName (); unsigned size = strlen ( pName ) + 1u; unsigned postSize = CA_MESSAGE_ALIGN ( size ); assert ( postSize < 0xffff ); if ( this->sendQue.flushEarlyThreshold ( postSize + 16u ) ) { this->flushRequest ( guard ); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_HOST_NAME, postSize, 0u, 0u, 0u, 0u, CA_V49 ( this->minorProtocolVersion ) ); this->sendQue.pushString ( pName, size ); this->sendQue.pushString ( cacNillBytes, postSize - size ); minder.commit (); } /* * tcpiiu::userNameSetRequest () */ void tcpiiu::userNameSetRequest ( epicsGuard < epicsMutex > & guard ) // X aCC 431 { guard.assertIdenticalMutex ( this->mutex ); if ( ! CA_V41 ( this->minorProtocolVersion ) ) { return; } const char *pName = this->cacRef.userNamePointer (); unsigned size = strlen ( pName ) + 1u; unsigned postSize = CA_MESSAGE_ALIGN ( size ); assert ( postSize < 0xffff ); if ( this->sendQue.flushEarlyThreshold ( postSize + 16u ) ) { this->flushRequest ( guard ); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_CLIENT_NAME, postSize, 0u, 0u, 0u, 0u, CA_V49 ( this->minorProtocolVersion ) ); this->sendQue.pushString ( pName, size ); this->sendQue.pushString ( cacNillBytes, postSize - size ); minder.commit (); } void tcpiiu::disableFlowControlRequest ( epicsGuard < epicsMutex > & guard ) // X aCC 431 { guard.assertIdenticalMutex ( this->mutex ); if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest ( guard ); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_EVENTS_ON, 0u, 0u, 0u, 0u, 0u, CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } void tcpiiu::enableFlowControlRequest ( epicsGuard < epicsMutex > & guard ) // X aCC 431 { guard.assertIdenticalMutex ( this->mutex ); if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest ( guard ); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_EVENTS_OFF, 0u, 0u, 0u, 0u, 0u, CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } void tcpiiu::versionMessage ( epicsGuard < epicsMutex > & guard, // X aCC 431 const cacChannel::priLev & priority ) { guard.assertIdenticalMutex ( this->mutex ); assert ( priority <= 0xffff ); if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest ( guard ); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_VERSION, 0u, static_cast < ca_uint16_t > ( priority ), CA_MINOR_PROTOCOL_REVISION, 0u, 0u, CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } void tcpiiu::echoRequest ( epicsGuard < epicsMutex > & guard ) // X aCC 431 { guard.assertIdenticalMutex ( this->mutex ); epicsUInt16 command = CA_PROTO_ECHO; if ( ! CA_V43 ( this->minorProtocolVersion ) ) { // we fake an echo to early server using a read sync command = CA_PROTO_READ_SYNC; } if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest ( guard ); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( command, 0u, 0u, 0u, 0u, 0u, CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } void tcpiiu::writeRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 nciu &chan, unsigned type, arrayElementCount nElem, const void *pValue ) { guard.assertIdenticalMutex ( this->mutex ); if ( INVALID_DB_REQ ( type ) ) { throw cacChannel::badType (); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE, type, nElem, chan.getSID(guard), chan.getCID(guard), pValue, CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } void tcpiiu::writeNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 nciu &chan, netWriteNotifyIO &io, unsigned type, arrayElementCount nElem, const void *pValue ) { guard.assertIdenticalMutex ( this->mutex ); if ( ! this->ca_v41_ok ( guard ) ) { throw cacChannel::unsupportedByService(); } if ( INVALID_DB_REQ ( type ) ) { throw cacChannel::badType (); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE_NOTIFY, type, nElem, chan.getSID(guard), io.getId(), pValue, CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } void tcpiiu::readNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 nciu & chan, netReadNotifyIO & io, unsigned dataType, arrayElementCount nElem ) { guard.assertIdenticalMutex ( this->mutex ); if ( INVALID_DB_REQ ( dataType ) ) { throw cacChannel::badType (); } arrayElementCount maxBytes; if ( CA_V49 ( this->minorProtocolVersion ) ) { maxBytes = this->cacRef.largeBufferSizeTCP (); } else { maxBytes = MAX_TCP; } arrayElementCount maxElem = ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType]; if ( nElem > maxElem ) { throw cacChannel::msgBodyCacheTooSmall (); } if (nElem == 0 && !CA_V413(this->minorProtocolVersion)) nElem = chan.getcount(); comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_READ_NOTIFY, 0u, static_cast < ca_uint16_t > ( dataType ), static_cast < ca_uint32_t > ( nElem ), chan.getSID(guard), io.getId(), CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } void tcpiiu::createChannelRequest ( nciu & chan, epicsGuard < epicsMutex > & guard ) // X aCC 431 { guard.assertIdenticalMutex ( this->mutex ); if ( this->state != iiucs_connected && this->state != iiucs_connecting ) { return; } const char *pName; unsigned nameLength; ca_uint32_t identity; if ( this->ca_v44_ok ( guard ) ) { identity = chan.getCID ( guard ); pName = chan.pName ( guard ); nameLength = chan.nameLen ( guard ); } else { identity = chan.getSID ( guard ); pName = 0; nameLength = 0u; } unsigned postCnt = CA_MESSAGE_ALIGN ( nameLength ); if ( postCnt >= 0xffff ) { throw cacChannel::unsupportedByService(); } comQueSendMsgMinder minder ( this->sendQue, guard ); // // The available field is used (abused) // here to communicate the minor version number // starting with CA 4.1. // this->sendQue.insertRequestHeader ( CA_PROTO_CREATE_CHAN, postCnt, 0u, 0u, identity, CA_MINOR_PROTOCOL_REVISION, CA_V49 ( this->minorProtocolVersion ) ); if ( nameLength ) { this->sendQue.pushString ( pName, nameLength ); } if ( postCnt > nameLength ) { this->sendQue.pushString ( cacNillBytes, postCnt - nameLength ); } minder.commit (); } void tcpiiu::clearChannelRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 ca_uint32_t sid, ca_uint32_t cid ) { guard.assertIdenticalMutex ( this->mutex ); // there are situations where the circuit is disconnected, but // the channel does not know this yet if ( this->state != iiucs_connected ) { return; } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_CLEAR_CHANNEL, 0u, 0u, 0u, sid, cid, CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } // // this routine return void because if this internally fails the best response // is to try again the next time that we reconnect // void tcpiiu::subscriptionRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 nciu & chan, netSubscription & subscr ) { guard.assertIdenticalMutex ( this->mutex ); // there are situations where the circuit is disconnected, but // the channel does not know this yet if ( this->state != iiucs_connected && this->state != iiucs_connecting ) { return; } unsigned mask = subscr.getMask(guard); if ( mask > 0xffff ) { throw cacChannel::badEventSelection (); } arrayElementCount nElem = subscr.getCount ( guard, CA_V413(this->minorProtocolVersion) ); arrayElementCount maxBytes; if ( CA_V49 ( this->minorProtocolVersion ) ) { maxBytes = this->cacRef.largeBufferSizeTCP (); } else { maxBytes = MAX_TCP; } unsigned dataType = subscr.getType ( guard ); // data type bounds checked when sunscription created arrayElementCount maxElem = ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType]; if ( nElem > maxElem ) { throw cacChannel::msgBodyCacheTooSmall (); } comQueSendMsgMinder minder ( this->sendQue, guard ); // nElement bounds checked above this->sendQue.insertRequestHeader ( CA_PROTO_EVENT_ADD, 16u, static_cast < ca_uint16_t > ( dataType ), static_cast < ca_uint32_t > ( nElem ), chan.getSID(guard), subscr.getId(), CA_V49 ( this->minorProtocolVersion ) ); // extension this->sendQue.pushFloat32 ( 0.0f ); // m_lval this->sendQue.pushFloat32 ( 0.0f ); // m_hval this->sendQue.pushFloat32 ( 0.0f ); // m_toval this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( mask ) ); // m_mask this->sendQue.pushUInt16 ( 0u ); // m_pad minder.commit (); } // // this routine return void because if this internally fails the best response // is to try again the next time that we reconnect // void tcpiiu::subscriptionUpdateRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 nciu & chan, netSubscription & subscr ) { guard.assertIdenticalMutex ( this->mutex ); // there are situations where the circuit is disconnected, but // the channel does not know this yet if ( this->state != iiucs_connected ) { return; } arrayElementCount nElem = subscr.getCount ( guard, CA_V413(this->minorProtocolVersion) ); arrayElementCount maxBytes; if ( CA_V49 ( this->minorProtocolVersion ) ) { maxBytes = this->cacRef.largeBufferSizeTCP (); } else { maxBytes = MAX_TCP; } unsigned dataType = subscr.getType ( guard ); // data type bounds checked when subscription constructed arrayElementCount maxElem = ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType]; if ( nElem > maxElem ) { throw cacChannel::msgBodyCacheTooSmall (); } comQueSendMsgMinder minder ( this->sendQue, guard ); // nElem boounds checked above this->sendQue.insertRequestHeader ( CA_PROTO_READ_NOTIFY, 0u, static_cast < ca_uint16_t > ( dataType ), static_cast < ca_uint32_t > ( nElem ), chan.getSID (guard), subscr.getId (), CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } void tcpiiu::subscriptionCancelRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 nciu & chan, netSubscription & subscr ) { guard.assertIdenticalMutex ( this->mutex ); // there are situations where the circuit is disconnected, but // the channel does not know this yet if ( this->state != iiucs_connected ) { return; } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_EVENT_CANCEL, 0u, static_cast < ca_uint16_t > ( subscr.getType ( guard ) ), static_cast < ca_uint16_t > ( subscr.getCount ( guard, CA_V413(this->minorProtocolVersion) ) ), chan.getSID(guard), subscr.getId(), CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); } bool tcpiiu::sendThreadFlush ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); if ( this->sendQue.occupiedBytes() > 0 ) { while ( comBuf * pBuf = this->sendQue.popNextComBufToSend () ) { epicsTime current = epicsTime::getCurrent (); unsigned bytesToBeSent = pBuf->occupiedBytes (); bool success = false; { // no lock while blocking to send epicsGuardRelease < epicsMutex > unguard ( guard ); success = pBuf->flushToWire ( *this, current ); pBuf->~comBuf (); this->comBufMemMgr.release ( pBuf ); } if ( ! success ) { while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) { pBuf->~comBuf (); this->comBufMemMgr.release ( pBuf ); } return false; } // set it here with this odd order because we must have // the lock and we must have already sent the bytes this->unacknowledgedSendBytes += bytesToBeSent; if ( this->unacknowledgedSendBytes > this->socketLibrarySendBufferSize ) { this->recvDog.sendBacklogProgressNotify ( guard ); } } } this->earlyFlush = false; if ( this->blockingForFlush ) { this->flushBlockEvent.signal (); } return true; } void tcpiiu :: flush ( epicsGuard < epicsMutex > & guard ) { this->flushRequest ( guard ); // the process thread is not permitted to flush as this // can result in a push / pull deadlock on the TCP pipe. // Instead, the process thread scheduals the flush with the // send thread which runs at a higher priority than the // receive thread. The same applies to the UDP thread for // locking hierarchy reasons. if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { // enable / disable of call back preemption must occur here // because the tcpiiu might disconnect while waiting and its // pointer to this cac might become invalid assert ( this->blockingForFlush < UINT_MAX ); this->blockingForFlush++; while ( this->sendQue.flushBlockThreshold() ) { bool userRequestsCanBeAccepted = this->state == iiucs_connected || ( ! this->ca_v42_ok ( guard ) && this->state == iiucs_connecting ); // fail the users request if we have a disconnected // or unresponsive circuit if ( ! userRequestsCanBeAccepted || this->unresponsiveCircuit ) { this->decrementBlockingForFlushCount ( guard ); throw cacChannel::notConnected (); } epicsGuardRelease < epicsMutex > unguard ( guard ); this->flushBlockEvent.wait ( 30.0 ); } this->decrementBlockingForFlushCount ( guard ); } } unsigned tcpiiu::requestMessageBytesPending ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); #if 0 if ( ! this->earlyFlush && this->sendQue.flushEarlyThreshold(0u) ) { this->earlyFlush = true; this->sendThreadFlushEvent.signal (); } #endif return sendQue.occupiedBytes (); } void tcpiiu::decrementBlockingForFlushCount ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); assert ( this->blockingForFlush > 0u ); this->blockingForFlush--; if ( this->blockingForFlush > 0 ) { this->flushBlockEvent.signal (); } } osiSockAddr tcpiiu::getNetworkAddress ( epicsGuard < epicsMutex > & guard ) const { guard.assertIdenticalMutex ( this->mutex ); return this->address(); } // not inline because its virtual bool tcpiiu::ca_v42_ok ( epicsGuard < epicsMutex > & guard ) const { guard.assertIdenticalMutex ( this->mutex ); return CA_V42 ( this->minorProtocolVersion ); } void tcpiiu::requestRecvProcessPostponedFlush ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); this->recvProcessPostponedFlush = true; } unsigned tcpiiu::getHostName ( epicsGuard < epicsMutex > & guard, char * pBuf, unsigned bufLength ) const throw () { guard.assertIdenticalMutex ( this->mutex ); return this->hostNameCacheInstance.getName ( pBuf, bufLength ); } const char * tcpiiu::pHostName ( epicsGuard < epicsMutex > & guard ) const throw () { guard.assertIdenticalMutex ( this->mutex ); return this->hostNameCacheInstance.pointer (); } void tcpiiu::disconnectAllChannels ( epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard, class udpiiu & discIIU ) { cbGuard.assertIdenticalMutex ( this->cbMutex ); guard.assertIdenticalMutex ( this->mutex ); while ( nciu * pChan = this->createReqPend.get () ) { discIIU.installDisconnectedChannel ( guard, *pChan ); } while ( nciu * pChan = this->createRespPend.get () ) { // we dont yet know the server's id so we cant // send a channel delete request and will instead // trust that the server can do the proper cleanup // when the circuit disconnects discIIU.installDisconnectedChannel ( guard, *pChan ); } while ( nciu * pChan = this->v42ConnCallbackPend.get () ) { this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); discIIU.installDisconnectedChannel ( guard, *pChan ); } while ( nciu * pChan = this->subscripReqPend.get () ) { pChan->disconnectAllIO ( cbGuard, guard ); this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); discIIU.installDisconnectedChannel ( guard, *pChan ); pChan->unresponsiveCircuitNotify ( cbGuard, guard ); } while ( nciu * pChan = this->connectedList.get () ) { pChan->disconnectAllIO ( cbGuard, guard ); this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); discIIU.installDisconnectedChannel ( guard, *pChan ); pChan->unresponsiveCircuitNotify ( cbGuard, guard ); } while ( nciu * pChan = this->unrespCircuit.get () ) { // if we know that the circuit is unresponsive // then we dont send a channel delete request and // will instead trust that the server can do the // proper cleanup when the circuit disconnects pChan->disconnectAllIO ( cbGuard, guard ); discIIU.installDisconnectedChannel ( guard, *pChan ); } while ( nciu * pChan = this->subscripUpdateReqPend.get () ) { pChan->disconnectAllIO ( cbGuard, guard ); this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); discIIU.installDisconnectedChannel ( guard, *pChan ); pChan->unresponsiveCircuitNotify ( cbGuard, guard ); } this->channelCountTot = 0u; this->initiateCleanShutdown ( guard ); } void tcpiiu::unlinkAllChannels ( epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard ) { cbGuard.assertIdenticalMutex ( this->cbMutex ); guard.assertIdenticalMutex ( this->mutex ); while ( nciu * pChan = this->createReqPend.get () ) { pChan->channelNode::listMember = channelNode::cs_none; pChan->serviceShutdownNotify ( cbGuard, guard ); } while ( nciu * pChan = this->createRespPend.get () ) { pChan->channelNode::listMember = channelNode::cs_none; // we dont yet know the server's id so we cant // send a channel delete request and will instead // trust that the server can do the proper cleanup // when the circuit disconnects pChan->serviceShutdownNotify ( cbGuard, guard ); } while ( nciu * pChan = this->v42ConnCallbackPend.get () ) { pChan->channelNode::listMember = channelNode::cs_none; this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); pChan->serviceShutdownNotify ( cbGuard, guard ); } while ( nciu * pChan = this->subscripReqPend.get () ) { pChan->channelNode::listMember = channelNode::cs_none; pChan->disconnectAllIO ( cbGuard, guard ); this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); pChan->serviceShutdownNotify ( cbGuard, guard ); } while ( nciu * pChan = this->connectedList.get () ) { pChan->channelNode::listMember = channelNode::cs_none; pChan->disconnectAllIO ( cbGuard, guard ); this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); pChan->serviceShutdownNotify ( cbGuard, guard ); } while ( nciu * pChan = this->unrespCircuit.get () ) { pChan->channelNode::listMember = channelNode::cs_none; pChan->disconnectAllIO ( cbGuard, guard ); // if we know that the circuit is unresponsive // then we dont send a channel delete request and // will instead trust that the server can do the // proper cleanup when the circuit disconnects pChan->serviceShutdownNotify ( cbGuard, guard ); } while ( nciu * pChan = this->subscripUpdateReqPend.get () ) { pChan->channelNode::listMember = channelNode::cs_none; pChan->disconnectAllIO ( cbGuard, guard ); this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); pChan->serviceShutdownNotify ( cbGuard, guard ); } this->channelCountTot = 0u; this->initiateCleanShutdown ( guard ); } void tcpiiu::installChannel ( epicsGuard < epicsMutex > & guard, nciu & chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn ) { guard.assertIdenticalMutex ( this->mutex ); this->createReqPend.add ( chan ); this->channelCountTot++; chan.channelNode::listMember = channelNode::cs_createReqPend; chan.searchReplySetUp ( *this, sidIn, typeIn, countIn, guard ); // The tcp send thread runs at apriority below the udp thread // so that this will not send small packets this->sendThreadFlushEvent.signal (); } bool tcpiiu :: connectNotify ( epicsGuard < epicsMutex > & guard, nciu & chan ) { guard.assertIdenticalMutex ( this->mutex ); bool wasExpected = false; // this improves robustness in the face of a server sending // protocol that does not match its declared protocol revision if ( chan.channelNode::listMember == channelNode::cs_createRespPend ) { this->createRespPend.remove ( chan ); this->subscripReqPend.add ( chan ); chan.channelNode::listMember = channelNode::cs_subscripReqPend; wasExpected = true; } else if ( chan.channelNode::listMember == channelNode::cs_v42ConnCallbackPend ) { this->v42ConnCallbackPend.remove ( chan ); this->subscripReqPend.add ( chan ); chan.channelNode::listMember = channelNode::cs_subscripReqPend; wasExpected = true; } // the TCP send thread is awakened by its receive thread whenever the receive thread // is about to block if this->subscripReqPend has items in it return wasExpected; } void tcpiiu::uninstallChan ( epicsGuard < epicsMutex > & guard, nciu & chan ) { guard.assertIdenticalMutex ( this->mutex ); switch ( chan.channelNode::listMember ) { case channelNode::cs_createReqPend: this->createReqPend.remove ( chan ); break; case channelNode::cs_createRespPend: this->createRespPend.remove ( chan ); break; case channelNode::cs_v42ConnCallbackPend: this->v42ConnCallbackPend.remove ( chan ); break; case channelNode::cs_subscripReqPend: this->subscripReqPend.remove ( chan ); break; case channelNode::cs_connected: this->connectedList.remove ( chan ); break; case channelNode::cs_unrespCircuit: this->unrespCircuit.remove ( chan ); break; case channelNode::cs_subscripUpdateReqPend: this->subscripUpdateReqPend.remove ( chan ); break; default: errlogPrintf ( "cac: attempt to uninstall channel from tcp iiu, but it inst installed there?" ); } chan.channelNode::listMember = channelNode::cs_none; this->channelCountTot--; if ( this->channelCountTot == 0 && ! this->isNameService() ) { this->initiateCleanShutdown ( guard ); } } int tcpiiu :: printFormated ( epicsGuard < epicsMutex > & cbGuard, const char *pformat, ... ) { cbGuard.assertIdenticalMutex ( this->cbMutex ); va_list theArgs; int status; va_start ( theArgs, pformat ); status = this->cacRef.varArgsPrintFormated ( cbGuard, pformat, theArgs ); va_end ( theArgs ); return status; } void tcpiiu::flushRequest ( epicsGuard < epicsMutex > & ) { if ( this->sendQue.occupiedBytes () > 0 ) { this->sendThreadFlushEvent.signal (); } } bool tcpiiu::bytesArePendingInOS () const { #if 0 FD_SET readBits; FD_ZERO ( & readBits ); FD_SET ( this->sock, & readBits ); struct timeval tmo; tmo.tv_sec = 0; tmo.tv_usec = 0; int status = select ( this->sock + 1, & readBits, NULL, NULL, & tmo ); if ( status > 0 ) { if ( FD_ISSET ( this->sock, & readBits ) ) { return true; } } return false; #else osiSockIoctl_t bytesPending = 0; /* shut up purifys yapping */ int status = socket_ioctl ( this->sock, // X aCC 392 FIONREAD, & bytesPending ); if ( status >= 0 ) { if ( bytesPending > 0 ) { return true; } } return false; #endif } double tcpiiu::receiveWatchdogDelay ( epicsGuard < epicsMutex > & ) const { return this->recvDog.delay (); } /* * Certain OS, such as HPUX, do not unblock a socket system call * when another thread asynchronously calls both shutdown() and * close(). To solve this problem we need to employ OS specific * mechanisms. */ void tcpRecvThread::interruptSocketRecv () { epicsThreadId threadId = this->thread.getId (); if ( threadId ) { epicsSignalRaiseSigAlarm ( threadId ); } } void tcpSendThread::interruptSocketSend () { epicsThreadId threadId = this->thread.getId (); if ( threadId ) { epicsSignalRaiseSigAlarm ( threadId ); } } void tcpiiu::operator delete ( void * /* pCadaver */ ) { // Visual C++ .net appears to require operator delete if // placement operator delete is defined? I smell a ms rat // because if I declare placement new and delete, but // comment out the placement delete definition there are // no undefined symbols. errlogPrintf ( "%s:%d this compiler is confused about " "placement delete - memory was probably leaked", __FILE__, __LINE__ ); } unsigned tcpiiu::channelCount ( epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->mutex ); return this->channelCountTot; } void tcpiiu::uninstallChanDueToSuccessfulSearchResponse ( epicsGuard < epicsMutex > & guard, nciu & chan, const class epicsTime & currentTime ) { netiiu::uninstallChanDueToSuccessfulSearchResponse ( guard, chan, currentTime ); } bool tcpiiu::searchMsg ( epicsGuard < epicsMutex > & guard, ca_uint32_t id, const char * pName, unsigned nameLength ) { return netiiu::searchMsg ( guard, id, pName, nameLength ); } SearchDestTCP :: SearchDestTCP ( cac & cacIn, const osiSockAddr & addrIn ) : _ptcpiiu ( NULL ), _cac ( cacIn ), _addr ( addrIn ), _active ( false ) { } void SearchDestTCP :: disable () { _active = false; _ptcpiiu = NULL; } void SearchDestTCP :: enable () { _active = true; } void SearchDestTCP :: searchRequest ( epicsGuard < epicsMutex > & guard, const char * pBuf, size_t len ) { // restart circuit if it was shut down if ( ! _ptcpiiu ) { tcpiiu * piiu = NULL; bool newIIU = _cac.findOrCreateVirtCircuit ( guard, _addr, cacChannel::priorityDefault, piiu, CA_UKN_MINOR_VERSION, this ); if ( newIIU ) { piiu->start ( guard ); } _ptcpiiu = piiu; } // does this server support TCP-based name resolution? if ( CA_V412 ( _ptcpiiu->minorProtocolVersion ) ) { guard.assertIdenticalMutex ( _ptcpiiu->mutex ); assert ( CA_MESSAGE_ALIGN ( len ) == len ); comQueSendMsgMinder minder ( _ptcpiiu->sendQue, guard ); _ptcpiiu->sendQue.pushString ( pBuf, len ); minder.commit (); _ptcpiiu->flushRequest ( guard ); } } void SearchDestTCP :: show ( epicsGuard < epicsMutex > & guard, unsigned level ) const { :: printf ( "tcpiiu :: SearchDestTCP\n" ); } void tcpiiu :: versionRespNotify ( const caHdrLargeArray & msg ) { this->minorProtocolVersion = msg.m_count; } void tcpiiu :: searchRespNotify ( const epicsTime & currentTime, const caHdrLargeArray & msg ) { /* * the type field is abused to carry the port number * so that we can have multiple servers on one host */ osiSockAddr serverAddr; if ( msg.m_cid != INADDR_BROADCAST ) { serverAddr.ia.sin_family = AF_INET; serverAddr.ia.sin_addr.s_addr = htonl ( msg.m_cid ); serverAddr.ia.sin_port = htons ( msg.m_dataType ); } else { serverAddr = this->address (); } cacRef.transferChanToVirtCircuit ( msg.m_available, msg.m_cid, 0xffff, 0, minorProtocolVersion, serverAddr, currentTime ); }