Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

SocketMonitor.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "SocketMonitor.h"
00028 #include "Utility.h"
00029 #include <exception>
00030 #include <set>
00031 #include <algorithm>
00032 #include <iostream>
00033 
00034 namespace FIX
00035 {
00036 SocketMonitor::SocketMonitor( int timeout )
00037 : m_timeout( timeout )
00038 {
00039   socket_init();
00040 
00041   std::pair<int, int> sockets = socket_createpair();
00042   m_signal = sockets.first;
00043   m_interrupt = sockets.second;
00044   socket_setnonblock( m_signal );
00045   socket_setnonblock( m_interrupt );
00046   m_readSockets.insert( m_interrupt );
00047 
00048   m_timeval.tv_sec = 0;
00049   m_timeval.tv_usec = 0;
00050 #ifndef SELECT_DECREMENTS_TIME
00051   m_ticks = clock();
00052 #endif
00053 }
00054 
00055 SocketMonitor::~SocketMonitor()
00056 {
00057   Sockets::iterator i;
00058   for ( i = m_readSockets.begin(); i != m_readSockets.end(); ++i ) {
00059     socket_close( *i );
00060   }
00061 
00062   socket_close( m_signal );
00063   socket_term();
00064 }
00065 
00066 bool SocketMonitor::addConnect( int s )
00067 { QF_STACK_PUSH(SocketMonitor::addConnect)
00068 
00069   socket_setnonblock( s );
00070   Sockets::iterator i = m_connectSockets.find( s );
00071   if( i != m_connectSockets.end() ) return false;
00072 
00073   m_connectSockets.insert( s );
00074   return true;
00075 
00076   QF_STACK_POP
00077 }
00078 
00079 bool SocketMonitor::addRead( int s )
00080 { QF_STACK_PUSH(SocketMonitor::addRead)
00081 
00082   socket_setnonblock( s );
00083   Sockets::iterator i = m_readSockets.find( s );
00084   if( i != m_readSockets.end() ) return false;
00085 
00086   m_readSockets.insert( s );
00087   return true;
00088 
00089   QF_STACK_POP
00090 }
00091 
00092 bool SocketMonitor::addWrite( int s )
00093 { QF_STACK_PUSH(SocketMonitor::addWrite)
00094 
00095   if( m_readSockets.find(s) == m_readSockets.end() )
00096     return false;
00097 
00098   socket_setnonblock( s );
00099   Sockets::iterator i = m_writeSockets.find( s );
00100   if( i != m_writeSockets.end() ) return false;
00101 
00102   m_writeSockets.insert( s );
00103   return true;
00104 
00105   QF_STACK_POP
00106 }
00107 
00108 bool SocketMonitor::drop( int s )
00109 { QF_STACK_PUSH(SocketMonitor::drop)
00110 
00111   Sockets::iterator i = m_readSockets.find( s );
00112   Sockets::iterator j = m_writeSockets.find( s );
00113   Sockets::iterator k = m_connectSockets.find( s );
00114 
00115   if ( i != m_readSockets.end() || 
00116        j != m_writeSockets.end() ||
00117        k != m_connectSockets.end() )
00118   {
00119     socket_close( s );
00120     m_readSockets.erase( s );
00121     m_writeSockets.erase( s );
00122     m_connectSockets.erase( s );
00123     m_dropped.push( s );
00124     return true;
00125   }
00126   return false;
00127 
00128   QF_STACK_POP
00129 }
00130 
00131 inline timeval* SocketMonitor::getTimeval( bool poll, double timeout )
00132 { QF_STACK_PUSH(SocketMonitor::getTimeval)
00133 
00134   if ( poll )
00135   {
00136     m_timeval.tv_sec = 0;
00137     m_timeval.tv_usec = 0;
00138     return &m_timeval;
00139   }
00140 
00141   timeout = m_timeout;
00142 
00143   if ( !timeout )
00144     return 0;
00145 #ifdef SELECT_MODIFIES_TIMEVAL
00146   if ( !m_timeval.tv_sec && !m_timeval.tv_usec && timeout )
00147     m_timeval.tv_sec = timeout;
00148   return &m_timeval;
00149 #else
00150 double elapsed = ( double ) ( clock() - m_ticks ) / ( double ) CLOCKS_PER_SEC;
00151   if ( elapsed >= timeout || elapsed == 0.0 )
00152   {
00153     m_ticks = clock();
00154     m_timeval.tv_sec = 0;
00155     m_timeval.tv_usec = (long)(timeout * 1000000);
00156   }
00157   else
00158   {
00159     m_timeval.tv_sec = 0;
00160     m_timeval.tv_usec = ( long ) ( ( timeout - elapsed ) * 1000000 );
00161   }
00162   return &m_timeval;
00163 #endif
00164 
00165   QF_STACK_POP
00166 }
00167 
00168 bool SocketMonitor::sleepIfEmpty( bool poll )
00169 { QF_STACK_PUSH(SocketMonitor::sleepIfEmpty)
00170 
00171   if( poll )
00172     return false;
00173 
00174   if ( m_readSockets.empty() && 
00175        m_writeSockets.empty() &&
00176        m_connectSockets.empty() )
00177   {
00178     process_sleep( m_timeout );
00179     return true;
00180   }
00181   else
00182     return false;
00183 
00184   QF_STACK_POP
00185 }
00186 
00187 void SocketMonitor::signal( int socket )
00188 { QF_STACK_PUSH(SocketMonitor::signal)
00189   socket_send( m_signal, (char*)&socket, sizeof(socket) );
00190   QF_STACK_POP
00191 }
00192 
00193 void SocketMonitor::unsignal( int s )
00194 { QF_STACK_PUSH(SocketMonitor::unsignal)
00195 
00196   Sockets::iterator i = m_writeSockets.find( s );
00197   if( i == m_writeSockets.end() ) return;
00198 
00199   m_writeSockets.erase( s );
00200 
00201   QF_STACK_POP
00202 }
00203 
00204 void SocketMonitor::block( Strategy& strategy, bool poll, double timeout )
00205 { QF_STACK_PUSH(SocketMonitor::block)
00206 
00207   while ( m_dropped.size() )
00208   {
00209     strategy.onError( *this, m_dropped.front() );
00210     m_dropped.pop();
00211     if ( m_dropped.size() == 0 )
00212       return ;
00213   }
00214 
00215   fd_set readSet;
00216   FD_ZERO( &readSet );
00217   buildSet( m_readSockets, readSet );
00218   fd_set writeSet;
00219   FD_ZERO( &writeSet );
00220   buildSet( m_connectSockets, writeSet );
00221   buildSet( m_writeSockets, writeSet );
00222   fd_set exceptSet;
00223   FD_ZERO( &exceptSet );
00224   buildSet( m_connectSockets, exceptSet );
00225 
00226   if ( sleepIfEmpty(poll) )
00227   {
00228     strategy.onTimeout( *this );
00229     return;
00230   }
00231 
00232   int result = select( FD_SETSIZE, &readSet, &writeSet, &exceptSet, getTimeval(poll, timeout) );
00233 
00234   if ( result == 0 )
00235   {
00236     strategy.onTimeout( *this );
00237     return;
00238   }
00239   else if ( result > 0 )
00240   {
00241     processExceptSet( strategy, exceptSet );
00242     processWriteSet( strategy, writeSet );
00243     processReadSet( strategy, readSet );
00244   }
00245   else
00246   {
00247     strategy.onError( *this );
00248   }
00249 
00250   QF_STACK_POP
00251 }
00252 
00253 void SocketMonitor::processReadSet( Strategy& strategy, fd_set& readSet )
00254 { QF_STACK_PUSH(SocketMonitor::processReadSet)
00255 
00256 #ifdef _MSC_VER
00257   for ( unsigned i = 0; i < readSet.fd_count; ++i )
00258   {
00259     int s = readSet.fd_array[ i ];
00260     if( s == m_interrupt )
00261     {
00262       int socket = 0;
00263       recv( s, (char*)&socket, sizeof(socket), 0 );
00264       addWrite( socket );
00265     }
00266     else
00267     {
00268       strategy.onEvent( *this, s );
00269     }
00270   }
00271 #else
00272     Sockets::iterator i;
00273     Sockets sockets = m_readSockets;
00274     for ( i = sockets.begin(); i != sockets.end(); ++i )
00275     {
00276       int s = *i;
00277       if ( !FD_ISSET( *i, &readSet ) )
00278         continue;
00279       if( s == m_interrupt )
00280       {
00281         int socket = 0;
00282         recv( s, (char*)&socket, sizeof(socket), 0 );
00283         addWrite( socket );
00284       }
00285       else
00286       {
00287         strategy.onEvent( *this, s );
00288       }
00289     }
00290 #endif
00291 
00292   QF_STACK_POP
00293 }
00294 
00295 void SocketMonitor::processWriteSet( Strategy& strategy, fd_set& writeSet )
00296 { QF_STACK_PUSH(SocketMonitor::processWriteSet)
00297 
00298 #ifdef _MSC_VER
00299   for ( unsigned i = 0; i < writeSet.fd_count; ++i )
00300   {
00301     int s = writeSet.fd_array[ i ];
00302     if( m_connectSockets.find(s) != m_connectSockets.end() )
00303     {
00304       m_connectSockets.erase( s );
00305       m_readSockets.insert( s );
00306       strategy.onConnect( *this, s );
00307     }
00308     else
00309     {
00310       strategy.onWrite( *this, s );
00311     }
00312   }
00313 #else
00314   Sockets::iterator i;
00315   Sockets sockets = m_connectSockets;
00316   for( i = sockets.begin(); i != sockets.end(); ++i )
00317   {
00318     int s = *i;
00319     if ( !FD_ISSET( *i, &writeSet ) )
00320       continue;
00321     m_connectSockets.erase( s );
00322     m_readSockets.insert( s );
00323     strategy.onConnect( *this, s );
00324   }
00325 
00326   sockets = m_writeSockets;
00327   for( i = sockets.begin(); i != sockets.end(); ++i )
00328   {
00329     int s = *i;
00330     if ( !FD_ISSET( *i, &writeSet ) )
00331       continue;
00332     strategy.onWrite( *this, s );
00333   }
00334 #endif
00335 
00336   QF_STACK_POP
00337 }
00338 
00339 void SocketMonitor::processExceptSet( Strategy& strategy, fd_set& exceptSet )
00340 { QF_STACK_PUSH(SocketMonitor::processExceptSet)
00341 
00342 #ifdef _MSC_VER
00343   for ( unsigned i = 0; i < exceptSet.fd_count; ++i )
00344   {
00345     int s = exceptSet.fd_array[ i ];
00346     strategy.onError( *this, s );
00347   }
00348 #else
00349     Sockets::iterator i;
00350     Sockets sockets = m_connectSockets;
00351     for ( i = sockets.begin(); i != sockets.end(); ++i )
00352     {
00353       int s = *i;
00354       if ( !FD_ISSET( *i, &exceptSet ) )
00355         continue;
00356       strategy.onError( *this, s );
00357     }
00358 #endif
00359 
00360   QF_STACK_POP
00361 }
00362 
00363 void SocketMonitor::buildSet( const Sockets& sockets, fd_set& watchSet )
00364 { QF_STACK_PUSH(SocketMonitor::buildSet)
00365 
00366   Sockets::const_iterator iter;
00367   for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) {
00368     FD_SET( *iter, &watchSet );
00369   }
00370   QF_STACK_POP
00371 }
00372 }

Generated on Mon Apr 5 20:59:51 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001