Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 import collections, socket, time, threading 20 21 from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message 22 from proton import ProtonException, Timeout, Url 23 from proton.reactor import Container 24 from proton.handlers import MessagingHandler, IncomingMessageHandler53 6029 self.connection = connection 30 self.link = link 31 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 32 msg="Opening link %s" % link.name) 33 self._checkClosed()3436 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 37 timeout=timeout, 38 msg="Opening link %s" % self.link.name) 39 self._checkClosed()4042 if self.link.state & Endpoint.REMOTE_CLOSED: 43 self.link.close() 44 raise LinkDetached(self.link)4547 self.link.close() 48 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 49 msg="Closing link %s" % self.link.name)50 51 # Access to other link attributes.8063 super(BlockingSender, self).__init__(connection, sender) 64 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 65 #this may be followed by a detach, which may contain an error condition, so wait a little... 66 self._waitForClose() 67 #...but close ourselves if peer does not 68 self.link.close() 69 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)7072 delivery = self.link.send(msg) 73 self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout) 74 bad = error_states 75 if bad is None: 76 bad = [Delivery.REJECTED, Delivery.RELEASED] 77 if delivery.remote_state in bad: 78 raise SendException(delivery.remote_state) 79 return delivery11583 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 84 self.connection = connection 85 self.incoming = collections.deque([]) 86 self.unsettled = collections.deque([])8789 self.incoming.append((event.message, event.delivery)) 90 self.connection.container.yield_() # Wake up the wait() loop to handle the message.9193 if event.link.state & Endpoint.LOCAL_ACTIVE: 94 event.link.close() 95 raise LinkDetached(event.link)96 99 100 @property102 return len(self.incoming)103105 message, delivery = self.incoming.popleft() 106 if not delivery.settled: 107 self.unsettled.append(delivery) 108 return message109153119 super(BlockingReceiver, self).__init__(connection, receiver) 120 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 121 #this may be followed by a detach, which may contain an error condition, so wait a little... 122 self._waitForClose() 123 #...but close ourselves if peer does not 124 self.link.close() 125 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 126 if credit: receiver.flow(credit) 127 self.fetcher = fetcher128130 if not self.fetcher: 131 raise Exception("Can't call receive on this receiver as a handler was provided") 132 if not self.link.credit: 133 self.link.flow(1) 134 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 135 return self.fetcher.pop()136 139 142 148169157 self.link = link 158 if link.is_sender: 159 txt = "sender %s to %s closed" % (link.name, link.target.address) 160 else: 161 txt = "receiver %s from %s closed" % (link.name, link.source.address) 162 if link.remote_condition: 163 txt += " due to: %s" % link.remote_condition 164 self.condition = link.remote_condition.name 165 else: 166 txt += " by peer" 167 self.condition = None 168 super(LinkDetached, self).__init__(txt)182173 self.connection = connection 174 txt = "Connection %s closed" % self.url 175 if event.connection.remote_condition: 176 txt += " due to: %s" % event.connection.remote_condition 177 self.condition = connection.remote_condition.name 178 else: 179 txt += " by peer" 180 self.condition = None 181 super(ConnectionClosed, self).__init__(txt)185 """ 186 A synchronous style connection wrapper. 187 """258189 self.timeout = timeout 190 self.container = container or Container() 191 self.container.timeout = self.timeout 192 self.container.start() 193 self.url = Url(url).defaults() 194 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat) 195 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 196 msg="Opening connection")197199 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))200201 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):202 prefetch = credit 203 if handler: 204 fetcher = None 205 if prefetch is None: 206 prefetch = 1 207 else: 208 fetcher = Fetcher(self, credit) 209 return BlockingReceiver( 210 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)211213 self.conn.close() 214 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 215 msg="Closing connection")216218 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 219 while self.container.process(): pass220222 """Call process until condition() is true""" 223 if timeout is False: 224 timeout = self.timeout 225 if timeout is None: 226 while not condition(): 227 self.container.process() 228 else: 229 container_timeout = self.container.timeout 230 self.container.timeout = timeout 231 try: 232 deadline = time.time() + timeout 233 while not condition(): 234 self.container.process() 235 if deadline < time.time(): 236 txt = "Connection %s timed out" % self.url 237 if msg: txt += ": " + msg 238 raise Timeout(txt) 239 finally: 240 self.container.timeout = container_timeout241243 if event.link.state & Endpoint.LOCAL_ACTIVE: 244 event.link.close() 245 raise LinkDetached(event.link)246248 if event.connection.state & Endpoint.LOCAL_ACTIVE: 249 event.connection.close() 250 raise ConnectionClosed(event.connection)251253 self.on_transport_closed(event)254256 if event.connection.state & Endpoint.LOCAL_ACTIVE: 257 raise ConnectionException("Connection %s disconnected" % self.url);272261 """Thread-safe atomic counter. Start at start, increment by step.""" 262 self.count, self.step = start, step 263 self.lock = threading.Lock()264266 """Get the next value""" 267 self.lock.acquire() 268 self.count += self.step; 269 result = self.count 270 self.lock.release() 271 return result274 """ 275 Implementation of the synchronous request-responce (aka RPC) pattern. 276 @ivar address: Address for all requests, may be None. 277 @ivar connection: Connection for requests and responses. 278 """ 279 280 correlation_id = AtomicCount() 281320 321 @property283 """ 284 Send requests and receive responses. A single instance can send many requests 285 to the same or different addresses. 286 287 @param connection: A L{BlockingConnection} 288 @param address: Address for all requests. 289 If not specified, each request must have the address property set. 290 Sucessive messages may have different addresses. 291 """ 292 super(SyncRequestResponse, self).__init__() 293 self.connection = connection 294 self.address = address 295 self.sender = self.connection.create_sender(self.address) 296 # dynamic=true generates a unique address dynamically for this receiver. 297 # credit=1 because we want to receive 1 response message initially. 298 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 299 self.response = None300302 """ 303 Send a request message, wait for and return the response message. 304 305 @param request: A L{proton.Message}. If L{self.address} is not set the 306 L{self.address} must be set and will be used. 307 """ 308 if not self.address and not request.address: 309 raise ValueError("Request message has no address: %s" % request) 310 request.reply_to = self.reply_to 311 request.correlation_id = correlation_id = self.correlation_id.next() 312 self.sender.send(request) 313 def wakeup(): 314 return self.response and (self.response.correlation_id == correlation_id)315 self.connection.wait(wakeup, msg="Waiting for response") 316 response = self.response 317 self.response = None # Ready for next response. 318 self.receiver.flow(1) # Set up credit for the next response. 319 return response323 """Return the dynamic address of our receiver.""" 324 return self.receiver.remote_source.address325327 """Called when we receive a message for our receiver.""" 328 self.response = event.message 329 self.connection.container.yield_() # Wake up the wait() loop to handle the message.330
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Tue May 3 09:59:06 2016 | http://epydoc.sourceforge.net |