OmniEvents
ProxyPullSupplier.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPullSupplier.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPullSupplier.h"
25 #include "EventChannel.h"
26 #include "Orb.h"
27 #include "omniEventsLog.h"
28 #include "PersistNode.h"
29 #include <assert.h>
30 
31 namespace OmniEvents {
32 
33 //
34 // ProxyPullSupplierManager
35 //
36 
37 PortableServer::Servant ProxyPullSupplierManager::incarnate(
38  const PortableServer::ObjectId& oid,
39  PortableServer::POA_ptr poa
40 )
41 {
42  // Evict the oldest proxy servant, if we have reached the maximum number.
43  if(_servants.size()>=_channel.maxNumProxies())
44  {
45  ProxyPullSupplier_i* oldest =NULL;
46  unsigned long age =0;
47  for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
48  if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
49  {
50  oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
51  age=oldest->timestamp();
52  }
53  DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
54  try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
55  }
56  // Make a new servant.
58  _servants.insert(result);
59  return result;
60 }
61 
63  const EventChannel_i& channel,
64  PortableServer::POA_ptr parentPoa,
65  EventQueue& q
66 )
67 : ProxyManager(parentPoa),
68  _queue(q),
69  _channel(channel)
70 {
71  ProxyManager::activate("ProxyPullSupplier");
72 }
73 
75 {
76  DB(20,"~ProxyPullSupplierManager()")
77 }
78 
80 
81 CosEventChannelAdmin::ProxyPullSupplier_ptr
83 {
84  return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
85  _managedPoa.in(),
86  CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
87  );
88 }
89 
91 {
92  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
93  {
94  ProxyPullSupplier_i* pps =dynamic_cast<ProxyPullSupplier_i*>(*i);
95  // We are in the EventChannel's thread.
96  // Make sure all calls go though the ProxyPullSupplier POA.
97  CosEventChannelAdmin::ProxyPullSupplier_var ppsv =pps->_this();
99 
100  }
101 }
102 
103 
104 //
105 // ProxyPullSupplier_i
106 //
107 
108 // CORBA interface methods
109 
111  CosEventComm::PullConsumer_ptr pullConsumer
112 )
113 {
114  if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
115  throw CosEventChannelAdmin::AlreadyConnected();
116  touch();
117  _connected=true;
118  if(!CORBA::is_nil(pullConsumer))
119  _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
120 
122  {
123  WriteLock log;
124  output(log.os);
125  }
126 }
127 
129 {
130  DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
131  touch();
132  eraseKey("ConsumerAdmin/ProxyPullSupplier");
134  if(!_connected)
135  {
136  throw CORBA::OBJECT_NOT_EXIST(
137  IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
138  CORBA::COMPLETED_NO
139  );
140  }
141  else if(!CORBA::is_nil(_target))
142  {
143  CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
144  _target=CosEventComm::PullConsumer::_nil();
145  req->send_deferred();
146  Orb::inst().deferredRequest(req._retn());
147  }
148 }
149 
151 {
152  if(!_connected)
153  throw CosEventComm::Disconnected();
154  touch();
155  if(moreEvents())
156  return new CORBA::Any(*nextEvent());
157  else
158  throw CORBA::TRANSIENT(
159  IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
160  CORBA::COMPLETED_NO
161  );
162 }
163 
164 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
165 {
166  if(!_connected)
167  throw CosEventComm::Disconnected();
168  touch();
169  if(moreEvents())
170  {
171  has_event=1;
172  return new CORBA::Any(*nextEvent());
173  }
174  else
175  {
176  has_event=0;
177  return new CORBA::Any();
178  }
179 }
180 
181 //
182 
184  PortableServer::POA_ptr poa,
185  EventQueue& q
186 )
187 : Proxy(poa),
188  EventQueue::Reader(q),
189  _target(CosEventComm::PullConsumer::_nil()),
190  _connected(false),
191  _timestamp(0)
192 {
193  touch();
194 }
195 
197 {
198  DB(20,"~ProxyPullSupplier_i()")
199 }
200 
202  const string& oid,
203  const PersistNode& node
204 )
205 {
206  CosEventComm::PullConsumer_var pullConsumer =
207  string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
208  // Do not activate until we know that we have read a valid target.
209  activateObjectWithId(oid.c_str());
210  connect_pull_consumer(pullConsumer.in());
211 }
212 
214 {
215  basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
216 }
217 
219 {
220  unsigned long nsec; // dummy
221  omni_thread::get_time(&_timestamp,&nsec);
222 }
223 
224 }; // end namespace OmniEvents
OmniEvents::EventChannel_i
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
Definition: EventChannel.h:111
OmniEvents::ProxyManager::_servants
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
Definition: ProxyManager.h:90
OmniEvents::ProxyPullSupplier_i::~ProxyPullSupplier_i
~ProxyPullSupplier_i()
Definition: ProxyPullSupplier.cc:196
PersistNode.h
OmniEvents::ProxyPullSupplier_i::ProxyPullSupplier_i
ProxyPullSupplier_i(PortableServer::POA_ptr poa, EventQueue &q)
Definition: ProxyPullSupplier.cc:183
OmniEvents::Servant::deactivateObject
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition: Servant.cc:160
Orb.h
OmniEvents::ProxyPullSupplier_i::try_pull
CORBA::Any * try_pull(CORBA::Boolean &has_event)
Definition: ProxyPullSupplier.cc:164
OmniEvents::ProxyPullSupplier_i::reincarnate
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
Definition: ProxyPullSupplier.cc:201
OmniEvents::ProxyPullSupplierManager::incarnate
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
Definition: ProxyPullSupplier.cc:37
OMNIEVENTS__DEBUG_REF_COUNTS__DEFN
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
OmniEvents::ProxyManager
Base class for ServantActivator classes that manage Proxy servants.
Definition: ProxyManager.h:57
OmniEvents::ProxyPullSupplierManager::createObject
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullSupplier_ptr createObject()
Definition: ProxyPullSupplier.cc:82
OmniEvents::ProxyPullSupplier_i::output
void output(ostream &os)
Save this object's state to a stream.
Definition: ProxyPullSupplier.cc:213
OmniEvents::ProxyPullSupplierManager::_queue
EventQueue & _queue
Reference to queue shared with ProxyPushSuppliers.
Definition: ProxyPullSupplier.h:74
OmniEvents::Proxy::basicOutput
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
Definition: ProxyManager.cc:201
DB
#define DB(l, x)
Definition: Orb.h:49
OmniEvents::ProxyPullSupplierManager::_channel
const EventChannel_i & _channel
Definition: ProxyPullSupplier.h:73
OmniEvents::ProxyManager::activate
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
Definition: ProxyManager.cc:103
OmniEvents::Servant::activateObjectWithId
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
OmniEvents::ProxyPullSupplierManager
Definition: ProxyPullSupplier.h:50
OmniEvents::ProxyPullSupplier_i::pull
CORBA::Any * pull()
Definition: ProxyPullSupplier.cc:150
OmniEvents::EventQueue::Reader::moreEvents
bool moreEvents() const
Definition: EventQueue.cc:78
OmniEvents::Orb::inst
static Orb & inst()
Definition: Orb.h:81
OmniEvents::Proxy::_req
CORBA::Request_var _req
Definition: ProxyManager.h:128
OmniEvents::PersistNode
Definition: PersistNode.h:48
OmniEvents::ProxyPullSupplier_i::_target
CosEventComm::PullConsumer_var _target
Definition: ProxyPullSupplier.h:101
OmniEvents::ProxyPullSupplier_i::_connected
bool _connected
Can't use _target to keep track of whether this object is connected, because it is legal to connect with a nil PullConsumer reference.
Definition: ProxyPullSupplier.h:106
OmniEvents::ProxyPullSupplier_i::timestamp
unsigned long timestamp() const
Definition: ProxyPullSupplier.h:99
OmniEvents::WriteLock
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
Definition: omniEventsLog.h:242
OmniEvents::Proxy
Base class for three of the four Proxy servants.
Definition: ProxyManager.h:104
OmniEvents::omniEventsLog::exists
static bool exists()
Library code may create Event Service objects without the need for persistency.
Definition: omniEventsLog.h:144
OmniEvents
Definition: Callback.h:39
ProxyPullSupplier.h
omniEventsLog.h
OmniEvents::Proxy::eraseKey
void eraseKey(const char *name)
Helper method for constructing persistency output.
Definition: ProxyManager.cc:189
EventChannel.h
OmniEvents::ProxyManager::_managedPoa
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Definition: ProxyManager.h:95
IFELSE_OMNIORB4
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition: Orb.h:45
OmniEvents::ProxyPullSupplier_i::connect_pull_consumer
void connect_pull_consumer(CosEventComm::PullConsumer_ptr pullConsumer)
Definition: ProxyPullSupplier.cc:110
OmniEvents::ProxyPullSupplierManager::disconnect
void disconnect()
Send disconnect_pull_consumer() to all connected PullConsumers.
Definition: ProxyPullSupplier.cc:90
OmniEvents::ProxyPullSupplierManager::ProxyPullSupplierManager
ProxyPullSupplierManager(const EventChannel_i &channel, PortableServer::POA_ptr parentPoa, EventQueue &q)
Definition: ProxyPullSupplier.cc:62
OmniEvents::ProxyPullSupplier_i::_timestamp
unsigned long _timestamp
Keep track of when this proxy was last contacted.
Definition: ProxyPullSupplier.h:108
OmniEvents::ProxyPullSupplier_i
Servant for ProxyPullSupplier interface.
Definition: ProxyPullSupplier.h:84
OmniEvents::PersistNode::attrString
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
OmniEvents::ProxyPullSupplier_i::disconnect_pull_supplier
void disconnect_pull_supplier()
Definition: ProxyPullSupplier.cc:128
OmniEvents::EventQueue::Reader::nextEvent
CORBA::Any * nextEvent()
Definition: EventQueue.cc:84
OmniEvents::ProxyPullSupplier_i::touch
void touch()
Update the _timestamp to the current moment.
Definition: ProxyPullSupplier.cc:218
OmniEvents::WriteLock::os
ostream & os
Definition: omniEventsLog.h:254
OmniEvents::Orb::deferredRequest
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition: Orb.cc:187
OmniEvents::EventChannel_i::maxNumProxies
CORBA::ULong maxNumProxies() const
Definition: EventChannel.h:179
OmniEvents::ProxyPullSupplierManager::~ProxyPullSupplierManager
~ProxyPullSupplierManager()
Definition: ProxyPullSupplier.cc:74
OmniEvents::EventQueue
The EventQueue is a circular buffer, that contains _size-1 events.
Definition: EventQueue.h:56