1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton module defines a suite of APIs that implement the AMQP 1.0
22 protocol.
23
24 The proton APIs consist of the following classes:
25
26 - L{Messenger} -- A messaging endpoint.
27 - L{Message} -- A class for creating and/or accessing AMQP message content.
28 - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
29 data.
30
31 """
32 from __future__ import absolute_import
33
34 from cproton import *
35 from .wrapper import Wrapper
36 from . import _compat
37
38 import weakref, socket, sys, threading
39
40 try:
41 import uuid
45
46 except ImportError:
47 """
48 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
49 """
50 import struct
53 - def __init__(self, hex=None, bytes=None):
54 if [hex, bytes].count(None) != 1:
55 raise TypeError("need one of hex or bytes")
56 if bytes is not None:
57 self.bytes = bytes
58 elif hex is not None:
59 fields=hex.split("-")
60 fields[4:5] = [fields[4][:4], fields[4][4:]]
61 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
62
64 if isinstance(other, uuid.UUID):
65 return cmp(self.bytes, other.bytes)
66 else:
67 return -1
68
70 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
71
73 return "UUID(%r)" % str(self)
74
77
78 import os, random, time
79 rand = random.Random()
80 rand.seed((os.getpid(), time.time(), socket.gethostname()))
82 data = [rand.randint(0, 255) for i in xrange(16)]
83
84
85 data[6] &= 0x0F
86 data[6] |= 0x40
87
88
89 data[8] &= 0x3F
90 data[8] |= 0x80
91 return "".join(map(chr, data))
92
94 return uuid.UUID(bytes=random_uuid())
95
98
99
100
101
102 try:
103 bytes()
104 except NameError:
105 bytes = str
106 try:
107 long()
108 except NameError:
109 long = int
110 try:
111 unicode()
112 except NameError:
113 unicode = str
114
115
116 VERSION_MAJOR = PN_VERSION_MAJOR
117 VERSION_MINOR = PN_VERSION_MINOR
118 API_LANGUAGE = "C"
119 IMPLEMENTATION_LANGUAGE = "C"
128
130 """
131 The root of the proton exception hierarchy. All proton exception
132 classes derive from this exception.
133 """
134 pass
135
137 """
138 A timeout exception indicates that a blocking operation has timed
139 out.
140 """
141 pass
142
144 """
145 An interrupt exception indicaes that a blocking operation was interrupted.
146 """
147 pass
148
150 """
151 The root of the messenger exception hierarchy. All exceptions
152 generated by the messenger class derive from this exception.
153 """
154 pass
155
157 """
158 The MessageException class is the root of the message exception
159 hierarhcy. All exceptions generated by the Message class derive from
160 this exception.
161 """
162 pass
163
164 EXCEPTIONS = {
165 PN_TIMEOUT: Timeout,
166 PN_INTR: Interrupt
167 }
168
169 PENDING = Constant("PENDING")
170 ACCEPTED = Constant("ACCEPTED")
171 REJECTED = Constant("REJECTED")
172 RELEASED = Constant("RELEASED")
173 MODIFIED = Constant("MODIFIED")
174 ABORTED = Constant("ABORTED")
175 SETTLED = Constant("SETTLED")
176
177 STATUSES = {
178 PN_STATUS_ABORTED: ABORTED,
179 PN_STATUS_ACCEPTED: ACCEPTED,
180 PN_STATUS_REJECTED: REJECTED,
181 PN_STATUS_RELEASED: RELEASED,
182 PN_STATUS_MODIFIED: MODIFIED,
183 PN_STATUS_PENDING: PENDING,
184 PN_STATUS_SETTLED: SETTLED,
185 PN_STATUS_UNKNOWN: None
186 }
187
188 AUTOMATIC = Constant("AUTOMATIC")
189 MANUAL = Constant("MANUAL")
192 """
193 The L{Messenger} class defines a high level interface for sending
194 and receiving L{Messages<Message>}. Every L{Messenger} contains a
195 single logical queue of incoming messages and a single logical queue
196 of outgoing messages. These messages in these queues may be destined
197 for, or originate from, a variety of addresses.
198
199 The messenger interface is single-threaded. All methods
200 except one (L{interrupt}) are intended to be used from within
201 the messenger thread.
202
203
204 Address Syntax
205 ==============
206
207 An address has the following form::
208
209 [ amqp[s]:// ] [user[:password]@] domain [/[name]]
210
211 Where domain can be one of::
212
213 host | host:port | ip | ip:port | name
214
215 The following are valid examples of addresses:
216
217 - example.org
218 - example.org:1234
219 - amqp://example.org
220 - amqps://example.org
221 - example.org/incoming
222 - amqps://example.org/outgoing
223 - amqps://fred:trustno1@example.org
224 - 127.0.0.1:1234
225 - amqps://127.0.0.1:1234
226
227 Sending & Receiving Messages
228 ============================
229
230 The L{Messenger} class works in conjuction with the L{Message} class. The
231 L{Message} class is a mutable holder of message content.
232
233 The L{put} method copies its L{Message} to the outgoing queue, and may
234 send queued messages if it can do so without blocking. The L{send}
235 method blocks until it has sent the requested number of messages,
236 or until a timeout interrupts the attempt.
237
238
239 >>> message = Message()
240 >>> for i in range(3):
241 ... message.address = "amqp://host/queue"
242 ... message.subject = "Hello World %i" % i
243 ... messenger.put(message)
244 >>> messenger.send()
245
246 Similarly, the L{recv} method receives messages into the incoming
247 queue, and may block as it attempts to receive the requested number
248 of messages, or until timeout is reached. It may receive fewer
249 than the requested number. The L{get} method pops the
250 eldest L{Message} off the incoming queue and copies it into the L{Message}
251 object that you supply. It will not block.
252
253
254 >>> message = Message()
255 >>> messenger.recv(10):
256 >>> while messenger.incoming > 0:
257 ... messenger.get(message)
258 ... print message.subject
259 Hello World 0
260 Hello World 1
261 Hello World 2
262
263 The blocking flag allows you to turn off blocking behavior entirely,
264 in which case L{send} and L{recv} will do whatever they can without
265 blocking, and then return. You can then look at the number
266 of incoming and outgoing messages to see how much outstanding work
267 still remains.
268 """
269
271 """
272 Construct a new L{Messenger} with the given name. The name has
273 global scope. If a NULL name is supplied, a UUID based name will
274 be chosen.
275
276 @type name: string
277 @param name: the name of the messenger or None
278
279 """
280 self._mng = pn_messenger(name)
281 self._selectables = {}
282
284 """
285 Destroy the L{Messenger}. This will close all connections that
286 are managed by the L{Messenger}. Call the L{stop} method before
287 destroying the L{Messenger}.
288 """
289 if hasattr(self, "_mng"):
290 pn_messenger_free(self._mng)
291 del self._mng
292
301
302 @property
304 """
305 The name of the L{Messenger}.
306 """
307 return pn_messenger_name(self._mng)
308
310 return pn_messenger_get_certificate(self._mng)
311
313 self._check(pn_messenger_set_certificate(self._mng, value))
314
315 certificate = property(_get_certificate, _set_certificate,
316 doc="""
317 Path to a certificate file for the L{Messenger}. This certificate is
318 used when the L{Messenger} accepts or establishes SSL/TLS connections.
319 This property must be specified for the L{Messenger} to accept
320 incoming SSL/TLS connections and to establish client authenticated
321 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
322 connections do not require this property.
323 """)
324
326 return pn_messenger_get_private_key(self._mng)
327
329 self._check(pn_messenger_set_private_key(self._mng, value))
330
331 private_key = property(_get_private_key, _set_private_key,
332 doc="""
333 Path to a private key file for the L{Messenger's<Messenger>}
334 certificate. This property must be specified for the L{Messenger} to
335 accept incoming SSL/TLS connections and to establish client
336 authenticated outgoing SSL/TLS connection. Non client authenticated
337 SSL/TLS connections do not require this property.
338 """)
339
341 return pn_messenger_get_password(self._mng)
342
344 self._check(pn_messenger_set_password(self._mng, value))
345
346 password = property(_get_password, _set_password,
347 doc="""
348 This property contains the password for the L{Messenger.private_key}
349 file, or None if the file is not encrypted.
350 """)
351
353 return pn_messenger_get_trusted_certificates(self._mng)
354
356 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
357
358 trusted_certificates = property(_get_trusted_certificates,
359 _set_trusted_certificates,
360 doc="""
361 A path to a database of trusted certificates for use in verifying the
362 peer on an SSL/TLS connection. If this property is None, then the peer
363 will not be verified.
364 """)
365
367 t = pn_messenger_get_timeout(self._mng)
368 if t == -1:
369 return None
370 else:
371 return millis2secs(t)
372
374 if value is None:
375 t = -1
376 else:
377 t = secs2millis(value)
378 self._check(pn_messenger_set_timeout(self._mng, t))
379
380 timeout = property(_get_timeout, _set_timeout,
381 doc="""
382 The timeout property contains the default timeout for blocking
383 operations performed by the L{Messenger}.
384 """)
385
387 return pn_messenger_is_blocking(self._mng)
388
390 self._check(pn_messenger_set_blocking(self._mng, b))
391
392 blocking = property(_is_blocking, _set_blocking,
393 doc="""
394 Enable or disable blocking behavior during L{Message} sending
395 and receiving. This affects every blocking call, with the
396 exception of L{work}. Currently, the affected calls are
397 L{send}, L{recv}, and L{stop}.
398 """)
399
401 return pn_messenger_is_passive(self._mng)
402
404 self._check(pn_messenger_set_passive(self._mng, b))
405
406 passive = property(_is_passive, _set_passive,
407 doc="""
408 When passive is set to true, Messenger will not attempt to perform I/O
409 internally. In this mode it is necessary to use the selectables API to
410 drive any I/O needed to perform requested actions. In this mode
411 Messenger will never block.
412 """)
413
415 return pn_messenger_get_incoming_window(self._mng)
416
418 self._check(pn_messenger_set_incoming_window(self._mng, window))
419
420 incoming_window = property(_get_incoming_window, _set_incoming_window,
421 doc="""
422 The incoming tracking window for the messenger. The messenger will
423 track the remote status of this many incoming deliveries after they
424 have been accepted or rejected. Defaults to zero.
425
426 L{Messages<Message>} enter this window only when you take them into your application
427 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
428 without explicitly accepting or rejecting the oldest message, then the
429 message that passes beyond the edge of the incoming window will be assigned
430 the default disposition of its link.
431 """)
432
434 return pn_messenger_get_outgoing_window(self._mng)
435
437 self._check(pn_messenger_set_outgoing_window(self._mng, window))
438
439 outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
440 doc="""
441 The outgoing tracking window for the messenger. The messenger will
442 track the remote status of this many outgoing deliveries after calling
443 send. Defaults to zero.
444
445 A L{Message} enters this window when you call the put() method with the
446 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
447 times, status information will no longer be available for the
448 first message.
449 """)
450
452 """
453 Currently a no-op placeholder.
454 For future compatibility, do not L{send} or L{recv} messages
455 before starting the L{Messenger}.
456 """
457 self._check(pn_messenger_start(self._mng))
458
460 """
461 Transitions the L{Messenger} to an inactive state. An inactive
462 L{Messenger} will not send or receive messages from its internal
463 queues. A L{Messenger} should be stopped before being discarded to
464 ensure a clean shutdown handshake occurs on any internally managed
465 connections.
466 """
467 self._check(pn_messenger_stop(self._mng))
468
469 @property
471 """
472 Returns true iff a L{Messenger} is in the stopped state.
473 This function does not block.
474 """
475 return pn_messenger_stopped(self._mng)
476
478 """
479 Subscribes the L{Messenger} to messages originating from the
480 specified source. The source is an address as specified in the
481 L{Messenger} introduction with the following addition. If the
482 domain portion of the address begins with the '~' character, the
483 L{Messenger} will interpret the domain as host/port, bind to it,
484 and listen for incoming messages. For example "~0.0.0.0",
485 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
486 local interface and listen for incoming messages with the last
487 variant only permitting incoming SSL connections.
488
489 @type source: string
490 @param source: the source of messages to subscribe to
491 """
492 sub_impl = pn_messenger_subscribe(self._mng, source)
493 if not sub_impl:
494 self._check(pn_error_code(pn_messenger_error(self._mng)))
495 raise MessengerException("Cannot subscribe to %s"%source)
496 return Subscription(sub_impl)
497
498 - def put(self, message):
499 """
500 Places the content contained in the message onto the outgoing
501 queue of the L{Messenger}. This method will never block, however
502 it will send any unblocked L{Messages<Message>} in the outgoing
503 queue immediately and leave any blocked L{Messages<Message>}
504 remaining in the outgoing queue. The L{send} call may be used to
505 block until the outgoing queue is empty. The L{outgoing} property
506 may be used to check the depth of the outgoing queue.
507
508 When the content in a given L{Message} object is copied to the outgoing
509 message queue, you may then modify or discard the L{Message} object
510 without having any impact on the content in the outgoing queue.
511
512 This method returns an outgoing tracker for the L{Message}. The tracker
513 can be used to determine the delivery status of the L{Message}.
514
515 @type message: Message
516 @param message: the message to place in the outgoing queue
517 @return: a tracker
518 """
519 message._pre_encode()
520 self._check(pn_messenger_put(self._mng, message._msg))
521 return pn_messenger_outgoing_tracker(self._mng)
522
524 """
525 Gets the last known remote state of the delivery associated with
526 the given tracker.
527
528 @type tracker: tracker
529 @param tracker: the tracker whose status is to be retrieved
530
531 @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED
532 """
533 disp = pn_messenger_status(self._mng, tracker);
534 return STATUSES.get(disp, disp)
535
537 """
538 Checks if the delivery associated with the given tracker is still
539 waiting to be sent.
540
541 @type tracker: tracker
542 @param tracker: the tracker whose status is to be retrieved
543
544 @return: true if delivery is still buffered
545 """
546 return pn_messenger_buffered(self._mng, tracker);
547
548 - def settle(self, tracker=None):
549 """
550 Frees a L{Messenger} from tracking the status associated with a given
551 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
552 to the most recent will be settled.
553 """
554 if tracker is None:
555 tracker = pn_messenger_outgoing_tracker(self._mng)
556 flags = PN_CUMULATIVE
557 else:
558 flags = 0
559 self._check(pn_messenger_settle(self._mng, tracker, flags))
560
561 - def send(self, n=-1):
562 """
563 This call will block until the indicated number of L{messages<Message>}
564 have been sent, or until the operation times out. If n is -1 this call will
565 block until all outgoing L{messages<Message>} have been sent. If n is 0 then
566 this call will send whatever it can without blocking.
567 """
568 self._check(pn_messenger_send(self._mng, n))
569
570 - def recv(self, n=None):
571 """
572 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value
573 for I{n} is supplied, this call will receive as many L{messages<Message>} as it
574 can buffer internally. If the L{Messenger} is in blocking mode, this
575 call will block until at least one L{Message} is available in the
576 incoming queue.
577 """
578 if n is None:
579 n = -1
580 self._check(pn_messenger_recv(self._mng, n))
581
582 - def work(self, timeout=None):
583 """
584 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
585 This will block for the indicated timeout.
586 This method may also do I/O work other than sending and receiving
587 L{messages<Message>}. For example, closing connections after messenger.L{stop}()
588 has been called.
589 """
590 if timeout is None:
591 t = -1
592 else:
593 t = secs2millis(timeout)
594 err = pn_messenger_work(self._mng, t)
595 if (err == PN_TIMEOUT):
596 return False
597 else:
598 self._check(err)
599 return True
600
601 @property
603 return pn_messenger_receiving(self._mng)
604
606 """
607 The L{Messenger} interface is single-threaded.
608 This is the only L{Messenger} function intended to be called
609 from outside of the L{Messenger} thread.
610 Call this from a non-messenger thread to interrupt
611 a L{Messenger} that is blocking.
612 This will cause any in-progress blocking call to throw
613 the L{Interrupt} exception. If there is no currently blocking
614 call, then the next blocking call will be affected, even if it
615 is within the same thread that interrupt was called from.
616 """
617 self._check(pn_messenger_interrupt(self._mng))
618
619 - def get(self, message=None):
620 """
621 Moves the message from the head of the incoming message queue into
622 the supplied message object. Any content in the message will be
623 overwritten.
624
625 A tracker for the incoming L{Message} is returned. The tracker can
626 later be used to communicate your acceptance or rejection of the
627 L{Message}.
628
629 If None is passed in for the L{Message} object, the L{Message}
630 popped from the head of the queue is discarded.
631
632 @type message: Message
633 @param message: the destination message object
634 @return: a tracker
635 """
636 if message is None:
637 impl = None
638 else:
639 impl = message._msg
640 self._check(pn_messenger_get(self._mng, impl))
641 if message is not None:
642 message._post_decode()
643 return pn_messenger_incoming_tracker(self._mng)
644
645 - def accept(self, tracker=None):
646 """
647 Signal the sender that you have acted on the L{Message}
648 pointed to by the tracker. If no tracker is supplied,
649 then all messages that have been returned by the L{get}
650 method are accepted, except those that have already been
651 auto-settled by passing beyond your incoming window size.
652
653 @type tracker: tracker
654 @param tracker: a tracker as returned by get
655 """
656 if tracker is None:
657 tracker = pn_messenger_incoming_tracker(self._mng)
658 flags = PN_CUMULATIVE
659 else:
660 flags = 0
661 self._check(pn_messenger_accept(self._mng, tracker, flags))
662
663 - def reject(self, tracker=None):
664 """
665 Rejects the L{Message} indicated by the tracker. If no tracker
666 is supplied, all messages that have been returned by the L{get}
667 method are rejected, except those that have already been auto-settled
668 by passing beyond your outgoing window size.
669
670 @type tracker: tracker
671 @param tracker: a tracker as returned by get
672 """
673 if tracker is None:
674 tracker = pn_messenger_incoming_tracker(self._mng)
675 flags = PN_CUMULATIVE
676 else:
677 flags = 0
678 self._check(pn_messenger_reject(self._mng, tracker, flags))
679
680 @property
682 """
683 The outgoing queue depth.
684 """
685 return pn_messenger_outgoing(self._mng)
686
687 @property
689 """
690 The incoming queue depth.
691 """
692 return pn_messenger_incoming(self._mng)
693
694 - def route(self, pattern, address):
695 """
696 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.
697
698 The route procedure may be used to influence how a L{Messenger} will
699 internally treat a given address or class of addresses. Every call
700 to the route procedure will result in L{Messenger} appending a routing
701 rule to its internal routing table.
702
703 Whenever a L{Message} is presented to a L{Messenger} for delivery, it
704 will match the address of this message against the set of routing
705 rules in order. The first rule to match will be triggered, and
706 instead of routing based on the address presented in the message,
707 the L{Messenger} will route based on the address supplied in the rule.
708
709 The pattern matching syntax supports two types of matches, a '%'
710 will match any character except a '/', and a '*' will match any
711 character including a '/'.
712
713 A routing address is specified as a normal AMQP address, however it
714 may additionally use substitution variables from the pattern match
715 that triggered the rule.
716
717 Any message sent to "foo" will be routed to "amqp://foo.com":
718
719 >>> messenger.route("foo", "amqp://foo.com");
720
721 Any message sent to "foobar" will be routed to
722 "amqp://foo.com/bar":
723
724 >>> messenger.route("foobar", "amqp://foo.com/bar");
725
726 Any message sent to bar/<path> will be routed to the corresponding
727 path within the amqp://bar.com domain:
728
729 >>> messenger.route("bar/*", "amqp://bar.com/$1");
730
731 Route all L{messages<Message>} over TLS:
732
733 >>> messenger.route("amqp:*", "amqps:$1")
734
735 Supply credentials for foo.com:
736
737 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");
738
739 Supply credentials for all domains:
740
741 >>> messenger.route("amqp://*", "amqp://user:password@$1");
742
743 Route all addresses through a single proxy while preserving the
744 original destination:
745
746 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");
747
748 Route any address through a single broker:
749
750 >>> messenger.route("*", "amqp://user:password@broker/$1");
751 """
752 self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
753
754 - def rewrite(self, pattern, address):
755 """
756 Similar to route(), except that the destination of
757 the L{Message} is determined before the message address is rewritten.
758
759 The outgoing address is only rewritten after routing has been
760 finalized. If a message has an outgoing address of
761 "amqp://0.0.0.0:5678", and a rewriting rule that changes its
762 outgoing address to "foo", it will still arrive at the peer that
763 is listening on "amqp://0.0.0.0:5678", but when it arrives there,
764 the receiver will see its outgoing address as "foo".
765
766 The default rewrite rule removes username and password from addresses
767 before they are transmitted.
768 """
769 self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
770
772 return Selectable.wrap(pn_messenger_selectable(self._mng))
773
774 @property
776 tstamp = pn_messenger_deadline(self._mng)
777 if tstamp:
778 return millis2secs(tstamp)
779 else:
780 return None
781
783 """The L{Message} class is a mutable holder of message content.
784
785 @ivar instructions: delivery instructions for the message
786 @type instructions: dict
787 @ivar annotations: infrastructure defined message annotations
788 @type annotations: dict
789 @ivar properties: application defined message properties
790 @type properties: dict
791 @ivar body: message body
792 @type body: bytes | unicode | dict | list | int | long | float | UUID
793 """
794
795 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
796
797 - def __init__(self, body=None, **kwargs):
798 """
799 @param kwargs: Message property name/value pairs to initialise the Message
800 """
801 self._msg = pn_message()
802 self._id = Data(pn_message_id(self._msg))
803 self._correlation_id = Data(pn_message_correlation_id(self._msg))
804 self.instructions = None
805 self.annotations = None
806 self.properties = None
807 self.body = body
808 for k,v in _compat.iteritems(kwargs):
809 getattr(self, k)
810 setattr(self, k, v)
811
813 if hasattr(self, "_msg"):
814 pn_message_free(self._msg)
815 del self._msg
816
818 if err < 0:
819 exc = EXCEPTIONS.get(err, MessageException)
820 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
821 else:
822 return err
823
842
843 - def _post_decode(self):
844 inst = Data(pn_message_instructions(self._msg))
845 ann = Data(pn_message_annotations(self._msg))
846 props = Data(pn_message_properties(self._msg))
847 body = Data(pn_message_body(self._msg))
848
849 if inst.next():
850 self.instructions = inst.get_object()
851 else:
852 self.instructions = None
853 if ann.next():
854 self.annotations = ann.get_object()
855 else:
856 self.annotations = None
857 if props.next():
858 self.properties = props.get_object()
859 else:
860 self.properties = None
861 if body.next():
862 self.body = body.get_object()
863 else:
864 self.body = None
865
867 """
868 Clears the contents of the L{Message}. All fields will be reset to
869 their default values.
870 """
871 pn_message_clear(self._msg)
872 self.instructions = None
873 self.annotations = None
874 self.properties = None
875 self.body = None
876
878 return pn_message_is_inferred(self._msg)
879
881 self._check(pn_message_set_inferred(self._msg, bool(value)))
882
883 inferred = property(_is_inferred, _set_inferred, doc="""
884 The inferred flag for a message indicates how the message content
885 is encoded into AMQP sections. If inferred is true then binary and
886 list values in the body of the message will be encoded as AMQP DATA
887 and AMQP SEQUENCE sections, respectively. If inferred is false,
888 then all values in the body of the message will be encoded as AMQP
889 VALUE sections regardless of their type.
890 """)
891
893 return pn_message_is_durable(self._msg)
894
896 self._check(pn_message_set_durable(self._msg, bool(value)))
897
898 durable = property(_is_durable, _set_durable,
899 doc="""
900 The durable property indicates that the message should be held durably
901 by any intermediaries taking responsibility for the message.
902 """)
903
905 return pn_message_get_priority(self._msg)
906
908 self._check(pn_message_set_priority(self._msg, value))
909
910 priority = property(_get_priority, _set_priority,
911 doc="""
912 The priority of the message.
913 """)
914
916 return millis2secs(pn_message_get_ttl(self._msg))
917
919 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
920
921 ttl = property(_get_ttl, _set_ttl,
922 doc="""
923 The time to live of the message measured in seconds. Expired messages
924 may be dropped.
925 """)
926
928 return pn_message_is_first_acquirer(self._msg)
929
931 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
932
933 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
934 doc="""
935 True iff the recipient is the first to acquire the message.
936 """)
937
939 return pn_message_get_delivery_count(self._msg)
940
942 self._check(pn_message_set_delivery_count(self._msg, value))
943
944 delivery_count = property(_get_delivery_count, _set_delivery_count,
945 doc="""
946 The number of delivery attempts made for this message.
947 """)
948
949
957 id = property(_get_id, _set_id,
958 doc="""
959 The id of the message.
960 """)
961
963 return pn_message_get_user_id(self._msg)
964
966 self._check(pn_message_set_user_id(self._msg, value))
967
968 user_id = property(_get_user_id, _set_user_id,
969 doc="""
970 The user id of the message creator.
971 """)
972
974 return utf82unicode(pn_message_get_address(self._msg))
975
977 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
978
979 address = property(_get_address, _set_address,
980 doc="""
981 The address of the message.
982 """)
983
985 return pn_message_get_subject(self._msg)
986
988 self._check(pn_message_set_subject(self._msg, value))
989
990 subject = property(_get_subject, _set_subject,
991 doc="""
992 The subject of the message.
993 """)
994
996 return utf82unicode(pn_message_get_reply_to(self._msg))
997
999 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
1000
1001 reply_to = property(_get_reply_to, _set_reply_to,
1002 doc="""
1003 The reply-to address for the message.
1004 """)
1005
1013
1014 correlation_id = property(_get_correlation_id, _set_correlation_id,
1015 doc="""
1016 The correlation-id for the message.
1017 """)
1018
1020 return pn_message_get_content_type(self._msg)
1021
1022 - def _set_content_type(self, value):
1023 self._check(pn_message_set_content_type(self._msg, value))
1024
1025 content_type = property(_get_content_type, _set_content_type,
1026 doc="""
1027 The content-type of the message.
1028 """)
1029
1031 return pn_message_get_content_encoding(self._msg)
1032
1033 - def _set_content_encoding(self, value):
1034 self._check(pn_message_set_content_encoding(self._msg, value))
1035
1036 content_encoding = property(_get_content_encoding, _set_content_encoding,
1037 doc="""
1038 The content-encoding of the message.
1039 """)
1040
1042 return millis2secs(pn_message_get_expiry_time(self._msg))
1043
1045 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
1046
1047 expiry_time = property(_get_expiry_time, _set_expiry_time,
1048 doc="""
1049 The expiry time of the message.
1050 """)
1051
1053 return millis2secs(pn_message_get_creation_time(self._msg))
1054
1056 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
1057
1058 creation_time = property(_get_creation_time, _set_creation_time,
1059 doc="""
1060 The creation time of the message.
1061 """)
1062
1064 return pn_message_get_group_id(self._msg)
1065
1067 self._check(pn_message_set_group_id(self._msg, value))
1068
1069 group_id = property(_get_group_id, _set_group_id,
1070 doc="""
1071 The group id of the message.
1072 """)
1073
1075 return pn_message_get_group_sequence(self._msg)
1076
1078 self._check(pn_message_set_group_sequence(self._msg, value))
1079
1080 group_sequence = property(_get_group_sequence, _set_group_sequence,
1081 doc="""
1082 The sequence of the message within its group.
1083 """)
1084
1086 return pn_message_get_reply_to_group_id(self._msg)
1087
1089 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1090
1091 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
1092 doc="""
1093 The group-id for any replies.
1094 """)
1095
1097 self._pre_encode()
1098 sz = 16
1099 while True:
1100 err, data = pn_message_encode(self._msg, sz)
1101 if err == PN_OVERFLOW:
1102 sz *= 2
1103 continue
1104 else:
1105 self._check(err)
1106 return data
1107
1109 self._check(pn_message_decode(self._msg, data))
1110 self._post_decode()
1111
1112 - def send(self, sender, tag=None):
1120
1121 - def recv(self, link):
1122 """
1123 Receives and decodes the message content for the current delivery
1124 from the link. Upon success it will return the current delivery
1125 for the link. If there is no current delivery, or if the current
1126 delivery is incomplete, or if the link is not a receiver, it will
1127 return None.
1128
1129 @type link: Link
1130 @param link: the link to receive a message from
1131 @return the delivery associated with the decoded message (or None)
1132
1133 """
1134 if link.is_sender: return None
1135 dlv = link.current
1136 if not dlv or dlv.partial: return None
1137 encoded = link.recv(dlv.pending)
1138 link.advance()
1139
1140
1141 if link.remote_snd_settle_mode == Link.SND_SETTLED:
1142 dlv.settle()
1143 self.decode(encoded)
1144 return dlv
1145
1147 props = []
1148 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
1149 "priority", "first_acquirer", "delivery_count", "id",
1150 "correlation_id", "user_id", "group_id", "group_sequence",
1151 "reply_to_group_id", "instructions", "annotations",
1152 "properties", "body"):
1153 value = getattr(self, attr)
1154 if value: props.append("%s=%r" % (attr, value))
1155 return "Message(%s)" % ", ".join(props)
1156
1158 tmp = pn_string(None)
1159 err = pn_inspect(self._msg, tmp)
1160 result = pn_string_get(tmp)
1161 pn_free(tmp)
1162 self._check(err)
1163 return result
1164
1166
1169
1170 @property
1172 return pn_subscription_address(self._impl)
1173
1174 _DEFAULT = object()
1177
1178 @staticmethod
1180 if impl is None:
1181 return None
1182 else:
1183 return Selectable(impl)
1184
1187
1190
1192 if fd is _DEFAULT:
1193 return pn_selectable_get_fd(self._impl)
1194 elif fd is None:
1195 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
1196 else:
1197 pn_selectable_set_fd(self._impl, fd)
1198
1200 return pn_selectable_is_reading(self._impl)
1201
1203 pn_selectable_set_reading(self._impl, bool(val))
1204
1205 reading = property(_is_reading, _set_reading)
1206
1208 return pn_selectable_is_writing(self._impl)
1209
1211 pn_selectable_set_writing(self._impl, bool(val))
1212
1213 writing = property(_is_writing, _set_writing)
1214
1216 tstamp = pn_selectable_get_deadline(self._impl)
1217 if tstamp:
1218 return millis2secs(tstamp)
1219 else:
1220 return None
1221
1223 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
1224
1225 deadline = property(_get_deadline, _set_deadline)
1226
1228 pn_selectable_readable(self._impl)
1229
1231 pn_selectable_writable(self._impl)
1232
1234 pn_selectable_expired(self._impl)
1235
1237 return pn_selectable_is_registered(self._impl)
1238
1240 pn_selectable_set_registered(self._impl, registered)
1241
1242 registered = property(_is_registered, _set_registered,
1243 doc="""
1244 The registered property may be get/set by an I/O polling system to
1245 indicate whether the fd has been registered or not.
1246 """)
1247
1248 @property
1250 return pn_selectable_is_terminal(self._impl)
1251
1253 pn_selectable_terminate(self._impl)
1254
1256 pn_selectable_release(self._impl)
1257
1259 """
1260 The DataException class is the root of the Data exception hierarchy.
1261 All exceptions raised by the Data class extend this exception.
1262 """
1263 pass
1264
1266
1269
1271 return "UnmappedType(%s)" % self.msg
1272
1274
1276 return "ulong(%s)" % long.__repr__(self)
1277
1279
1281 return "timestamp(%s)" % long.__repr__(self)
1282
1284
1286 return "symbol(%s)" % unicode.__repr__(self)
1287
1288 -class char(unicode):
1289
1291 return "char(%s)" % unicode.__repr__(self)
1292
1294
1295 - def __init__(self, descriptor, value):
1296 self.descriptor = descriptor
1297 self.value = value
1298
1300 return "Described(%r, %r)" % (self.descriptor, self.value)
1301
1303 if isinstance(o, Described):
1304 return self.descriptor == o.descriptor and self.value == o.value
1305 else:
1306 return False
1307
1308 UNDESCRIBED = Constant("UNDESCRIBED")
1309
1310 -class Array(object):
1311
1312 - def __init__(self, descriptor, type, *elements):
1313 self.descriptor = descriptor
1314 self.type = type
1315 self.elements = elements
1316
1318 return iter(self.elements)
1319
1321 if self.elements:
1322 els = ", %s" % (", ".join(map(repr, self.elements)))
1323 else:
1324 els = ""
1325 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1326
1328 if isinstance(o, Array):
1329 return self.descriptor == o.descriptor and \
1330 self.type == o.type and self.elements == o.elements
1331 else:
1332 return False
1333
1335 """
1336 The L{Data} class provides an interface for decoding, extracting,
1337 creating, and encoding arbitrary AMQP data. A L{Data} object
1338 contains a tree of AMQP values. Leaf nodes in this tree correspond
1339 to scalars in the AMQP type system such as L{ints<INT>} or
1340 L{strings<STRING>}. Non-leaf nodes in this tree correspond to
1341 compound values in the AMQP type system such as L{lists<LIST>},
1342 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
1343 The root node of the tree is the L{Data} object itself and can have
1344 an arbitrary number of children.
1345
1346 A L{Data} object maintains the notion of the current sibling node
1347 and a current parent node. Siblings are ordered within their parent.
1348 Values are accessed and/or added by using the L{next}, L{prev},
1349 L{enter}, and L{exit} methods to navigate to the desired location in
1350 the tree and using the supplied variety of put_*/get_* methods to
1351 access or add a value of the desired type.
1352
1353 The put_* methods will always add a value I{after} the current node
1354 in the tree. If the current node has a next sibling the put_* method
1355 will overwrite the value on this node. If there is no current node
1356 or the current node has no next sibling then one will be added. The
1357 put_* methods always set the added/modified node to the current
1358 node. The get_* methods read the value of the current node and do
1359 not change which node is current.
1360
1361 The following types of scalar values are supported:
1362
1363 - L{NULL}
1364 - L{BOOL}
1365 - L{UBYTE}
1366 - L{USHORT}
1367 - L{SHORT}
1368 - L{UINT}
1369 - L{INT}
1370 - L{ULONG}
1371 - L{LONG}
1372 - L{FLOAT}
1373 - L{DOUBLE}
1374 - L{BINARY}
1375 - L{STRING}
1376 - L{SYMBOL}
1377
1378 The following types of compound values are supported:
1379
1380 - L{DESCRIBED}
1381 - L{ARRAY}
1382 - L{LIST}
1383 - L{MAP}
1384 """
1385
1386 NULL = PN_NULL; "A null value."
1387 BOOL = PN_BOOL; "A boolean value."
1388 UBYTE = PN_UBYTE; "An unsigned byte value."
1389 BYTE = PN_BYTE; "A signed byte value."
1390 USHORT = PN_USHORT; "An unsigned short value."
1391 SHORT = PN_SHORT; "A short value."
1392 UINT = PN_UINT; "An unsigned int value."
1393 INT = PN_INT; "A signed int value."
1394 CHAR = PN_CHAR; "A character value."
1395 ULONG = PN_ULONG; "An unsigned long value."
1396 LONG = PN_LONG; "A signed long value."
1397 TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
1398 FLOAT = PN_FLOAT; "A float value."
1399 DOUBLE = PN_DOUBLE; "A double value."
1400 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
1401 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
1402 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
1403 UUID = PN_UUID; "A UUID value."
1404 BINARY = PN_BINARY; "A binary string."
1405 STRING = PN_STRING; "A unicode string."
1406 SYMBOL = PN_SYMBOL; "A symbolic string."
1407 DESCRIBED = PN_DESCRIBED; "A described value."
1408 ARRAY = PN_ARRAY; "An array value."
1409 LIST = PN_LIST; "A list value."
1410 MAP = PN_MAP; "A map value."
1411
1412 type_names = {
1413 NULL: "null",
1414 BOOL: "bool",
1415 BYTE: "byte",
1416 UBYTE: "ubyte",
1417 SHORT: "short",
1418 USHORT: "ushort",
1419 INT: "int",
1420 UINT: "uint",
1421 CHAR: "char",
1422 LONG: "long",
1423 ULONG: "ulong",
1424 TIMESTAMP: "timestamp",
1425 FLOAT: "float",
1426 DOUBLE: "double",
1427 DECIMAL32: "decimal32",
1428 DECIMAL64: "decimal64",
1429 DECIMAL128: "decimal128",
1430 UUID: "uuid",
1431 BINARY: "binary",
1432 STRING: "string",
1433 SYMBOL: "symbol",
1434 DESCRIBED: "described",
1435 ARRAY: "array",
1436 LIST: "list",
1437 MAP: "map"
1438 }
1439
1440 @classmethod
1442
1450
1452 if self._free and hasattr(self, "_data"):
1453 pn_data_free(self._data)
1454 del self._data
1455
1457 if err < 0:
1458 exc = EXCEPTIONS.get(err, DataException)
1459 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
1460 else:
1461 return err
1462
1464 """
1465 Clears the data object.
1466 """
1467 pn_data_clear(self._data)
1468
1470 """
1471 Clears current node and sets the parent to the root node. Clearing the
1472 current node sets it _before_ the first node, calling next() will advance to
1473 the first node.
1474 """
1475 assert self._data is not None
1476 pn_data_rewind(self._data)
1477
1479 """
1480 Advances the current node to its next sibling and returns its
1481 type. If there is no next sibling the current node remains
1482 unchanged and None is returned.
1483 """
1484 found = pn_data_next(self._data)
1485 if found:
1486 return self.type()
1487 else:
1488 return None
1489
1491 """
1492 Advances the current node to its previous sibling and returns its
1493 type. If there is no previous sibling the current node remains
1494 unchanged and None is returned.
1495 """
1496 found = pn_data_prev(self._data)
1497 if found:
1498 return self.type()
1499 else:
1500 return None
1501
1503 """
1504 Sets the parent node to the current node and clears the current node.
1505 Clearing the current node sets it _before_ the first child,
1506 call next() advances to the first child.
1507 """
1508 return pn_data_enter(self._data)
1509
1511 """
1512 Sets the current node to the parent node and the parent node to
1513 its own parent.
1514 """
1515 return pn_data_exit(self._data)
1516
1518 return pn_data_lookup(self._data, name)
1519
1521 pn_data_narrow(self._data)
1522
1524 pn_data_widen(self._data)
1525
1527 """
1528 Returns the type of the current node.
1529 """
1530 dtype = pn_data_type(self._data)
1531 if dtype == -1:
1532 return None
1533 else:
1534 return dtype
1535
1537 """
1538 Returns the size in bytes needed to encode the data in AMQP format.
1539 """
1540 return pn_data_encoded_size(self._data)
1541
1543 """
1544 Returns a representation of the data encoded in AMQP format.
1545 """
1546 size = 1024
1547 while True:
1548 cd, enc = pn_data_encode(self._data, size)
1549 if cd == PN_OVERFLOW:
1550 size *= 2
1551 elif cd >= 0:
1552 return enc
1553 else:
1554 self._check(cd)
1555
1557 """
1558 Decodes the first value from supplied AMQP data and returns the
1559 number of bytes consumed.
1560
1561 @type encoded: binary
1562 @param encoded: AMQP encoded binary data
1563 """
1564 return self._check(pn_data_decode(self._data, encoded))
1565
1567 """
1568 Puts a list value. Elements may be filled by entering the list
1569 node and putting element values.
1570
1571 >>> data = Data()
1572 >>> data.put_list()
1573 >>> data.enter()
1574 >>> data.put_int(1)
1575 >>> data.put_int(2)
1576 >>> data.put_int(3)
1577 >>> data.exit()
1578 """
1579 self._check(pn_data_put_list(self._data))
1580
1582 """
1583 Puts a map value. Elements may be filled by entering the map node
1584 and putting alternating key value pairs.
1585
1586 >>> data = Data()
1587 >>> data.put_map()
1588 >>> data.enter()
1589 >>> data.put_string("key")
1590 >>> data.put_string("value")
1591 >>> data.exit()
1592 """
1593 self._check(pn_data_put_map(self._data))
1594
1595 - def put_array(self, described, element_type):
1596 """
1597 Puts an array value. Elements may be filled by entering the array
1598 node and putting the element values. The values must all be of the
1599 specified array element type. If an array is described then the
1600 first child value of the array is the descriptor and may be of any
1601 type.
1602
1603 >>> data = Data()
1604 >>>
1605 >>> data.put_array(False, Data.INT)
1606 >>> data.enter()
1607 >>> data.put_int(1)
1608 >>> data.put_int(2)
1609 >>> data.put_int(3)
1610 >>> data.exit()
1611 >>>
1612 >>> data.put_array(True, Data.DOUBLE)
1613 >>> data.enter()
1614 >>> data.put_symbol("array-descriptor")
1615 >>> data.put_double(1.1)
1616 >>> data.put_double(1.2)
1617 >>> data.put_double(1.3)
1618 >>> data.exit()
1619
1620 @type described: bool
1621 @param described: specifies whether the array is described
1622 @type element_type: int
1623 @param element_type: the type of the array elements
1624 """
1625 self._check(pn_data_put_array(self._data, described, element_type))
1626
1628 """
1629 Puts a described value. A described node has two children, the
1630 descriptor and the value. These are specified by entering the node
1631 and putting the desired values.
1632
1633 >>> data = Data()
1634 >>> data.put_described()
1635 >>> data.enter()
1636 >>> data.put_symbol("value-descriptor")
1637 >>> data.put_string("the value")
1638 >>> data.exit()
1639 """
1640 self._check(pn_data_put_described(self._data))
1641
1643 """
1644 Puts a null value.
1645 """
1646 self._check(pn_data_put_null(self._data))
1647
1649 """
1650 Puts a boolean value.
1651
1652 @param b: a boolean value
1653 """
1654 self._check(pn_data_put_bool(self._data, b))
1655
1657 """
1658 Puts an unsigned byte value.
1659
1660 @param ub: an integral value
1661 """
1662 self._check(pn_data_put_ubyte(self._data, ub))
1663
1665 """
1666 Puts a signed byte value.
1667
1668 @param b: an integral value
1669 """
1670 self._check(pn_data_put_byte(self._data, b))
1671
1673 """
1674 Puts an unsigned short value.
1675
1676 @param us: an integral value.
1677 """
1678 self._check(pn_data_put_ushort(self._data, us))
1679
1681 """
1682 Puts a signed short value.
1683
1684 @param s: an integral value
1685 """
1686 self._check(pn_data_put_short(self._data, s))
1687
1689 """
1690 Puts an unsigned int value.
1691
1692 @param ui: an integral value
1693 """
1694 self._check(pn_data_put_uint(self._data, ui))
1695
1697 """
1698 Puts a signed int value.
1699
1700 @param i: an integral value
1701 """
1702 self._check(pn_data_put_int(self._data, i))
1703
1705 """
1706 Puts a char value.
1707
1708 @param c: a single character
1709 """
1710 self._check(pn_data_put_char(self._data, ord(c)))
1711
1713 """
1714 Puts an unsigned long value.
1715
1716 @param ul: an integral value
1717 """
1718 self._check(pn_data_put_ulong(self._data, ul))
1719
1721 """
1722 Puts a signed long value.
1723
1724 @param l: an integral value
1725 """
1726 self._check(pn_data_put_long(self._data, l))
1727
1729 """
1730 Puts a timestamp value.
1731
1732 @param t: an integral value
1733 """
1734 self._check(pn_data_put_timestamp(self._data, t))
1735
1737 """
1738 Puts a float value.
1739
1740 @param f: a floating point value
1741 """
1742 self._check(pn_data_put_float(self._data, f))
1743
1745 """
1746 Puts a double value.
1747
1748 @param d: a floating point value.
1749 """
1750 self._check(pn_data_put_double(self._data, d))
1751
1753 """
1754 Puts a decimal32 value.
1755
1756 @param d: a decimal32 value
1757 """
1758 self._check(pn_data_put_decimal32(self._data, d))
1759
1761 """
1762 Puts a decimal64 value.
1763
1764 @param d: a decimal64 value
1765 """
1766 self._check(pn_data_put_decimal64(self._data, d))
1767
1769 """
1770 Puts a decimal128 value.
1771
1772 @param d: a decimal128 value
1773 """
1774 self._check(pn_data_put_decimal128(self._data, d))
1775
1777 """
1778 Puts a UUID value.
1779
1780 @param u: a uuid value
1781 """
1782 self._check(pn_data_put_uuid(self._data, u.bytes))
1783
1785 """
1786 Puts a binary value.
1787
1788 @type b: binary
1789 @param b: a binary value
1790 """
1791 self._check(pn_data_put_binary(self._data, b))
1792
1794 """
1795 Puts a unicode value.
1796
1797 @type s: unicode
1798 @param s: a unicode value
1799 """
1800 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1801
1803 """
1804 Puts a symbolic value.
1805
1806 @type s: string
1807 @param s: the symbol name
1808 """
1809 self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
1810
1812 """
1813 If the current node is a list, return the number of elements,
1814 otherwise return zero. List elements can be accessed by entering
1815 the list.
1816
1817 >>> count = data.get_list()
1818 >>> data.enter()
1819 >>> for i in range(count):
1820 ... type = data.next()
1821 ... if type == Data.STRING:
1822 ... print data.get_string()
1823 ... elif type == ...:
1824 ... ...
1825 >>> data.exit()
1826 """
1827 return pn_data_get_list(self._data)
1828
1830 """
1831 If the current node is a map, return the number of child elements,
1832 otherwise return zero. Key value pairs can be accessed by entering
1833 the map.
1834
1835 >>> count = data.get_map()
1836 >>> data.enter()
1837 >>> for i in range(count/2):
1838 ... type = data.next()
1839 ... if type == Data.STRING:
1840 ... print data.get_string()
1841 ... elif type == ...:
1842 ... ...
1843 >>> data.exit()
1844 """
1845 return pn_data_get_map(self._data)
1846
1848 """
1849 If the current node is an array, return a tuple of the element
1850 count, a boolean indicating whether the array is described, and
1851 the type of each element, otherwise return (0, False, None). Array
1852 data can be accessed by entering the array.
1853
1854 >>> # read an array of strings with a symbolic descriptor
1855 >>> count, described, type = data.get_array()
1856 >>> data.enter()
1857 >>> data.next()
1858 >>> print "Descriptor:", data.get_symbol()
1859 >>> for i in range(count):
1860 ... data.next()
1861 ... print "Element:", data.get_string()
1862 >>> data.exit()
1863 """
1864 count = pn_data_get_array(self._data)
1865 described = pn_data_is_array_described(self._data)
1866 type = pn_data_get_array_type(self._data)
1867 if type == -1:
1868 type = None
1869 return count, described, type
1870
1872 """
1873 Checks if the current node is a described value. The descriptor
1874 and value may be accessed by entering the described value.
1875
1876 >>> # read a symbolically described string
1877 >>> assert data.is_described() # will error if the current node is not described
1878 >>> data.enter()
1879 >>> data.next()
1880 >>> print data.get_symbol()
1881 >>> data.next()
1882 >>> print data.get_string()
1883 >>> data.exit()
1884 """
1885 return pn_data_is_described(self._data)
1886
1888 """
1889 Checks if the current node is a null.
1890 """
1891 return pn_data_is_null(self._data)
1892
1894 """
1895 If the current node is a boolean, returns its value, returns False
1896 otherwise.
1897 """
1898 return pn_data_get_bool(self._data)
1899
1901 """
1902 If the current node is an unsigned byte, returns its value,
1903 returns 0 otherwise.
1904 """
1905 return pn_data_get_ubyte(self._data)
1906
1908 """
1909 If the current node is a signed byte, returns its value, returns 0
1910 otherwise.
1911 """
1912 return pn_data_get_byte(self._data)
1913
1915 """
1916 If the current node is an unsigned short, returns its value,
1917 returns 0 otherwise.
1918 """
1919 return pn_data_get_ushort(self._data)
1920
1922 """
1923 If the current node is a signed short, returns its value, returns
1924 0 otherwise.
1925 """
1926 return pn_data_get_short(self._data)
1927
1929 """
1930 If the current node is an unsigned int, returns its value, returns
1931 0 otherwise.
1932 """
1933 return pn_data_get_uint(self._data)
1934
1936 """
1937 If the current node is a signed int, returns its value, returns 0
1938 otherwise.
1939 """
1940 return int(pn_data_get_int(self._data))
1941
1943 """
1944 If the current node is a char, returns its value, returns 0
1945 otherwise.
1946 """
1947 return char(_compat.unichar(pn_data_get_char(self._data)))
1948
1950 """
1951 If the current node is an unsigned long, returns its value,
1952 returns 0 otherwise.
1953 """
1954 return ulong(pn_data_get_ulong(self._data))
1955
1957 """
1958 If the current node is an signed long, returns its value, returns
1959 0 otherwise.
1960 """
1961 return long(pn_data_get_long(self._data))
1962
1964 """
1965 If the current node is a timestamp, returns its value, returns 0
1966 otherwise.
1967 """
1968 return timestamp(pn_data_get_timestamp(self._data))
1969
1971 """
1972 If the current node is a float, returns its value, raises 0
1973 otherwise.
1974 """
1975 return pn_data_get_float(self._data)
1976
1978 """
1979 If the current node is a double, returns its value, returns 0
1980 otherwise.
1981 """
1982 return pn_data_get_double(self._data)
1983
1984
1986 """
1987 If the current node is a decimal32, returns its value, returns 0
1988 otherwise.
1989 """
1990 return pn_data_get_decimal32(self._data)
1991
1992
1994 """
1995 If the current node is a decimal64, returns its value, returns 0
1996 otherwise.
1997 """
1998 return pn_data_get_decimal64(self._data)
1999
2000
2002 """
2003 If the current node is a decimal128, returns its value, returns 0
2004 otherwise.
2005 """
2006 return pn_data_get_decimal128(self._data)
2007
2009 """
2010 If the current node is a UUID, returns its value, returns None
2011 otherwise.
2012 """
2013 if pn_data_type(self._data) == Data.UUID:
2014 return uuid.UUID(bytes=pn_data_get_uuid(self._data))
2015 else:
2016 return None
2017
2019 """
2020 If the current node is binary, returns its value, returns ""
2021 otherwise.
2022 """
2023 return pn_data_get_binary(self._data)
2024
2026 """
2027 If the current node is a string, returns its value, returns ""
2028 otherwise.
2029 """
2030 return pn_data_get_string(self._data).decode("utf8")
2031
2033 """
2034 If the current node is a symbol, returns its value, returns ""
2035 otherwise.
2036 """
2037 return symbol(pn_data_get_symbol(self._data).decode('ascii'))
2038
2039 - def copy(self, src):
2040 self._check(pn_data_copy(self._data, src._data))
2041
2052
2054 pn_data_dump(self._data)
2055
2065
2067 if self.enter():
2068 try:
2069 result = {}
2070 while self.next():
2071 k = self.get_object()
2072 if self.next():
2073 v = self.get_object()
2074 else:
2075 v = None
2076 result[k] = v
2077 finally:
2078 self.exit()
2079 return result
2080
2089
2091 if self.enter():
2092 try:
2093 result = []
2094 while self.next():
2095 result.append(self.get_object())
2096 finally:
2097 self.exit()
2098 return result
2099
2110
2119
2121 """
2122 If the current node is an array, return an Array object
2123 representing the array and its contents. Otherwise return None.
2124 This is a convenience wrapper around get_array, enter, etc.
2125 """
2126
2127 count, described, type = self.get_array()
2128 if type is None: return None
2129 if self.enter():
2130 try:
2131 if described:
2132 self.next()
2133 descriptor = self.get_object()
2134 else:
2135 descriptor = UNDESCRIBED
2136 elements = []
2137 while self.next():
2138 elements.append(self.get_object())
2139 finally:
2140 self.exit()
2141 return Array(descriptor, type, *elements)
2142
2154
2155 put_mappings = {
2156 None.__class__: lambda s, _: s.put_null(),
2157 bool: put_bool,
2158 dict: put_dict,
2159 list: put_sequence,
2160 tuple: put_sequence,
2161 unicode: put_string,
2162 bytes: put_binary,
2163 symbol: put_symbol,
2164 long: put_long,
2165 char: put_char,
2166 ulong: put_ulong,
2167 timestamp: put_timestamp,
2168 float: put_double,
2169 uuid.UUID: put_uuid,
2170 Described: put_py_described,
2171 Array: put_py_array
2172 }
2173
2174
2175 if int not in put_mappings:
2176 put_mappings[int] = put_int
2177
2178 get_mappings = {
2179 NULL: lambda s: None,
2180 BOOL: get_bool,
2181 BYTE: get_byte,
2182 UBYTE: get_ubyte,
2183 SHORT: get_short,
2184 USHORT: get_ushort,
2185 INT: get_int,
2186 UINT: get_uint,
2187 CHAR: get_char,
2188 LONG: get_long,
2189 ULONG: get_ulong,
2190 TIMESTAMP: get_timestamp,
2191 FLOAT: get_float,
2192 DOUBLE: get_double,
2193 DECIMAL32: get_decimal32,
2194 DECIMAL64: get_decimal64,
2195 DECIMAL128: get_decimal128,
2196 UUID: get_uuid,
2197 BINARY: get_binary,
2198 STRING: get_string,
2199 SYMBOL: get_symbol,
2200 DESCRIBED: get_py_described,
2201 ARRAY: get_py_array,
2202 LIST: get_sequence,
2203 MAP: get_dict
2204 }
2205
2206
2208 putter = self.put_mappings[obj.__class__]
2209 putter(self, obj)
2210
2212 type = self.type()
2213 if type is None: return None
2214 getter = self.get_mappings.get(type)
2215 if getter:
2216 return getter(self)
2217 else:
2218 return UnmappedType(str(type))
2219
2222
2276
2278
2279 - def __init__(self, name, description=None, info=None):
2280 self.name = name
2281 self.description = description
2282 self.info = info
2283
2285 return "Condition(%s)" % ", ".join([repr(x) for x in
2286 (self.name, self.description, self.info)
2287 if x])
2288
2290 if not isinstance(o, Condition): return False
2291 return self.name == o.name and \
2292 self.description == o.description and \
2293 self.info == o.info
2294
2296 pn_condition_clear(cond)
2297 if obj:
2298 pn_condition_set_name(cond, str(obj.name))
2299 pn_condition_set_description(cond, obj.description)
2300 info = Data(pn_condition_info(cond))
2301 if obj.info:
2302 info.put_object(obj.info)
2303
2305 if pn_condition_is_set(cond):
2306 return Condition(pn_condition_get_name(cond),
2307 pn_condition_get_description(cond),
2308 dat2obj(pn_condition_info(cond)))
2309 else:
2310 return None
2311
2320
2325
2327 return long(secs*1000)
2328
2330 return float(millis)/1000.0
2331
2333 if secs is None: return PN_MILLIS_MAX
2334 return secs2millis(secs)
2335
2337 if millis == PN_MILLIS_MAX: return None
2338 return millis2secs(millis)
2339
2341 """Some Proton APIs expect a null terminated string. Convert python text
2342 types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
2343 This method will throw if the string cannot be converted.
2344 """
2345 if string is None:
2346 return None
2347 if _compat.IS_PY2:
2348 if isinstance(string, unicode):
2349 return string.encode('utf-8')
2350 elif isinstance(string, str):
2351 return string
2352 else:
2353
2354 if isinstance(string, str):
2355 string = string.encode('utf-8')
2356
2357 if isinstance(string, bytes):
2358 return string.decode('utf-8')
2359 raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
2360
2362 """Covert C strings returned from proton-c into python unicode"""
2363 if string is None:
2364 return None
2365 if isinstance(string, _compat.TEXT_TYPES):
2366
2367 return string
2368 elif isinstance(string, _compat.BINARY_TYPES):
2369 return string.decode('utf8')
2370 else:
2371 raise TypeError("Unrecognized string type")
2372
2374 """
2375 A representation of an AMQP connection
2376 """
2377
2378 @staticmethod
2380 if impl is None:
2381 return None
2382 else:
2383 return Connection(impl)
2384
2385 - def __init__(self, impl = pn_connection):
2387
2389 Endpoint._init(self)
2390 self.offered_capabilities = None
2391 self.desired_capabilities = None
2392 self.properties = None
2393
2395 return pn_connection_attachments(self._impl)
2396
2397 @property
2400
2401 @property
2404
2406 if err < 0:
2407 exc = EXCEPTIONS.get(err, ConnectionException)
2408 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
2409 else:
2410 return err
2411
2413 return pn_connection_condition(self._impl)
2414
2416 return pn_connection_remote_condition(self._impl)
2417
2419 if collector is None:
2420 pn_connection_collect(self._impl, None)
2421 else:
2422 pn_connection_collect(self._impl, collector._impl)
2423 self._collector = weakref.ref(collector)
2424
2426 return utf82unicode(pn_connection_get_container(self._impl))
2428 return pn_connection_set_container(self._impl, unicode2utf8(name))
2429
2430 container = property(_get_container, _set_container)
2431
2433 return utf82unicode(pn_connection_get_hostname(self._impl))
2435 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
2436
2437 hostname = property(_get_hostname, _set_hostname)
2438
2440 return utf82unicode(pn_connection_get_user(self._impl))
2442 return pn_connection_set_user(self._impl, unicode2utf8(name))
2443
2444 user = property(_get_user, _set_user)
2445
2449 return pn_connection_set_password(self._impl, unicode2utf8(name))
2450
2451 password = property(_get_password, _set_password)
2452
2453 @property
2455 """The container identifier specified by the remote peer for this connection."""
2456 return pn_connection_remote_container(self._impl)
2457
2458 @property
2460 """The hostname specified by the remote peer for this connection."""
2461 return pn_connection_remote_hostname(self._impl)
2462
2463 @property
2465 """The capabilities offered by the remote peer for this connection."""
2466 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2467
2468 @property
2470 """The capabilities desired by the remote peer for this connection."""
2471 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2472
2473 @property
2475 """The properties specified by the remote peer for this connection."""
2476 return dat2obj(pn_connection_remote_properties(self._impl))
2477
2479 """
2480 Opens the connection.
2481
2482 In more detail, this moves the local state of the connection to
2483 the ACTIVE state and triggers an open frame to be sent to the
2484 peer. A connection is fully active once both peers have opened it.
2485 """
2486 obj2dat(self.offered_capabilities,
2487 pn_connection_offered_capabilities(self._impl))
2488 obj2dat(self.desired_capabilities,
2489 pn_connection_desired_capabilities(self._impl))
2490 obj2dat(self.properties, pn_connection_properties(self._impl))
2491 pn_connection_open(self._impl)
2492
2494 """
2495 Closes the connection.
2496
2497 In more detail, this moves the local state of the connection to
2498 the CLOSED state and triggers a close frame to be sent to the
2499 peer. A connection is fully closed once both peers have closed it.
2500 """
2501 self._update_cond()
2502 pn_connection_close(self._impl)
2503
2504 @property
2506 """
2507 The state of the connection as a bit field. The state has a local
2508 and a remote component. Each of these can be in one of three
2509 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2510 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2511 REMOTE_ACTIVE and REMOTE_CLOSED.
2512 """
2513 return pn_connection_state(self._impl)
2514
2516 """
2517 Returns a new session on this connection.
2518 """
2519 ssn = pn_session(self._impl)
2520 if ssn is None:
2521 raise(SessionException("Session allocation failed."))
2522 else:
2523 return Session(ssn)
2524
2526 return Session.wrap(pn_session_head(self._impl, mask))
2527
2529 return Link.wrap(pn_link_head(self._impl, mask))
2530
2531 @property
2534
2535 @property
2537 return pn_error_code(pn_connection_error(self._impl))
2538
2540 pn_connection_release(self._impl)
2541
2544
2546
2547 @staticmethod
2549 if impl is None:
2550 return None
2551 else:
2552 return Session(impl)
2553
2556
2558 return pn_session_attachments(self._impl)
2559
2561 return pn_session_condition(self._impl)
2562
2564 return pn_session_remote_condition(self._impl)
2565
2567 return pn_session_get_incoming_capacity(self._impl)
2568
2570 pn_session_set_incoming_capacity(self._impl, capacity)
2571
2572 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
2573
2575 return pn_session_get_outgoing_window(self._impl)
2576
2578 pn_session_set_outgoing_window(self._impl, window)
2579
2580 outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
2581
2582 @property
2584 return pn_session_outgoing_bytes(self._impl)
2585
2586 @property
2588 return pn_session_incoming_bytes(self._impl)
2589
2591 pn_session_open(self._impl)
2592
2594 self._update_cond()
2595 pn_session_close(self._impl)
2596
2597 - def next(self, mask):
2598 return Session.wrap(pn_session_next(self._impl, mask))
2599
2600 @property
2602 return pn_session_state(self._impl)
2603
2604 @property
2607
2609 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2610
2612 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2613
2615 pn_session_free(self._impl)
2616
2619
2620 -class Link(Wrapper, Endpoint):
2621 """
2622 A representation of an AMQP link, of which there are two concrete
2623 implementations, Sender and Receiver.
2624 """
2625
2626 SND_UNSETTLED = PN_SND_UNSETTLED
2627 SND_SETTLED = PN_SND_SETTLED
2628 SND_MIXED = PN_SND_MIXED
2629
2630 RCV_FIRST = PN_RCV_FIRST
2631 RCV_SECOND = PN_RCV_SECOND
2632
2633 @staticmethod
2635 if impl is None: return None
2636 if pn_link_is_sender(impl):
2637 return Sender(impl)
2638 else:
2639 return Receiver(impl)
2640
2643
2645 return pn_link_attachments(self._impl)
2646
2648 if err < 0:
2649 exc = EXCEPTIONS.get(err, LinkException)
2650 raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
2651 else:
2652 return err
2653
2655 return pn_link_condition(self._impl)
2656
2658 return pn_link_remote_condition(self._impl)
2659
2661 """
2662 Opens the link.
2663
2664 In more detail, this moves the local state of the link to the
2665 ACTIVE state and triggers an attach frame to be sent to the
2666 peer. A link is fully active once both peers have attached it.
2667 """
2668 pn_link_open(self._impl)
2669
2671 """
2672 Closes the link.
2673
2674 In more detail, this moves the local state of the link to the
2675 CLOSED state and triggers an detach frame (with the closed flag
2676 set) to be sent to the peer. A link is fully closed once both
2677 peers have detached it.
2678 """
2679 self._update_cond()
2680 pn_link_close(self._impl)
2681
2682 @property
2684 """
2685 The state of the link as a bit field. The state has a local
2686 and a remote component. Each of these can be in one of three
2687 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2688 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2689 REMOTE_ACTIVE and REMOTE_CLOSED.
2690 """
2691 return pn_link_state(self._impl)
2692
2693 @property
2695 """The source of the link as described by the local peer."""
2696 return Terminus(pn_link_source(self._impl))
2697
2698 @property
2700 """The target of the link as described by the local peer."""
2701 return Terminus(pn_link_target(self._impl))
2702
2703 @property
2705 """The source of the link as described by the remote peer."""
2706 return Terminus(pn_link_remote_source(self._impl))
2707 @property
2709 """The target of the link as described by the remote peer."""
2710 return Terminus(pn_link_remote_target(self._impl))
2711
2712 @property
2715
2716 @property
2718 """The connection on which this link was attached."""
2719 return self.session.connection
2720
2723
2724 @property
2727
2729 return pn_link_advance(self._impl)
2730
2731 @property
2733 return pn_link_unsettled(self._impl)
2734
2735 @property
2737 """The amount of oustanding credit on this link."""
2738 return pn_link_credit(self._impl)
2739
2740 @property
2742 return pn_link_available(self._impl)
2743
2744 @property
2746 return pn_link_queued(self._impl)
2747
2748 - def next(self, mask):
2749 return Link.wrap(pn_link_next(self._impl, mask))
2750
2751 @property
2753 """Returns the name of the link"""
2754 return utf82unicode(pn_link_name(self._impl))
2755
2756 @property
2758 """Returns true if this link is a sender."""
2759 return pn_link_is_sender(self._impl)
2760
2761 @property
2763 """Returns true if this link is a receiver."""
2764 return pn_link_is_receiver(self._impl)
2765
2766 @property
2768 return pn_link_remote_snd_settle_mode(self._impl)
2769
2770 @property
2772 return pn_link_remote_rcv_settle_mode(self._impl)
2773
2775 return pn_link_snd_settle_mode(self._impl)
2777 pn_link_set_snd_settle_mode(self._impl, mode)
2778 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
2779
2781 return pn_link_rcv_settle_mode(self._impl)
2783 pn_link_set_rcv_settle_mode(self._impl, mode)
2784 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
2785
2787 return pn_link_get_drain(self._impl)
2788
2790 pn_link_set_drain(self._impl, bool(b))
2791
2792 drain_mode = property(_get_drain, _set_drain)
2793
2795 return pn_link_drained(self._impl)
2796
2798 return pn_link_detach(self._impl)
2799
2801 pn_link_free(self._impl)
2802
2804
2805 UNSPECIFIED = PN_UNSPECIFIED
2806 SOURCE = PN_SOURCE
2807 TARGET = PN_TARGET
2808 COORDINATOR = PN_COORDINATOR
2809
2810 NONDURABLE = PN_NONDURABLE
2811 CONFIGURATION = PN_CONFIGURATION
2812 DELIVERIES = PN_DELIVERIES
2813
2814 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
2815 DIST_MODE_COPY = PN_DIST_MODE_COPY
2816 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
2817
2818 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
2819 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
2820 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
2821 EXPIRE_NEVER = PN_EXPIRE_NEVER
2822
2825
2827 if err < 0:
2828 exc = EXCEPTIONS.get(err, LinkException)
2829 raise exc("[%s]" % err)
2830 else:
2831 return err
2832
2834 return pn_terminus_get_type(self._impl)
2836 self._check(pn_terminus_set_type(self._impl, type))
2837 type = property(_get_type, _set_type)
2838
2840 """The address that identifies the source or target node"""
2841 return utf82unicode(pn_terminus_get_address(self._impl))
2843 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2844 address = property(_get_address, _set_address)
2845
2847 return pn_terminus_get_durability(self._impl)
2849 self._check(pn_terminus_set_durability(self._impl, seconds))
2850 durability = property(_get_durability, _set_durability)
2851
2853 return pn_terminus_get_expiry_policy(self._impl)
2855 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2856 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
2857
2859 return pn_terminus_get_timeout(self._impl)
2861 self._check(pn_terminus_set_timeout(self._impl, seconds))
2862 timeout = property(_get_timeout, _set_timeout)
2863
2865 """Indicates whether the source or target node was dynamically
2866 created"""
2867 return pn_terminus_is_dynamic(self._impl)
2869 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2870 dynamic = property(_is_dynamic, _set_dynamic)
2871
2873 return pn_terminus_get_distribution_mode(self._impl)
2875 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2876 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
2877
2878 @property
2880 """Properties of a dynamic source or target."""
2881 return Data(pn_terminus_properties(self._impl))
2882
2883 @property
2885 """Capabilities of the source or target."""
2886 return Data(pn_terminus_capabilities(self._impl))
2887
2888 @property
2890 return Data(pn_terminus_outcomes(self._impl))
2891
2892 @property
2894 """A filter on a source allows the set of messages transfered over
2895 the link to be restricted"""
2896 return Data(pn_terminus_filter(self._impl))
2897
2898 - def copy(self, src):
2899 self._check(pn_terminus_copy(self._impl, src._impl))
2900
2902 """
2903 A link over which messages are sent.
2904 """
2905
2907 pn_link_offered(self._impl, n)
2908
2910 """
2911 Send specified data as part of the current delivery
2912
2913 @type data: binary
2914 @param data: data to send
2915 """
2916 return self._check(pn_link_send(self._impl, data))
2917
2918 - def send(self, obj, tag=None):
2919 """
2920 Send specified object over this sender; the object is expected to
2921 have a send() method on it that takes the sender and an optional
2922 tag as arguments.
2923
2924 Where the object is a Message, this will send the message over
2925 this link, creating a new delivery for the purpose.
2926 """
2927 if hasattr(obj, 'send'):
2928 return obj.send(self, tag=tag)
2929 else:
2930
2931 return self.stream(obj)
2932
2934 if not hasattr(self, 'tag_generator'):
2935 def simple_tags():
2936 count = 1
2937 while True:
2938 yield str(count)
2939 count += 1
2940 self.tag_generator = simple_tags()
2941 return next(self.tag_generator)
2942
2944 """
2945 A link over which messages are received.
2946 """
2947
2948 - def flow(self, n):
2949 """Increases the credit issued to the remote sender by the specified number of messages."""
2950 pn_link_flow(self._impl, n)
2951
2952 - def recv(self, limit):
2953 n, binary = pn_link_recv(self._impl, limit)
2954 if n == PN_EOS:
2955 return None
2956 else:
2957 self._check(n)
2958 return binary
2959
2961 pn_link_drain(self._impl, n)
2962
2964 return pn_link_draining(self._impl)
2965
2967
2968 values = {}
2969
2971 ni = super(NamedInt, cls).__new__(cls, i)
2972 cls.values[i] = ni
2973 return ni
2974
2977
2980
2983
2984 @classmethod
2986 return cls.values.get(i, i)
2987
2990
2992
2993 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
2994 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
2995 REJECTED = DispositionType(PN_REJECTED, "REJECTED")
2996 RELEASED = DispositionType(PN_RELEASED, "RELEASED")
2997 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
2998
3000 self._impl = impl
3001 self.local = local
3002 self._data = None
3003 self._condition = None
3004 self._annotations = None
3005
3006 @property
3008 return DispositionType.get(pn_disposition_type(self._impl))
3009
3011 return pn_disposition_get_section_number(self._impl)
3013 pn_disposition_set_section_number(self._impl, n)
3014 section_number = property(_get_section_number, _set_section_number)
3015
3017 return pn_disposition_get_section_offset(self._impl)
3019 pn_disposition_set_section_offset(self._impl, n)
3020 section_offset = property(_get_section_offset, _set_section_offset)
3021
3023 return pn_disposition_is_failed(self._impl)
3025 pn_disposition_set_failed(self._impl, b)
3026 failed = property(_get_failed, _set_failed)
3027
3029 return pn_disposition_is_undeliverable(self._impl)
3031 pn_disposition_set_undeliverable(self._impl, b)
3032 undeliverable = property(_get_undeliverable, _set_undeliverable)
3033
3035 if self.local:
3036 return self._data
3037 else:
3038 return dat2obj(pn_disposition_data(self._impl))
3040 if self.local:
3041 self._data = obj
3042 else:
3043 raise AttributeError("data attribute is read-only")
3044 data = property(_get_data, _set_data)
3045
3047 if self.local:
3048 return self._annotations
3049 else:
3050 return dat2obj(pn_disposition_annotations(self._impl))
3052 if self.local:
3053 self._annotations = obj
3054 else:
3055 raise AttributeError("annotations attribute is read-only")
3056 annotations = property(_get_annotations, _set_annotations)
3057
3059 if self.local:
3060 return self._condition
3061 else:
3062 return cond2obj(pn_disposition_condition(self._impl))
3064 if self.local:
3065 self._condition = obj
3066 else:
3067 raise AttributeError("condition attribute is read-only")
3068 condition = property(_get_condition, _set_condition)
3069
3071 """
3072 Tracks and/or records the delivery of a message over a link.
3073 """
3074
3075 RECEIVED = Disposition.RECEIVED
3076 ACCEPTED = Disposition.ACCEPTED
3077 REJECTED = Disposition.REJECTED
3078 RELEASED = Disposition.RELEASED
3079 MODIFIED = Disposition.MODIFIED
3080
3081 @staticmethod
3083 if impl is None:
3084 return None
3085 else:
3086 return Delivery(impl)
3087
3090
3092 self.local = Disposition(pn_delivery_local(self._impl), True)
3093 self.remote = Disposition(pn_delivery_remote(self._impl), False)
3094
3095 @property
3097 """The identifier for the delivery."""
3098 return pn_delivery_tag(self._impl)
3099
3100 @property
3102 """Returns true for an outgoing delivery to which data can now be written."""
3103 return pn_delivery_writable(self._impl)
3104
3105 @property
3107 """Returns true for an incoming delivery that has data to read."""
3108 return pn_delivery_readable(self._impl)
3109
3110 @property
3112 """Returns true if the state of the delivery has been updated
3113 (e.g. it has been settled and/or accepted, rejected etc)."""
3114 return pn_delivery_updated(self._impl)
3115
3117 """
3118 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
3119 """
3120 obj2dat(self.local._data, pn_disposition_data(self.local._impl))
3121 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
3122 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
3123 pn_delivery_update(self._impl, state)
3124
3125 @property
3127 return pn_delivery_pending(self._impl)
3128
3129 @property
3131 """
3132 Returns true for an incoming delivery if not all the data is
3133 yet available.
3134 """
3135 return pn_delivery_partial(self._impl)
3136
3137 @property
3139 """Returns the local state of the delivery."""
3140 return DispositionType.get(pn_delivery_local_state(self._impl))
3141
3142 @property
3144 """
3145 Returns the state of the delivery as indicated by the remote
3146 peer.
3147 """
3148 return DispositionType.get(pn_delivery_remote_state(self._impl))
3149
3150 @property
3152 """
3153 Returns true if the delivery has been settled by the remote peer.
3154 """
3155 return pn_delivery_settled(self._impl)
3156
3158 """
3159 Settles the delivery locally. This indicates the aplication
3160 considers the delivery complete and does not wish to receive any
3161 further events about it. Every delivery should be settled locally.
3162 """
3163 pn_delivery_settle(self._impl)
3164
3165 @property
3168
3169 @property
3171 """
3172 Returns the link on which the delivery was sent or received.
3173 """
3174 return Link.wrap(pn_delivery_link(self._impl))
3175
3176 @property
3178 """
3179 Returns the session over which the delivery was sent or received.
3180 """
3181 return self.link.session
3182
3183 @property
3185 """
3186 Returns the connection over which the delivery was sent or received.
3187 """
3188 return self.session.connection
3189
3190 @property
3193
3196
3198
3201
3202 - def __call__(self, trans_impl, message):
3204
3206
3207 TRACE_OFF = PN_TRACE_OFF
3208 TRACE_DRV = PN_TRACE_DRV
3209 TRACE_FRM = PN_TRACE_FRM
3210 TRACE_RAW = PN_TRACE_RAW
3211
3212 CLIENT = 1
3213 SERVER = 2
3214
3215 @staticmethod
3217 if impl is None:
3218 return None
3219 else:
3220 return Transport(_impl=impl)
3221
3222 - def __init__(self, mode=None, _impl = pn_transport):
3230
3232 self._sasl = None
3233 self._ssl = None
3234
3236 if err < 0:
3237 exc = EXCEPTIONS.get(err, TransportException)
3238 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
3239 else:
3240 return err
3241
3243 pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
3244
3246 adapter = pn_transport_get_pytracer(self._impl)
3247 if adapter:
3248 return adapter.tracer
3249 else:
3250 return None
3251
3252 tracer = property(_get_tracer, _set_tracer,
3253 doc="""
3254 A callback for trace logging. The callback is passed the transport and log message.
3255 """)
3256
3257 - def log(self, message):
3258 pn_transport_log(self._impl, message)
3259
3261 pn_transport_require_auth(self._impl, bool)
3262
3263 @property
3265 return pn_transport_is_authenticated(self._impl)
3266
3268 pn_transport_require_encryption(self._impl, bool)
3269
3270 @property
3272 return pn_transport_is_encrypted(self._impl)
3273
3274 @property
3276 return pn_transport_get_user(self._impl)
3277
3278 - def bind(self, connection):
3279 """Assign a connection to the transport"""
3280 self._check(pn_transport_bind(self._impl, connection._impl))
3281
3283 """Release the connection"""
3284 self._check(pn_transport_unbind(self._impl))
3285
3287 pn_transport_trace(self._impl, n)
3288
3289 - def tick(self, now):
3290 """Process any timed events (like heartbeat generation).
3291 now = seconds since epoch (float).
3292 """
3293 return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
3294
3296 c = pn_transport_capacity(self._impl)
3297 if c >= PN_EOS:
3298 return c
3299 else:
3300 return self._check(c)
3301
3302 - def push(self, binary):
3303 n = self._check(pn_transport_push(self._impl, binary))
3304 if n != len(binary):
3305 raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
3306
3308 self._check(pn_transport_close_tail(self._impl))
3309
3311 p = pn_transport_pending(self._impl)
3312 if p >= PN_EOS:
3313 return p
3314 else:
3315 return self._check(p)
3316
3317 - def peek(self, size):
3318 cd, out = pn_transport_peek(self._impl, size)
3319 if cd == PN_EOS:
3320 return None
3321 else:
3322 self._check(cd)
3323 return out
3324
3325 - def pop(self, size):
3326 pn_transport_pop(self._impl, size)
3327
3329 self._check(pn_transport_close_head(self._impl))
3330
3331 @property
3333 return pn_transport_closed(self._impl)
3334
3335
3337 return pn_transport_get_max_frame(self._impl)
3338
3340 pn_transport_set_max_frame(self._impl, value)
3341
3342 max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
3343 doc="""
3344 Sets the maximum size for received frames (in bytes).
3345 """)
3346
3347 @property
3349 return pn_transport_get_remote_max_frame(self._impl)
3350
3352 return pn_transport_get_channel_max(self._impl)
3353
3355 if pn_transport_set_channel_max(self._impl, value):
3356 raise SessionException("Too late to change channel max.")
3357
3358 channel_max = property(_get_channel_max, _set_channel_max,
3359 doc="""
3360 Sets the maximum channel that may be used on the transport.
3361 """)
3362
3363 @property
3365 return pn_transport_remote_channel_max(self._impl)
3366
3367
3369 return millis2secs(pn_transport_get_idle_timeout(self._impl))
3370
3372 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
3373
3374 idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
3375 doc="""
3376 The idle timeout of the connection (float, in seconds).
3377 """)
3378
3379 @property
3381 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
3382
3383 @property
3385 return pn_transport_get_frames_output(self._impl)
3386
3387 @property
3390
3393
3394 - def ssl(self, domain=None, session_details=None):
3395
3396 if not self._ssl:
3397 self._ssl = SSL(self, domain, session_details)
3398 return self._ssl
3399
3400 @property
3402 return cond2obj(pn_transport_condition(self._impl))
3403
3404 @property
3407
3410
3411 -class SASL(Wrapper):
3412
3413 OK = PN_SASL_OK
3414 AUTH = PN_SASL_AUTH
3415 SYS = PN_SASL_SYS
3416 PERM = PN_SASL_PERM
3417 TEMP = PN_SASL_TEMP
3418
3419 @staticmethod
3421 return pn_sasl_extended()
3422
3426
3428 if err < 0:
3429 exc = EXCEPTIONS.get(err, SASLException)
3430 raise exc("[%s]" % (err))
3431 else:
3432 return err
3433
3434 @property
3436 return pn_sasl_get_user(self._sasl)
3437
3438 @property
3440 return pn_sasl_get_mech(self._sasl)
3441
3442 @property
3449
3451 pn_sasl_allowed_mechs(self._sasl, mechs)
3452
3454 return pn_sasl_get_allow_insecure_mechs(self._sasl)
3455
3457 pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
3458
3459 allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
3460 doc="""
3461 Allow unencrypted cleartext passwords (PLAIN mech)
3462 """)
3463
3464 - def done(self, outcome):
3465 pn_sasl_done(self._sasl, outcome)
3466
3468 pn_sasl_config_name(self._sasl, name)
3469
3471 pn_sasl_config_path(self._sasl, path)
3472
3475
3478
3479 -class SSLDomain(object):
3480
3481 MODE_CLIENT = PN_SSL_MODE_CLIENT
3482 MODE_SERVER = PN_SSL_MODE_SERVER
3483 VERIFY_PEER = PN_SSL_VERIFY_PEER
3484 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
3485 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
3486
3487 - def __init__(self, mode):
3488 self._domain = pn_ssl_domain(mode)
3489 if self._domain is None:
3490 raise SSLUnavailable()
3491
3492 - def _check(self, err):
3493 if err < 0:
3494 exc = EXCEPTIONS.get(err, SSLException)
3495 raise exc("SSL failure.")
3496 else:
3497 return err
3498
3499 - def set_credentials(self, cert_file, key_file, password):
3500 return self._check( pn_ssl_domain_set_credentials(self._domain,
3501 cert_file, key_file,
3502 password) )
3503 - def set_trusted_ca_db(self, certificate_db):
3504 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
3505 certificate_db) )
3506 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3507 return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
3508 verify_mode,
3509 trusted_CAs) )
3510
3512 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3513
3514 - def __del__(self):
3515 pn_ssl_domain_free(self._domain)
3516
3518
3519 @staticmethod
3521 return pn_ssl_present()
3522
3529
3530 - def __new__(cls, transport, domain, session_details=None):
3531 """Enforce a singleton SSL object per Transport"""
3532 if transport._ssl:
3533
3534
3535
3536 ssl = transport._ssl
3537 if (domain and (ssl._domain is not domain) or
3538 session_details and (ssl._session_details is not session_details)):
3539 raise SSLException("Cannot re-configure existing SSL object!")
3540 else:
3541 obj = super(SSL, cls).__new__(cls)
3542 obj._domain = domain
3543 obj._session_details = session_details
3544 session_id = None
3545 if session_details:
3546 session_id = session_details.get_session_id()
3547 obj._ssl = pn_ssl( transport._impl )
3548 if obj._ssl is None:
3549 raise SSLUnavailable()
3550 if domain:
3551 pn_ssl_init( obj._ssl, domain._domain, session_id )
3552 transport._ssl = obj
3553 return transport._ssl
3554
3556 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
3557 if rc:
3558 return name
3559 return None
3560
3562 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
3563 if rc:
3564 return name
3565 return None
3566
3567 @property
3569 return pn_ssl_get_remote_subject( self._ssl )
3570
3571 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
3572 RESUME_NEW = PN_SSL_RESUME_NEW
3573 RESUME_REUSED = PN_SSL_RESUME_REUSED
3574
3576 return pn_ssl_resume_status( self._ssl )
3577
3579 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3581 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
3582 self._check(err)
3583 return utf82unicode(name)
3584 peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
3585 doc="""
3586 Manage the expected name of the remote peer. Used to authenticate the remote.
3587 """)
3588
3591 """ Unique identifier for the SSL session. Used to resume previous session on a new
3592 SSL connection.
3593 """
3594
3596 self._session_id = session_id
3597
3599 return self._session_id
3600
3601
3602 wrappers = {
3603 "pn_void": lambda x: pn_void2py(x),
3604 "pn_pyref": lambda x: pn_void2py(x),
3605 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)),
3606 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
3607 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
3608 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
3609 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
3610 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
3611 }
3614
3616 self._impl = pn_collector()
3617
3618 - def put(self, obj, etype):
3619 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3620
3622 return Event.wrap(pn_collector_peek(self._impl))
3623
3625 ev = self.peek()
3626 pn_collector_pop(self._impl)
3627
3629 pn_collector_free(self._impl)
3630 del self._impl
3631
3633
3634 _lock = threading.Lock()
3635 _extended = 10000
3636 TYPES = {}
3637
3638 - def __init__(self, name=None, number=None, method=None):
3639 if name is None and number is None:
3640 raise TypeError("extended events require a name")
3641 try:
3642 self._lock.acquire()
3643 if name is None:
3644 name = pn_event_type_name(number)
3645
3646 if number is None:
3647 number = EventType._extended
3648 EventType._extended += 1
3649
3650 if method is None:
3651 method = "on_%s" % name
3652
3653 self.name = name
3654 self.number = number
3655 self.method = method
3656
3657 self.TYPES[number] = self
3658 finally:
3659 self._lock.release()
3660
3663
3670
3672
3673 - def __init__(self, clazz, context, type):
3677
3680
3681 -def _none(x): return None
3682
3683 DELEGATED = Constant("DELEGATED")
3684
3685 -def _core(number, method):
3686 return EventType(number=number, method=method)
3687
3688 -class Event(Wrapper, EventBase):
3689
3690 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
3691 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
3692 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
3693
3694 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
3695
3696 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
3697 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
3698 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
3699 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
3700 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
3701 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
3702 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
3703 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")
3704
3705 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
3706 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
3707 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
3708 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
3709 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
3710 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")
3711
3712 LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
3713 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
3714 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
3715 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
3716 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
3717 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
3718 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
3719 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
3720 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")
3721
3722 DELIVERY = _core(PN_DELIVERY, "on_delivery")
3723
3724 TRANSPORT = _core(PN_TRANSPORT, "on_transport")
3725 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
3726 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
3727 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
3728 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
3729
3730 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
3731 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
3732 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
3733 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
3734 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
3735 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
3736 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
3737
3738 @staticmethod
3739 - def wrap(impl, number=None):
3740 if impl is None:
3741 return None
3742
3743 if number is None:
3744 number = pn_event_type(impl)
3745
3746 event = Event(impl, number)
3747
3748 if isinstance(event.context, EventBase):
3749 return event.context
3750 else:
3751 return event
3752
3754 Wrapper.__init__(self, impl, pn_event_attachments)
3755 self.__dict__["type"] = EventType.TYPES[number]
3756
3759
3760 @property
3762 cls = pn_event_class(self._impl)
3763 if cls:
3764 return pn_class_name(cls)
3765 else:
3766 return None
3767
3768 @property
3769 - def context(self):
3770 """Returns the context object associated with the event. The type of this depend on the type of event."""
3771 return wrappers[self.clazz](pn_event_context(self._impl))
3772
3773 - def dispatch(self, handler, type=None):
3782
3783
3784 @property
3786 """Returns the reactor associated with the event."""
3787 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3788
3789 @property
3791 """Returns the transport associated with the event, or null if none is associated with it."""
3792 return Transport.wrap(pn_event_transport(self._impl))
3793
3794 @property
3796 """Returns the connection associated with the event, or null if none is associated with it."""
3797 return Connection.wrap(pn_event_connection(self._impl))
3798
3799 @property
3801 """Returns the session associated with the event, or null if none is associated with it."""
3802 return Session.wrap(pn_event_session(self._impl))
3803
3804 @property
3806 """Returns the link associated with the event, or null if none is associated with it."""
3807 return Link.wrap(pn_event_link(self._impl))
3808
3809 @property
3811 """Returns the sender link associated with the event, or null if
3812 none is associated with it. This is essentially an alias for
3813 link(), that does an additional checkon the type of the
3814 link."""
3815 l = self.link
3816 if l and l.is_sender:
3817 return l
3818 else:
3819 return None
3820
3821 @property
3823 """Returns the receiver link associated with the event, or null if
3824 none is associated with it. This is essentially an alias for
3825 link(), that does an additional checkon the type of the link."""
3826 l = self.link
3827 if l and l.is_receiver:
3828 return l
3829 else:
3830 return None
3831
3832 @property
3834 """Returns the delivery associated with the event, or null if none is associated with it."""
3835 return Delivery.wrap(pn_event_delivery(self._impl))
3836
3839
3844
3846
3847 - def __init__(self, handler, on_error=None):
3850
3854
3860
3862
3863 @staticmethod
3864 - def wrap(impl, on_error=None):
3865 if impl is None:
3866 return None
3867 else:
3868 handler = WrappedHandler(impl)
3869 handler.__dict__["on_error"] = on_error
3870 return handler
3871
3872 - def __init__(self, impl_or_constructor):
3874
3881
3882 - def add(self, handler):
3883 if handler is None: return
3884 impl = _chandler(handler, self._on_error)
3885 pn_handler_add(self._impl, impl)
3886 pn_decref(impl)
3887
3889 pn_handler_clear(self._impl)
3890
3892 if obj is None:
3893 return None
3894 elif isinstance(obj, WrappedHandler):
3895 impl = obj._impl
3896 pn_incref(impl)
3897 return impl
3898 else:
3899 return pn_pyhandler(_cadapter(obj, on_error))
3900
3902 """
3903 Simple URL parser/constructor, handles URLs of the form:
3904
3905 <scheme>://<user>:<password>@<host>:<port>/<path>
3906
3907 All components can be None if not specifeid in the URL string.
3908
3909 The port can be specified as a service name, e.g. 'amqp' in the
3910 URL string but Url.port always gives the integer value.
3911
3912 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
3913 @ivar user: Username
3914 @ivar password: Password
3915 @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
3916 @ivar port: Integer port.
3917 @ivar host_port: Returns host:port
3918 """
3919
3920 AMQPS = "amqps"
3921 AMQP = "amqp"
3922
3924 """An integer port number that can be constructed from a service name string"""
3925
3927 """@param value: integer port number or string service name."""
3928 port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
3929 setattr(port, 'name', str(value))
3930 return port
3931
3932 - def __eq__(self, x): return str(self) == x or int(self) == x
3933 - def __ne__(self, x): return not self == x
3935
3936 @staticmethod
3938 """Convert service, an integer or a service name, into an integer port number."""
3939 try:
3940 return int(value)
3941 except ValueError:
3942 try:
3943 return socket.getservbyname(value)
3944 except socket.error:
3945
3946 if value == Url.AMQPS: return 5671
3947 elif value == Url.AMQP: return 5672
3948 else:
3949 raise ValueError("Not a valid port number or service name: '%s'" % value)
3950
3951 - def __init__(self, url=None, defaults=True, **kwargs):
3952 """
3953 @param url: URL string to parse.
3954 @param defaults: If true, fill in missing default values in the URL.
3955 If false, you can fill them in later by calling self.defaults()
3956 @param kwargs: scheme, user, password, host, port, path.
3957 If specified, replaces corresponding part in url string.
3958 """
3959 if url:
3960 self._url = pn_url_parse(unicode2utf8(str(url)))
3961 if not self._url: raise ValueError("Invalid URL '%s'" % url)
3962 else:
3963 self._url = pn_url()
3964 for k in kwargs:
3965 getattr(self, k)
3966 setattr(self, k, kwargs[k])
3967 if defaults: self.defaults()
3968
3971 self.getter = globals()["pn_url_get_%s" % part]
3972 self.setter = globals()["pn_url_set_%s" % part]
3973 - def __get__(self, obj, type=None): return self.getter(obj._url)
3974 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
3975
3976 scheme = PartDescriptor('scheme')
3977 username = PartDescriptor('username')
3978 password = PartDescriptor('password')
3979 host = PartDescriptor('host')
3980 path = PartDescriptor('path')
3981
3983 portstr = pn_url_get_port(self._url)
3984 return portstr and Url.Port(portstr)
3985
3987 if value is None: pn_url_set_port(self._url, None)
3988 else: pn_url_set_port(self._url, str(Url.Port(value)))
3989
3990 port = property(_get_port, _set_port)
3991
3992 - def __str__(self): return pn_url_str(self._url)
3993
3994 - def __repr__(self): return "Url(%r)" % str(self)
3995
3996 - def __eq__(self, x): return str(self) == str(x)
3997 - def __ne__(self, x): return not self == x
3998
4000 pn_url_free(self._url);
4001 del self._url
4002
4004 """
4005 Fill in missing values (scheme, host or port) with defaults
4006 @return: self
4007 """
4008 self.scheme = self.scheme or self.AMQP
4009 self.host = self.host or '0.0.0.0'
4010 self.port = self.port or self.Port(self.scheme)
4011 return self
4012
4013 __all__ = [
4014 "API_LANGUAGE",
4015 "IMPLEMENTATION_LANGUAGE",
4016 "ABORTED",
4017 "ACCEPTED",
4018 "AUTOMATIC",
4019 "PENDING",
4020 "MANUAL",
4021 "REJECTED",
4022 "RELEASED",
4023 "MODIFIED",
4024 "SETTLED",
4025 "UNDESCRIBED",
4026 "Array",
4027 "Collector",
4028 "Condition",
4029 "Connection",
4030 "Data",
4031 "Delivery",
4032 "Disposition",
4033 "Described",
4034 "Endpoint",
4035 "Event",
4036 "Handler",
4037 "Link",
4038 "Message",
4039 "MessageException",
4040 "Messenger",
4041 "MessengerException",
4042 "ProtonException",
4043 "VERSION_MAJOR",
4044 "VERSION_MINOR",
4045 "Receiver",
4046 "SASL",
4047 "Sender",
4048 "Session",
4049 "SessionException",
4050 "SSL",
4051 "SSLDomain",
4052 "SSLSessionDetails",
4053 "SSLUnavailable",
4054 "SSLException",
4055 "Terminus",
4056 "Timeout",
4057 "Interrupt",
4058 "Transport",
4059 "TransportException",
4060 "Url",
4061 "char",
4062 "dispatch",
4063 "symbol",
4064 "timestamp",
4065 "ulong"
4066 ]
4067