00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #ifdef HAVE_POSTGRESQL
00028
00029 #include "PostgreSQLStore.h"
00030 #include "SessionID.h"
00031 #include "SessionSettings.h"
00032 #include "FieldConvertors.h"
00033 #include "Parser.h"
00034 #include "Utility.h"
00035 #include "strptime.h"
00036 #include <fstream>
00037
00038 namespace FIX
00039 {
00040
00041 const std::string PostgreSQLStoreFactory::DEFAULT_DATABASE = "quickfix";
00042 const std::string PostgreSQLStoreFactory::DEFAULT_USER = "postgres";
00043 const std::string PostgreSQLStoreFactory::DEFAULT_PASSWORD = "";
00044 const std::string PostgreSQLStoreFactory::DEFAULT_HOST = "localhost";
00045 const short PostgreSQLStoreFactory::DEFAULT_PORT = 0;
00046
00047 PostgreSQLStore::PostgreSQLStore
00048 ( const SessionID& s, const DatabaseConnectionID& d, PostgreSQLConnectionPool* p )
00049 : m_pConnectionPool( p ), m_sessionID( s )
00050 {
00051 m_pConnection = m_pConnectionPool->create( d );
00052 populateCache();
00053 }
00054
00055 PostgreSQLStore::PostgreSQLStore
00056 ( const SessionID& s, const std::string& database, const std::string& user,
00057 const std::string& password, const std::string& host, short port )
00058 : m_pConnectionPool( 0 ), m_sessionID( s )
00059 {
00060 m_pConnection = new PostgreSQLConnection( database, user, password, host, port );
00061 populateCache();
00062 }
00063
00064 PostgreSQLStore::~PostgreSQLStore()
00065 {
00066 if( m_pConnectionPool )
00067 m_pConnectionPool->destroy( m_pConnection );
00068 else
00069 delete m_pConnection;
00070 }
00071
00072 void PostgreSQLStore::populateCache()
00073 { QF_STACK_PUSH(PostgreSQLStore::populateCache)
00074
00075 std::stringstream queryString;
00076
00077 queryString << "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM sessions WHERE "
00078 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00079 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00080 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00081 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00082
00083 PostgreSQLQuery query( queryString.str() );
00084 if( !m_pConnection->execute(query) )
00085 throw ConfigError( "No entries found for session in database" );
00086
00087 int rows = query.rows();
00088 if( rows > 1 )
00089 throw ConfigError( "Multiple entries found for session in database" );
00090
00091 if( rows == 1 )
00092 {
00093 struct tm time;
00094 std::string sqlTime = query.getValue( 0, 0 );
00095 strptime( sqlTime.c_str(), "%Y-%m-%d %H:%M:%S", &time );
00096 m_cache.setCreationTime (UtcTimeStamp (&time));
00097 m_cache.setNextTargetMsgSeqNum( atol( query.getValue( 0, 1 ) ) );
00098 m_cache.setNextSenderMsgSeqNum( atol( query.getValue( 0, 2 ) ) );
00099 }
00100 else
00101 {
00102 UtcTimeStamp time = m_cache.getCreationTime();
00103 char sqlTime[ 20 ];
00104 int year, month, day, hour, minute, second, millis;
00105 time.getYMD (year, month, day);
00106 time.getHMS (hour, minute, second, millis);
00107 STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
00108 year, month, day, hour, minute, second );
00109 std::stringstream queryString2;
00110 queryString2 << "INSERT INTO sessions (beginstring, sendercompid, targetcompid, session_qualifier,"
00111 << "creation_time, incoming_seqnum, outgoing_seqnum) VALUES("
00112 << "'" << m_sessionID.getBeginString().getValue() << "',"
00113 << "'" << m_sessionID.getSenderCompID().getValue() << "',"
00114 << "'" << m_sessionID.getTargetCompID().getValue() << "',"
00115 << "'" << m_sessionID.getSessionQualifier() << "',"
00116 << "'" << sqlTime << "',"
00117 << m_cache.getNextTargetMsgSeqNum() << ","
00118 << m_cache.getNextSenderMsgSeqNum() << ")";
00119
00120 PostgreSQLQuery query2( queryString2.str() );
00121 if( !m_pConnection->execute(query2) )
00122 throw ConfigError( "Unable to create session in database" );
00123 }
00124
00125 QF_STACK_POP
00126 }
00127
00128 MessageStore* PostgreSQLStoreFactory::create( const SessionID& s )
00129 { QF_STACK_PUSH(PostgreSQLStoreFactory::create)
00130
00131 if( m_useSettings )
00132 return create( s, m_settings.get(s) );
00133 else if( m_useDictionary )
00134 return create( s, m_dictionary );
00135 else
00136 {
00137 DatabaseConnectionID id( m_database, m_user, m_password, m_host, m_port );
00138 return new PostgreSQLStore( s, id, m_connectionPoolPtr.get() );
00139 }
00140
00141 QF_STACK_POP
00142 }
00143
00144 MessageStore* PostgreSQLStoreFactory::create( const SessionID& s, const Dictionary& settings )
00145 { QF_STACK_PUSH(PostgreSQLStoreFactory::create)
00146
00147 std::string database = DEFAULT_DATABASE;
00148 std::string user = DEFAULT_USER;
00149 std::string password = DEFAULT_PASSWORD;
00150 std::string host = DEFAULT_HOST;
00151 short port = DEFAULT_PORT;
00152
00153 try { database = settings.getString( POSTGRESQL_STORE_DATABASE ); }
00154 catch( ConfigError& ) {}
00155
00156 try { user = settings.getString( POSTGRESQL_STORE_USER ); }
00157 catch( ConfigError& ) {}
00158
00159 try { password = settings.getString( POSTGRESQL_STORE_PASSWORD ); }
00160 catch( ConfigError& ) {}
00161
00162 try { host = settings.getString( POSTGRESQL_STORE_HOST ); }
00163 catch( ConfigError& ) {}
00164
00165 try { port = ( short ) settings.getLong( POSTGRESQL_STORE_PORT ); }
00166 catch( ConfigError& ) {}
00167
00168 DatabaseConnectionID id( database, user, password, host, port );
00169 return new PostgreSQLStore( s, id, m_connectionPoolPtr.get() );
00170
00171 QF_STACK_POP
00172 }
00173
00174 void PostgreSQLStoreFactory::destroy( MessageStore* pStore )
00175 { QF_STACK_PUSH(PostgreSQLStoreFactory::destroy)
00176 delete pStore;
00177 QF_STACK_POP
00178 }
00179
00180 bool PostgreSQLStore::set( int msgSeqNum, const std::string& msg )
00181 throw ( IOException )
00182 { QF_STACK_PUSH(PostgreSQLStore::set)
00183
00184 char* msgCopy = new char[ (msg.size() * 2) + 1 ];
00185 PQescapeString( msgCopy, msg.c_str(), msg.size() );
00186
00187 std::stringstream queryString;
00188 queryString << "INSERT INTO messages "
00189 << "(beginstring, sendercompid, targetcompid, session_qualifier, msgseqnum, message) "
00190 << "VALUES ("
00191 << "'" << m_sessionID.getBeginString().getValue() << "',"
00192 << "'" << m_sessionID.getSenderCompID().getValue() << "',"
00193 << "'" << m_sessionID.getTargetCompID().getValue() << "',"
00194 << "'" << m_sessionID.getSessionQualifier() << "',"
00195 << msgSeqNum << ","
00196 << "'" << msgCopy << "')";
00197
00198 delete [] msgCopy;
00199
00200 PostgreSQLQuery query( queryString.str() );
00201 if( !m_pConnection->execute(query) )
00202 {
00203 std::stringstream queryString2;
00204 queryString2 << "UPDATE messages SET message='" << msg << "' WHERE "
00205 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00206 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00207 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00208 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "' and "
00209 << "msgseqnum=" << msgSeqNum;
00210 PostgreSQLQuery query2( queryString2.str() );
00211 if( !m_pConnection->execute(query2) )
00212 query2.throwException();
00213 }
00214
00215 return true;
00216
00217 QF_STACK_POP
00218 }
00219
00220 void PostgreSQLStore::get( int begin, int end,
00221 std::vector < std::string > & result ) const
00222 throw ( IOException )
00223 { QF_STACK_PUSH(PostgreSQLStore::get)
00224
00225 result.clear();
00226 std::stringstream queryString;
00227 queryString << "SELECT message FROM messages WHERE "
00228 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00229 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00230 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00231 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "' and "
00232 << "msgseqnum>=" << begin << " and " << "msgseqnum<=" << end << " "
00233 << "ORDER BY msgseqnum";
00234
00235 PostgreSQLQuery query( queryString.str() );
00236 if( !m_pConnection->execute(query) )
00237 query.throwException();
00238
00239 int rows = query.rows();
00240 for( int row = 0; row < rows; row++ )
00241 result.push_back( query.getValue( row, 0 ) );
00242
00243 QF_STACK_POP
00244 }
00245
00246 int PostgreSQLStore::getNextSenderMsgSeqNum() const throw ( IOException )
00247 { QF_STACK_PUSH(PostgreSQLStore::getNextSenderMsgSeqNum)
00248 return m_cache.getNextSenderMsgSeqNum();
00249 QF_STACK_POP
00250 }
00251
00252 int PostgreSQLStore::getNextTargetMsgSeqNum() const throw ( IOException )
00253 { QF_STACK_PUSH(PostgreSQLStore::getNextTargetMsgSeqNum)
00254 return m_cache.getNextTargetMsgSeqNum();
00255 QF_STACK_POP
00256 }
00257
00258 void PostgreSQLStore::setNextSenderMsgSeqNum( int value ) throw ( IOException )
00259 { QF_STACK_PUSH(PostgreSQLStore::setNextSenderMsgSeqNum)
00260
00261 std::stringstream queryString;
00262 queryString << "UPDATE sessions SET outgoing_seqnum=" << value << " WHERE "
00263 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00264 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00265 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00266 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00267
00268 PostgreSQLQuery query( queryString.str() );
00269 if( !m_pConnection->execute(query) )
00270 query.throwException();
00271
00272 m_cache.setNextSenderMsgSeqNum( value );
00273
00274 QF_STACK_POP
00275 }
00276
00277 void PostgreSQLStore::setNextTargetMsgSeqNum( int value ) throw ( IOException )
00278 { QF_STACK_PUSH(PostgreSQLStore::setNextTargetMsgSeqNum)
00279
00280 std::stringstream queryString;
00281 queryString << "UPDATE sessions SET incoming_seqnum=" << value << " WHERE "
00282 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00283 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00284 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00285 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00286
00287 PostgreSQLQuery query( queryString.str() );
00288 if( !m_pConnection->execute(query) )
00289 query.throwException();
00290
00291 m_cache.setNextTargetMsgSeqNum( value );
00292
00293 QF_STACK_POP
00294 }
00295
00296 void PostgreSQLStore::incrNextSenderMsgSeqNum() throw ( IOException )
00297 { QF_STACK_PUSH(PostgreSQLStore::incrNextSenderMsgSeqNum)
00298 m_cache.incrNextSenderMsgSeqNum();
00299 setNextSenderMsgSeqNum( m_cache.getNextSenderMsgSeqNum() );
00300 QF_STACK_POP
00301 }
00302
00303 void PostgreSQLStore::incrNextTargetMsgSeqNum() throw ( IOException )
00304 { QF_STACK_PUSH(PostgreSQLStore::incrNextTargetMsgSeqNum)
00305 m_cache.incrNextTargetMsgSeqNum();
00306 setNextTargetMsgSeqNum( m_cache.getNextTargetMsgSeqNum() );
00307 QF_STACK_POP
00308 }
00309
00310 UtcTimeStamp PostgreSQLStore::getCreationTime() const throw ( IOException )
00311 { QF_STACK_PUSH(PostgreSQLStore::getCreationTime)
00312 return m_cache.getCreationTime();
00313 QF_STACK_POP
00314 }
00315
00316 void PostgreSQLStore::reset() throw ( IOException )
00317 { QF_STACK_PUSH(PostgreSQLStore::reset)
00318
00319 std::stringstream queryString;
00320 queryString << "DELETE FROM messages WHERE "
00321 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00322 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00323 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00324 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00325
00326 PostgreSQLQuery query( queryString.str() );
00327 if( !m_pConnection->execute(query) )
00328 query.throwException();
00329
00330 m_cache.reset();
00331 UtcTimeStamp time = m_cache.getCreationTime();
00332
00333 int year, month, day, hour, minute, second, millis;
00334 time.getYMD( year, month, day );
00335 time.getHMS( hour, minute, second, millis );
00336
00337 char sqlTime[ 20 ];
00338 STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
00339 year, month, day, hour, minute, second );
00340
00341 std::stringstream queryString2;
00342 queryString2 << "UPDATE sessions SET creation_time='" << sqlTime << "', "
00343 << "incoming_seqnum=" << m_cache.getNextTargetMsgSeqNum() << ", "
00344 << "outgoing_seqnum=" << m_cache.getNextSenderMsgSeqNum() << " WHERE "
00345 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00346 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00347 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00348 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00349
00350 PostgreSQLQuery query2( queryString2.str() );
00351 if( !m_pConnection->execute(query2) )
00352 query2.throwException();
00353
00354 QF_STACK_POP
00355 }
00356
00357 void PostgreSQLStore::refresh() throw ( IOException )
00358 { QF_STACK_PUSH(PostgreSQLStore::refresh)
00359
00360 m_cache.reset();
00361 populateCache();
00362
00363 QF_STACK_POP
00364 }
00365
00366 }
00367
00368 #endif