SocketConnection.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "SocketConnection.h"
00028 #include "SocketAcceptor.h"
00029 #include "SocketConnector.h"
00030 #include "SocketInitiator.h"
00031 #include "Session.h"
00032 #include "Utility.h"
00033
00034 namespace FIX
00035 {
00036 SocketConnection::SocketConnection( int s, Sessions sessions,
00037 SocketMonitor* pMonitor )
00038 : m_socket( s ), m_sendLength( 0 ),
00039 m_sessions(sessions), m_pSession( 0 ), m_pMonitor( pMonitor )
00040 {
00041 FD_ZERO( &m_fds );
00042 FD_SET( m_socket, &m_fds );
00043 }
00044
00045 SocketConnection::SocketConnection( SocketInitiator& i,
00046 const SessionID& sessionID, int s,
00047 SocketMonitor* pMonitor )
00048 : m_socket( s ), m_sendLength( 0 ),
00049 m_pSession( i.getSession( sessionID, *this ) ),
00050 m_pMonitor( pMonitor )
00051 {
00052 FD_ZERO( &m_fds );
00053 FD_SET( m_socket, &m_fds );
00054 m_sessions.insert( sessionID );
00055 }
00056
00057 SocketConnection::~SocketConnection()
00058 {
00059 if ( m_pSession )
00060 Session::unregisterSession( m_pSession->getSessionID() );
00061 }
00062
00063 bool SocketConnection::send( const std::string& msg )
00064 { QF_STACK_PUSH(SocketConnection::send)
00065
00066 Locker l( m_mutex );
00067
00068 m_sendQueue.push_back( msg );
00069 processQueue();
00070 signal();
00071 return true;
00072
00073 QF_STACK_POP
00074 }
00075
00076 bool SocketConnection::processQueue()
00077 { QF_STACK_PUSH(SocketConnection::processQueue)
00078
00079 Locker l( m_mutex );
00080
00081 if( !m_sendQueue.size() ) return true;
00082
00083 struct timeval timeout = { 0, 0 };
00084 fd_set writeset = m_fds;
00085 if( select( 1 + m_socket, 0, &writeset, 0, &timeout ) <= 0 )
00086 return false;
00087
00088 const std::string& msg = m_sendQueue.front();
00089
00090 int result = socket_send
00091 ( m_socket, msg.c_str() + m_sendLength, msg.length() - m_sendLength );
00092
00093 if( result > 0 )
00094 m_sendLength += result;
00095
00096 if( m_sendLength == msg.length() )
00097 {
00098 m_sendLength = 0;
00099 m_sendQueue.pop_front();
00100 }
00101
00102 return !m_sendQueue.size();
00103
00104 QF_STACK_POP
00105 }
00106
00107 void SocketConnection::disconnect()
00108 { QF_STACK_PUSH(SocketConnection::disconnect)
00109
00110 if ( m_pMonitor )
00111 m_pMonitor->drop( m_socket );
00112
00113 QF_STACK_POP
00114 }
00115
00116 bool SocketConnection::read( SocketConnector& s )
00117 { QF_STACK_PUSH(SocketConnection::read)
00118
00119 if ( !m_pSession ) return false;
00120
00121 try
00122 {
00123 readFromSocket();
00124 readMessages( s.getMonitor() );
00125 }
00126 catch( SocketRecvFailed& e )
00127 {
00128 m_pSession->getLog()->onEvent( e.what() );
00129 return false;
00130 }
00131 return true;
00132
00133 QF_STACK_POP
00134 }
00135
00136 bool SocketConnection::read( SocketAcceptor& a, SocketServer& s )
00137 { QF_STACK_PUSH(SocketConnection::read)
00138
00139 std::string msg;
00140 try
00141 {
00142 readFromSocket();
00143
00144 if ( !m_pSession )
00145 {
00146 if ( !readMessage( msg ) ) return false;
00147 m_pSession = Session::lookupSession( msg, true );
00148 if( !isValidSession() )
00149 {
00150 m_pSession = 0;
00151 if( a.getLog() )
00152 {
00153 a.getLog()->onEvent( "Session not found for incoming message: " + msg );
00154 a.getLog()->onIncoming( msg );
00155 }
00156 }
00157 if( m_pSession )
00158 m_pSession = a.getSession( msg, *this );
00159 if( m_pSession )
00160 m_pSession->next( msg, UtcTimeStamp() );
00161 if( !m_pSession )
00162 {
00163 s.getMonitor().drop( m_socket );
00164 return false;
00165 }
00166
00167 Session::registerSession( m_pSession->getSessionID() );
00168 }
00169
00170 readMessages( s.getMonitor() );
00171 return true;
00172 }
00173 catch ( SocketRecvFailed& e )
00174 {
00175 if( m_pSession )
00176 m_pSession->getLog()->onEvent( e.what() );
00177 s.getMonitor().drop( m_socket );
00178 }
00179 catch ( InvalidMessage& )
00180 {
00181 s.getMonitor().drop( m_socket );
00182 }
00183 return false;
00184
00185 QF_STACK_POP
00186 }
00187
00188 bool SocketConnection::isValidSession()
00189 { QF_STACK_PUSH(SocketConnection::isValidSession)
00190
00191 if( m_pSession == 0 )
00192 return false;
00193 SessionID sessionID = m_pSession->getSessionID();
00194 if( Session::isSessionRegistered(sessionID) )
00195 return false;
00196 return !( m_sessions.find(sessionID) == m_sessions.end() );
00197
00198 QF_STACK_POP
00199 }
00200
00201 void SocketConnection::readFromSocket()
00202 throw( SocketRecvFailed )
00203 { QF_STACK_PUSH(SocketConnection::readFromSocket)
00204
00205 int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00206 if( size <= 0 ) throw SocketRecvFailed( size );
00207 m_parser.addToStream( m_buffer, size );
00208
00209 QF_STACK_POP
00210 }
00211
00212 bool SocketConnection::readMessage( std::string& msg )
00213 { QF_STACK_PUSH(SocketConnection::readMessage)
00214
00215 try
00216 {
00217 return m_parser.readFixMessage( msg );
00218 }
00219 catch ( MessageParseError& ) {}
00220 return true;
00221
00222 QF_STACK_POP
00223 }
00224
00225 void SocketConnection::readMessages( SocketMonitor& s )
00226 {
00227 if( !m_pSession ) return;
00228
00229 std::string msg;
00230 while( readMessage( msg ) )
00231 {
00232 try
00233 {
00234 m_pSession->next( msg, UtcTimeStamp() );
00235 }
00236 catch ( InvalidMessage& )
00237 {
00238 if( !m_pSession->isLoggedOn() )
00239 s.drop( m_socket );
00240 }
00241 }
00242 }
00243
00244 void SocketConnection::onTimeout()
00245 { QF_STACK_PUSH(SocketConnection::onTimeout)
00246 if ( m_pSession ) m_pSession->next();
00247 QF_STACK_POP
00248 }
00249 }