Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

FIX::ThreadedSocketInitiator Class Reference
[User]

Threaded Socket implementation of Initiator. More...

#include <ThreadedSocketInitiator.h>

Inheritance diagram for FIX::ThreadedSocketInitiator:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketInitiator:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
virtual ~ThreadedSocketInitiator ()

Private Types

typedef std::map< int, thread_idSocketToThread
typedef std::map< SessionID, int > SessionToHostNum
typedef std::pair
< ThreadedSocketInitiator
*, ThreadedSocketConnection * > 
ThreadPair

Private Member Functions

void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure acceptor.
void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize initiator.
void onStart ()
 Implemented to start connecting to targets.
bool onPoll (double timeout)
 Implemented to connect and poll for events.
void onStop ()
 Implemented to stop a running initiator.
void doConnect (const SessionID &s, const Dictionary &d)
 Implemented to connect a session to its target.
void addThread (int s, thread_id t)
void removeThread (int s)
void lock ()
void getHost (const SessionID &, const Dictionary &, std::string &, short &)

Static Private Member Functions

static THREAD_PROC socketThread (void *p)

Private Attributes

SessionSettings m_settings
SessionToHostNum m_sessionToHostNum
time_t m_lastConnect
int m_reconnectInterval
bool m_noDelay
int m_sendBufSize
int m_rcvBufSize
bool m_stop
SocketToThread m_threads
Mutex m_mutex

Detailed Description

Threaded Socket implementation of Initiator.

Definition at line 39 of file ThreadedSocketInitiator.h.


Member Typedef Documentation

typedef std::map< SessionID, int > FIX::ThreadedSocketInitiator::SessionToHostNum [private]

Definition at line 52 of file ThreadedSocketInitiator.h.

typedef std::map< int, thread_id > FIX::ThreadedSocketInitiator::SocketToThread [private]

Definition at line 51 of file ThreadedSocketInitiator.h.

Definition at line 53 of file ThreadedSocketInitiator.h.


Constructor & Destructor Documentation

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator ( Application application,
MessageStoreFactory factory,
const SessionSettings settings 
) throw ( ConfigError )

Definition at line 33 of file ThreadedSocketInitiator.cpp.

References FIX::socket_init().

00037 : Initiator( application, factory, settings ),
00038   m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ), 
00039   m_sendBufSize( 0 ), m_rcvBufSize( 0 ) 
00040 { 
00041   socket_init(); 
00042 }

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator ( Application application,
MessageStoreFactory factory,
const SessionSettings settings,
LogFactory logFactory 
) throw ( ConfigError )

Definition at line 44 of file ThreadedSocketInitiator.cpp.

References FIX::socket_init().

00049 : Initiator( application, factory, settings, logFactory ),
00050   m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ), 
00051   m_sendBufSize( 0 ), m_rcvBufSize( 0 ) 
00052 { 
00053   socket_init(); 
00054 }

FIX::ThreadedSocketInitiator::~ThreadedSocketInitiator (  )  [virtual]

Definition at line 56 of file ThreadedSocketInitiator.cpp.

References FIX::socket_term().

00057 { 
00058   socket_term(); 
00059 }


Member Function Documentation

void FIX::ThreadedSocketInitiator::addThread ( int  s,
thread_id  t 
) [private]

Definition at line 196 of file ThreadedSocketInitiator.cpp.

References m_mutex, m_threads, QF_STACK_POP, and QF_STACK_PUSH.

Referenced by doConnect().

00197 { QF_STACK_PUSH(ThreadedSocketInitiator::addThread)
00198 
00199   Locker l(m_mutex);
00200 
00201   m_threads[ s ] = t;
00202   QF_STACK_POP
00203 }

void FIX::ThreadedSocketInitiator::doConnect ( const SessionID ,
const Dictionary  
) [private, virtual]

Implemented to connect a session to its target.

Implements FIX::Initiator.

Definition at line 145 of file ThreadedSocketInitiator.cpp.

References addThread(), FIX::IntConvertor::convert(), FIX::ThreadedSocketConnection::disconnect(), FIX::Initiator::getApplication(), getHost(), FIX::Initiator::getLog(), FIX::Session::getLog(), FIX::Session::isSessionTime(), FIX::Session::lookupSession(), m_mutex, m_noDelay, m_rcvBufSize, m_sendBufSize, FIX::Log::onEvent(), QF_STACK_POP, QF_STACK_PUSH, FIX::Initiator::setDisconnected(), FIX::Initiator::setPending(), FIX::socket_createConnector(), FIX::socket_setsockopt(), socketThread(), and FIX::thread_spawn().

00146 { QF_STACK_PUSH(ThreadedSocketInitiator::doConnect)
00147 
00148   try
00149   {
00150     Session* session = Session::lookupSession( s );
00151     if( !session->isSessionTime(UtcTimeStamp()) ) return;
00152 
00153     Log* log = session->getLog();
00154 
00155     std::string address;
00156     short port = 0;
00157     getHost( s, d, address, port );
00158 
00159     int socket = socket_createConnector();
00160     if( m_noDelay )
00161       socket_setsockopt( socket, TCP_NODELAY );
00162     if( m_sendBufSize )
00163       socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
00164     if( m_rcvBufSize )
00165       socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );
00166 
00167     setPending( s );
00168     log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00169 
00170     ThreadedSocketConnection* pConnection =
00171       new ThreadedSocketConnection( s, socket, address, port, getApplication(), getLog() );
00172 
00173     ThreadPair* pair = new ThreadPair( this, pConnection );
00174 
00175     {
00176       Locker l( m_mutex );
00177       thread_id thread;
00178       if ( thread_spawn( &socketThread, pair, thread ) )
00179       {
00180         addThread( socket, thread );
00181       }
00182       else
00183       {
00184         delete pair;
00185         pConnection->disconnect();
00186         delete pConnection;
00187         setDisconnected( s );
00188       }
00189     }
00190   }
00191   catch ( std::exception& ) {}
00192 
00193   QF_STACK_POP
00194 }

void FIX::ThreadedSocketInitiator::getHost ( const SessionID s,
const Dictionary d,
std::string &  address,
short &  port 
) [private]

Definition at line 263 of file ThreadedSocketInitiator.cpp.

References FIX::Dictionary::getLong(), FIX::Dictionary::getString(), FIX::Dictionary::has(), m_sessionToHostNum, QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_CONNECT_HOST, and FIX::SOCKET_CONNECT_PORT.

Referenced by doConnect().

00265 { QF_STACK_PUSH(ThreadedSocketInitiator::getHost)
00266 
00267   int num = 0;
00268   SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00269   if ( i != m_sessionToHostNum.end() ) num = i->second;
00270 
00271   std::stringstream hostStream;
00272   hostStream << SOCKET_CONNECT_HOST << num;
00273   std::string hostString = hostStream.str();
00274 
00275   std::stringstream portStream;
00276   std::string portString = portStream.str();
00277   portStream << SOCKET_CONNECT_PORT << num;
00278 
00279   if( d.has(hostString) && d.has(portString) )
00280   {
00281     address = d.getString( hostString );
00282     port = ( short ) d.getLong( portString );
00283   }
00284   else
00285   {
00286     num = 0;
00287     address = d.getString( SOCKET_CONNECT_HOST );
00288     port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00289   }
00290 
00291   m_sessionToHostNum[ s ] = ++num;
00292 
00293   QF_STACK_POP
00294 }

void FIX::ThreadedSocketInitiator::lock (  )  [inline, private]

Definition at line 66 of file ThreadedSocketInitiator.h.

References m_mutex.

00066 { Locker l(m_mutex); }

void FIX::ThreadedSocketInitiator::onConfigure ( const SessionSettings  )  throw ( ConfigError ) [private, virtual]

Implemented to configure acceptor.

Reimplemented from FIX::Initiator.

Definition at line 61 of file ThreadedSocketInitiator.cpp.

References QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, and FIX::SOCKET_SEND_BUFFER_SIZE.

00063 { QF_STACK_PUSH(ThreadedSocketInitiator::onConfigure)
00064 
00065   try { m_reconnectInterval = s.get().getLong( "ReconnectInterval" ); }
00066   catch ( std::exception& ) {}
00067   if( s.get().has( SOCKET_NODELAY ) )
00068     m_noDelay = s.get().getBool( SOCKET_NODELAY );
00069   if( s.get().has( SOCKET_SEND_BUFFER_SIZE ) )
00070     m_sendBufSize = s.get().getLong( SOCKET_SEND_BUFFER_SIZE );
00071   if( s.get().has( SOCKET_RECEIVE_BUFFER_SIZE ) )
00072     m_rcvBufSize = s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE );
00073 
00074   QF_STACK_POP
00075 }

void FIX::ThreadedSocketInitiator::onInitialize ( const SessionSettings  )  throw ( RuntimeError ) [private, virtual]

Implemented to initialize initiator.

Reimplemented from FIX::Initiator.

Definition at line 77 of file ThreadedSocketInitiator.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

bool FIX::ThreadedSocketInitiator::onPoll ( double  timeout  )  [private, virtual]

Implemented to connect and poll for events.

Implements FIX::Initiator.

Definition at line 104 of file ThreadedSocketInitiator.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

00105 { QF_STACK_PUSH(ThreadedSocketInitiator::onPoll)
00106 
00107   return false;
00108 
00109   QF_STACK_POP
00110 }

void FIX::ThreadedSocketInitiator::onStart (  )  [private, virtual]

Implemented to start connecting to targets.

Implements FIX::Initiator.

Definition at line 83 of file ThreadedSocketInitiator.cpp.

References FIX::Initiator::connect(), FIX::Initiator::isStopped(), m_lastConnect, m_mutex, m_reconnectInterval, FIX::process_sleep(), QF_STACK_POP, and QF_STACK_PUSH.

00084 { QF_STACK_PUSH(ThreadedSocketInitiator::onStart)
00085 
00086   while ( !isStopped() )
00087   {
00088     time_t now;
00089     ::time( &now );
00090 
00091     if ( (now - m_lastConnect) >= m_reconnectInterval )
00092     {
00093       Locker l( m_mutex );
00094       connect();
00095       m_lastConnect = now;
00096     }
00097 
00098     process_sleep( 1 );
00099   }
00100 
00101   QF_STACK_POP
00102 }

void FIX::ThreadedSocketInitiator::onStop (  )  [private, virtual]

Implemented to stop a running initiator.

Implements FIX::Initiator.

Definition at line 112 of file ThreadedSocketInitiator.cpp.

References FIX::Initiator::isLoggedOn(), m_mutex, m_threads, QF_STACK_POP, QF_STACK_PUSH, FIX::socket_close(), FIX::Initiator::start(), and FIX::thread_join().

00113 { QF_STACK_PUSH(ThreadedSocketInitiator::onStop)
00114 
00115   SocketToThread threads;
00116   SocketToThread::iterator i;
00117   
00118   {
00119     Locker l(m_mutex);
00120 
00121     time_t start = 0;
00122     time_t now = 0;
00123 
00124     ::time( &start );
00125     while ( isLoggedOn() )
00126     {
00127       if( ::time(&now) -5 >= start )
00128         break;
00129     }
00130 
00131     threads = m_threads;
00132     m_threads.clear();
00133   }   
00134 
00135   for ( i = threads.begin(); i != threads.end(); ++i )
00136     socket_close( i->first );
00137   
00138   for ( i = threads.begin(); i != threads.end(); ++i )
00139     thread_join( i->second );
00140   threads.clear();
00141 
00142   QF_STACK_POP
00143 }

void FIX::ThreadedSocketInitiator::removeThread ( int  s  )  [private]

Definition at line 205 of file ThreadedSocketInitiator.cpp.

References m_mutex, m_threads, QF_STACK_POP, QF_STACK_PUSH, and FIX::thread_detach().

00206 { QF_STACK_PUSH(ThreadedSocketInitiator::removeThread)
00207 
00208   Locker l(m_mutex);
00209   SocketToThread::iterator i = m_threads.find( s );
00210 
00211   if ( i != m_threads.end() )
00212   {
00213     thread_detach( i->second );
00214     m_threads.erase( i );
00215   }
00216 
00217   QF_STACK_POP
00218 }

THREAD_PROC FIX::ThreadedSocketInitiator::socketThread ( void *  p  )  [static, private]

Definition at line 220 of file ThreadedSocketInitiator.cpp.

References FIX::ThreadedSocketConnection::connect(), FIX::ThreadedSocketConnection::disconnect(), FIX::ThreadedSocketConnection::getSession(), FIX::Session::getSessionID(), FIX::ThreadedSocketConnection::getSocket(), FIX::Session::lookupSession(), FIX::Session::next(), QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, and FIX::ThreadedSocketConnection::read().

Referenced by doConnect().

00221 { QF_STACK_TRY
00222   QF_STACK_PUSH(ThreadedSocketInitiator::socketThread)
00223 
00224   ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
00225 
00226   ThreadedSocketInitiator* pInitiator = pair->first;
00227   ThreadedSocketConnection* pConnection = pair->second;
00228   FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
00229   FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
00230   int socket = pConnection->getSocket();
00231   delete pair;
00232 
00233   pInitiator->lock();
00234 
00235   if( !pConnection->connect() )
00236   {
00237     pInitiator->getLog()->onEvent( "Connection failed" );
00238     pConnection->disconnect();
00239     delete pConnection;
00240     pInitiator->removeThread( socket );
00241     pInitiator->setDisconnected( sessionID );
00242     return 0;
00243   }
00244 
00245   pInitiator->setConnected( sessionID );
00246   pInitiator->getLog()->onEvent( "Connection succeeded" );
00247 
00248   pSession->next();
00249 
00250   while ( pConnection->read() ) {}
00251 
00252   delete pConnection;
00253   if( !pInitiator->isStopped() )
00254     pInitiator->removeThread( socket );
00255   
00256   pInitiator->setDisconnected( sessionID );
00257   return 0;
00258 
00259   QF_STACK_POP
00260   QF_STACK_CATCH
00261 }


Member Data Documentation

Definition at line 73 of file ThreadedSocketInitiator.h.

Referenced by onStart().

Reimplemented from FIX::Initiator.

Definition at line 80 of file ThreadedSocketInitiator.h.

Referenced by addThread(), doConnect(), lock(), onStart(), onStop(), and removeThread().

Definition at line 75 of file ThreadedSocketInitiator.h.

Referenced by doConnect().

Definition at line 77 of file ThreadedSocketInitiator.h.

Referenced by doConnect().

Definition at line 74 of file ThreadedSocketInitiator.h.

Referenced by onStart().

Definition at line 76 of file ThreadedSocketInitiator.h.

Referenced by doConnect().

Definition at line 72 of file ThreadedSocketInitiator.h.

Referenced by getHost().

Reimplemented from FIX::Initiator.

Definition at line 71 of file ThreadedSocketInitiator.h.

Reimplemented from FIX::Initiator.

Definition at line 78 of file ThreadedSocketInitiator.h.

Definition at line 79 of file ThreadedSocketInitiator.h.

Referenced by addThread(), onStop(), and removeThread().


The documentation for this class was generated from the following files:

Generated on Mon Apr 5 21:00:13 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001