![]() |
Encapsulates a socket file descriptor (multi-threaded). More...
#include <ThreadedSocketConnection.h>
Public Types | |
typedef std::set< SessionID > | Sessions |
Public Member Functions | |
ThreadedSocketConnection (int s, Sessions sessions, Application &application, Log *pLog) | |
ThreadedSocketConnection (const SessionID &, int s, const std::string &address, short port, Application &, Log *pLog) | |
virtual | ~ThreadedSocketConnection () |
Session * | getSession () const |
int | getSocket () const |
bool | connect () |
void | disconnect () |
bool | read () |
Private Member Functions | |
bool | readMessage (std::string &msg) throw ( SocketRecvFailed ) |
void | processStream () |
bool | send (const std::string &) |
bool | setSession (const std::string &msg) |
Private Attributes | |
int | m_socket |
char | m_buffer [BUFSIZ] |
std::string | m_address |
int | m_port |
Application & | m_application |
Log * | m_pLog |
Parser | m_parser |
Sessions | m_sessions |
Session * | m_pSession |
bool | m_disconnect |
fd_set | m_fds |
Encapsulates a socket file descriptor (multi-threaded).
Definition at line 44 of file ThreadedSocketConnection.h.
typedef std::set<SessionID> FIX::ThreadedSocketConnection::Sessions |
Definition at line 47 of file ThreadedSocketConnection.h.
FIX::ThreadedSocketConnection::ThreadedSocketConnection | ( | int | s, | |
Sessions | sessions, | |||
Application & | application, | |||
Log * | pLog | |||
) |
Definition at line 36 of file ThreadedSocketConnection.cpp.
00037 : m_socket( s ), m_application( application ), m_pLog( pLog ), 00038 m_sessions( sessions ), m_pSession( 0 ), 00039 m_disconnect( false ) 00040 { 00041 FD_ZERO( &m_fds ); 00042 FD_SET( m_socket, &m_fds ); 00043 }
FIX::ThreadedSocketConnection::ThreadedSocketConnection | ( | const SessionID & | sessionID, | |
int | s, | |||
const std::string & | address, | |||
short | port, | |||
Application & | application, | |||
Log * | pLog | |||
) |
Definition at line 46 of file ThreadedSocketConnection.cpp.
00049 : m_socket( s ), m_address( address ), m_port( port ), 00050 m_application( application ), m_pLog( m_pLog ), 00051 m_pSession( Session::lookupSession( sessionID ) ), 00052 m_disconnect( false ) 00053 { 00054 FD_ZERO( &m_fds ); 00055 FD_SET( m_socket, &m_fds ); 00056 if ( m_pSession ) m_pSession->setResponder( this ); 00057 }
FIX::ThreadedSocketConnection::~ThreadedSocketConnection | ( | ) | [virtual] |
Definition at line 59 of file ThreadedSocketConnection.cpp.
References FIX::Session::getSessionID(), m_pSession, FIX::Session::setResponder(), and FIX::Session::unregisterSession().
00060 { 00061 if ( m_pSession ) 00062 { 00063 m_pSession->setResponder( 0 ); 00064 Session::unregisterSession( m_pSession->getSessionID() ); 00065 } 00066 }
bool FIX::ThreadedSocketConnection::connect | ( | ) |
Definition at line 74 of file ThreadedSocketConnection.cpp.
References getSocket(), m_address, m_port, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_connect().
Referenced by FIX::ThreadedSocketInitiator::socketThread().
00075 { QF_STACK_PUSH(ThreadedSocketConnection::connect) 00076 return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0; 00077 QF_STACK_POP 00078 }
void FIX::ThreadedSocketConnection::disconnect | ( | ) | [virtual] |
Implements FIX::Responder.
Definition at line 80 of file ThreadedSocketConnection.cpp.
References m_disconnect, m_socket, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_close().
Referenced by FIX::ThreadedSocketInitiator::doConnect(), processStream(), read(), and FIX::ThreadedSocketInitiator::socketThread().
00081 { QF_STACK_PUSH(ThreadedSocketConnection::disconnect) 00082 00083 m_disconnect = true; 00084 socket_close( m_socket ); 00085 00086 QF_STACK_POP 00087 }
Session* FIX::ThreadedSocketConnection::getSession | ( | ) | const [inline] |
Definition at line 55 of file ThreadedSocketConnection.h.
References m_pSession.
Referenced by FIX::ThreadedSocketInitiator::socketThread().
00055 { return m_pSession; }
int FIX::ThreadedSocketConnection::getSocket | ( | ) | const [inline] |
Definition at line 56 of file ThreadedSocketConnection.h.
References m_socket.
Referenced by connect(), FIX::ThreadedSocketAcceptor::socketConnectionThread(), and FIX::ThreadedSocketInitiator::socketThread().
00056 { return m_socket; }
void FIX::ThreadedSocketConnection::processStream | ( | ) | [private] |
Definition at line 154 of file ThreadedSocketConnection.cpp.
References disconnect(), FIX::Session::isLoggedOn(), m_pSession, FIX::Session::next(), QF_STACK_POP, QF_STACK_PUSH, readMessage(), setSession(), and FIX::TYPE::UtcTimeStamp.
Referenced by read().
00155 { QF_STACK_PUSH(ThreadedSocketConnection::processStream) 00156 00157 std::string msg; 00158 while( readMessage(msg) ) 00159 { 00160 if ( !m_pSession ) 00161 { 00162 if ( !setSession( msg ) ) 00163 { disconnect(); continue; } 00164 } 00165 try 00166 { 00167 m_pSession->next( msg, UtcTimeStamp() ); 00168 } 00169 catch( InvalidMessage& ) 00170 { 00171 if( !m_pSession->isLoggedOn() ) 00172 { 00173 disconnect(); 00174 return; 00175 } 00176 } 00177 } 00178 00179 QF_STACK_POP 00180 }
bool FIX::ThreadedSocketConnection::read | ( | ) |
Definition at line 89 of file ThreadedSocketConnection.cpp.
References FIX::Parser::addToStream(), disconnect(), FIX::Session::disconnect(), FIX::Session::getLog(), m_buffer, m_disconnect, m_fds, m_parser, m_pSession, m_socket, FIX::Session::next(), FIX::Log::onEvent(), processStream(), QF_STACK_POP, and QF_STACK_PUSH.
Referenced by FIX::ThreadedSocketAcceptor::socketConnectionThread(), and FIX::ThreadedSocketInitiator::socketThread().
00090 { QF_STACK_PUSH(ThreadedSocketConnection::read) 00091 00092 struct timeval timeout = { 1, 0 }; 00093 fd_set readset = m_fds; 00094 00095 try 00096 { 00097 // Wait for input (1 second timeout) 00098 int result = select( 1 + m_socket, &readset, 0, 0, &timeout ); 00099 00100 if( result > 0 ) // Something to read 00101 { 00102 // We can read without blocking 00103 int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 ); 00104 if ( size <= 0 ) { throw SocketRecvFailed( size ); } 00105 m_parser.addToStream( m_buffer, size ); 00106 } 00107 else if( result == 0 && m_pSession ) // Timeout 00108 { 00109 m_pSession->next(); 00110 } 00111 else if( result < 0 ) // Error 00112 { 00113 throw SocketRecvFailed( result ); 00114 } 00115 00116 processStream(); 00117 return true; 00118 } 00119 catch ( SocketRecvFailed& e ) 00120 { 00121 if( m_disconnect ) 00122 return false; 00123 00124 if( m_pSession ) 00125 { 00126 m_pSession->getLog()->onEvent( e.what() ); 00127 m_pSession->disconnect(); 00128 } 00129 else 00130 { 00131 disconnect(); 00132 } 00133 00134 return false; 00135 } 00136 00137 QF_STACK_POP 00138 }
bool FIX::ThreadedSocketConnection::readMessage | ( | std::string & | msg | ) | throw ( SocketRecvFailed ) [private] |
Definition at line 140 of file ThreadedSocketConnection.cpp.
References QF_STACK_POP, and QF_STACK_PUSH.
Referenced by processStream().
00142 { QF_STACK_PUSH(ThreadedSocketConnection::readMessage) 00143 00144 try 00145 { 00146 return m_parser.readFixMessage( msg ); 00147 } 00148 catch ( MessageParseError& ) {} 00149 return true; 00150 00151 QF_STACK_POP 00152 }
bool FIX::ThreadedSocketConnection::send | ( | const std::string & | msg | ) | [private, virtual] |
Implements FIX::Responder.
Definition at line 68 of file ThreadedSocketConnection.cpp.
References m_socket, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_send().
00069 { QF_STACK_PUSH(ThreadedSocketConnection::send) 00070 return socket_send( m_socket, msg.c_str(), msg.length() ) >= 0; 00071 QF_STACK_POP 00072 }
bool FIX::ThreadedSocketConnection::setSession | ( | const std::string & | msg | ) | [private] |
Definition at line 182 of file ThreadedSocketConnection.cpp.
References FIX::Session::getSessionID(), FIX::Session::isSessionRegistered(), FIX::Session::lookupSession(), m_pLog, m_pSession, m_sessions, FIX::Log::onEvent(), FIX::Log::onIncoming(), FIX::process_sleep(), QF_STACK_POP, QF_STACK_PUSH, FIX::Session::registerSession(), and FIX::Session::setResponder().
Referenced by processStream().
00183 { QF_STACK_PUSH(ThreadedSocketConnection::setSession) 00184 00185 m_pSession = Session::lookupSession( msg, true ); 00186 if ( !m_pSession ) 00187 { 00188 if( m_pLog ) 00189 { 00190 m_pLog->onEvent( "Session not found for incoming message: " + msg ); 00191 m_pLog->onIncoming( msg ); 00192 } 00193 return false; 00194 } 00195 00196 SessionID sessionID = m_pSession->getSessionID(); 00197 m_pSession = 0; 00198 00199 // see if the session frees up within 5 seconds 00200 for( int i = 1; i <= 5; i++ ) 00201 { 00202 if( !Session::isSessionRegistered( sessionID ) ) 00203 m_pSession = Session::registerSession( sessionID ); 00204 if( m_pSession ) break; 00205 process_sleep( 1 ); 00206 } 00207 00208 if ( !m_pSession ) 00209 return false; 00210 if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() ) 00211 return false; 00212 00213 m_pSession->setResponder( this ); 00214 return true; 00215 00216 QF_STACK_POP 00217 }
std::string FIX::ThreadedSocketConnection::m_address [private] |
Definition at line 70 of file ThreadedSocketConnection.h.
Referenced by connect().
Definition at line 73 of file ThreadedSocketConnection.h.
char FIX::ThreadedSocketConnection::m_buffer[BUFSIZ] [private] |
Definition at line 68 of file ThreadedSocketConnection.h.
Referenced by read().
bool FIX::ThreadedSocketConnection::m_disconnect [private] |
Definition at line 78 of file ThreadedSocketConnection.h.
Referenced by disconnect(), and read().
fd_set FIX::ThreadedSocketConnection::m_fds [private] |
Definition at line 79 of file ThreadedSocketConnection.h.
Referenced by read().
Definition at line 75 of file ThreadedSocketConnection.h.
Referenced by read().
Log* FIX::ThreadedSocketConnection::m_pLog [private] |
Definition at line 74 of file ThreadedSocketConnection.h.
Referenced by setSession().
int FIX::ThreadedSocketConnection::m_port [private] |
Definition at line 71 of file ThreadedSocketConnection.h.
Referenced by connect().
Session* FIX::ThreadedSocketConnection::m_pSession [private] |
Definition at line 77 of file ThreadedSocketConnection.h.
Referenced by getSession(), processStream(), read(), setSession(), and ~ThreadedSocketConnection().
Definition at line 76 of file ThreadedSocketConnection.h.
Referenced by setSession().
int FIX::ThreadedSocketConnection::m_socket [private] |
Definition at line 67 of file ThreadedSocketConnection.h.
Referenced by disconnect(), getSocket(), read(), and send().