001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.io.OutputStream; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.zip.DeflaterOutputStream; 027 028import javax.jms.JMSException; 029 030import org.apache.activemq.ActiveMQConnection; 031import org.apache.activemq.advisory.AdvisorySupport; 032import org.apache.activemq.broker.region.MessageReference; 033import org.apache.activemq.usage.MemoryUsage; 034import org.apache.activemq.util.ByteArrayInputStream; 035import org.apache.activemq.util.ByteArrayOutputStream; 036import org.apache.activemq.util.ByteSequence; 037import org.apache.activemq.util.MarshallingSupport; 038import org.apache.activemq.wireformat.WireFormat; 039import org.fusesource.hawtbuf.UTF8Buffer; 040 041/** 042 * Represents an ActiveMQ message 043 * 044 * @openwire:marshaller 045 * 046 */ 047public abstract class Message extends BaseCommand implements MarshallAware, MessageReference { 048 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 049 050 /** 051 * The default minimum amount of memory a message is assumed to use 052 */ 053 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024; 054 055 protected MessageId messageId; 056 protected ActiveMQDestination originalDestination; 057 protected TransactionId originalTransactionId; 058 059 protected ProducerId producerId; 060 protected ActiveMQDestination destination; 061 protected TransactionId transactionId; 062 063 protected long expiration; 064 protected long timestamp; 065 protected long arrival; 066 protected long brokerInTime; 067 protected long brokerOutTime; 068 protected String correlationId; 069 protected ActiveMQDestination replyTo; 070 protected boolean persistent; 071 protected String type; 072 protected byte priority; 073 protected String groupID; 074 protected int groupSequence; 075 protected ConsumerId targetConsumerId; 076 protected boolean compressed; 077 protected String userID; 078 079 protected ByteSequence content; 080 protected ByteSequence marshalledProperties; 081 protected DataStructure dataStructure; 082 protected int redeliveryCounter; 083 084 protected int size; 085 protected Map<String, Object> properties; 086 protected boolean readOnlyProperties; 087 protected boolean readOnlyBody; 088 protected transient boolean recievedByDFBridge; 089 protected boolean droppable; 090 protected boolean jmsXGroupFirstForConsumer; 091 092 private transient short referenceCount; 093 private transient ActiveMQConnection connection; 094 transient MessageDestination regionDestination; 095 transient MemoryUsage memoryUsage; 096 097 private BrokerId[] brokerPath; 098 private BrokerId[] cluster; 099 100 public static interface MessageDestination { 101 int getMinimumMessageSize(); 102 MemoryUsage getMemoryUsage(); 103 } 104 105 public abstract Message copy(); 106 public abstract void clearBody() throws JMSException; 107 public abstract void storeContent(); 108 public abstract void storeContentAndClear(); 109 110 // useful to reduce the memory footprint of a persisted message 111 public void clearMarshalledState() throws JMSException { 112 properties = null; 113 } 114 115 protected void copy(Message copy) { 116 super.copy(copy); 117 copy.producerId = producerId; 118 copy.transactionId = transactionId; 119 copy.destination = destination; 120 copy.messageId = messageId != null ? messageId.copy() : null; 121 copy.originalDestination = originalDestination; 122 copy.originalTransactionId = originalTransactionId; 123 copy.expiration = expiration; 124 copy.timestamp = timestamp; 125 copy.correlationId = correlationId; 126 copy.replyTo = replyTo; 127 copy.persistent = persistent; 128 copy.redeliveryCounter = redeliveryCounter; 129 copy.type = type; 130 copy.priority = priority; 131 copy.size = size; 132 copy.groupID = groupID; 133 copy.userID = userID; 134 copy.groupSequence = groupSequence; 135 136 if (properties != null) { 137 copy.properties = new HashMap<String, Object>(properties); 138 139 // The new message hasn't expired, so remove this feild. 140 copy.properties.remove(ORIGINAL_EXPIRATION); 141 } else { 142 copy.properties = properties; 143 } 144 145 copy.content = content; 146 copy.marshalledProperties = marshalledProperties; 147 copy.dataStructure = dataStructure; 148 copy.readOnlyProperties = readOnlyProperties; 149 copy.readOnlyBody = readOnlyBody; 150 copy.compressed = compressed; 151 copy.recievedByDFBridge = recievedByDFBridge; 152 153 copy.arrival = arrival; 154 copy.connection = connection; 155 copy.regionDestination = regionDestination; 156 copy.brokerInTime = brokerInTime; 157 copy.brokerOutTime = brokerOutTime; 158 copy.memoryUsage=this.memoryUsage; 159 copy.brokerPath = brokerPath; 160 copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer; 161 162 // lets not copy the following fields 163 // copy.targetConsumerId = targetConsumerId; 164 // copy.referenceCount = referenceCount; 165 } 166 167 public Object getProperty(String name) throws IOException { 168 if (properties == null) { 169 if (marshalledProperties == null) { 170 return null; 171 } 172 properties = unmarsallProperties(marshalledProperties); 173 } 174 Object result = properties.get(name); 175 if (result instanceof UTF8Buffer) { 176 result = result.toString(); 177 } 178 179 return result; 180 } 181 182 @SuppressWarnings("unchecked") 183 public Map<String, Object> getProperties() throws IOException { 184 if (properties == null) { 185 if (marshalledProperties == null) { 186 return Collections.EMPTY_MAP; 187 } 188 properties = unmarsallProperties(marshalledProperties); 189 } 190 return Collections.unmodifiableMap(properties); 191 } 192 193 public void clearProperties() { 194 marshalledProperties = null; 195 properties = null; 196 } 197 198 public void setProperty(String name, Object value) throws IOException { 199 lazyCreateProperties(); 200 properties.put(name, value); 201 } 202 203 public void removeProperty(String name) throws IOException { 204 lazyCreateProperties(); 205 properties.remove(name); 206 } 207 208 protected void lazyCreateProperties() throws IOException { 209 if (properties == null) { 210 if (marshalledProperties == null) { 211 properties = new HashMap<String, Object>(); 212 } else { 213 properties = unmarsallProperties(marshalledProperties); 214 marshalledProperties = null; 215 } 216 } else { 217 marshalledProperties = null; 218 } 219 } 220 221 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException { 222 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties))); 223 } 224 225 @Override 226 public void beforeMarshall(WireFormat wireFormat) throws IOException { 227 // Need to marshal the properties. 228 if (marshalledProperties == null && properties != null) { 229 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 230 DataOutputStream os = new DataOutputStream(baos); 231 MarshallingSupport.marshalPrimitiveMap(properties, os); 232 os.close(); 233 marshalledProperties = baos.toByteSequence(); 234 } 235 } 236 237 @Override 238 public void afterMarshall(WireFormat wireFormat) throws IOException { 239 } 240 241 @Override 242 public void beforeUnmarshall(WireFormat wireFormat) throws IOException { 243 } 244 245 @Override 246 public void afterUnmarshall(WireFormat wireFormat) throws IOException { 247 } 248 249 // ///////////////////////////////////////////////////////////////// 250 // 251 // Simple Field accessors 252 // 253 // ///////////////////////////////////////////////////////////////// 254 255 /** 256 * @openwire:property version=1 cache=true 257 */ 258 public ProducerId getProducerId() { 259 return producerId; 260 } 261 262 public void setProducerId(ProducerId producerId) { 263 this.producerId = producerId; 264 } 265 266 /** 267 * @openwire:property version=1 cache=true 268 */ 269 public ActiveMQDestination getDestination() { 270 return destination; 271 } 272 273 public void setDestination(ActiveMQDestination destination) { 274 this.destination = destination; 275 } 276 277 /** 278 * @openwire:property version=1 cache=true 279 */ 280 public TransactionId getTransactionId() { 281 return transactionId; 282 } 283 284 public void setTransactionId(TransactionId transactionId) { 285 this.transactionId = transactionId; 286 } 287 288 public boolean isInTransaction() { 289 return transactionId != null; 290 } 291 292 /** 293 * @openwire:property version=1 cache=true 294 */ 295 public ActiveMQDestination getOriginalDestination() { 296 return originalDestination; 297 } 298 299 public void setOriginalDestination(ActiveMQDestination destination) { 300 this.originalDestination = destination; 301 } 302 303 /** 304 * @openwire:property version=1 305 */ 306 @Override 307 public MessageId getMessageId() { 308 return messageId; 309 } 310 311 public void setMessageId(MessageId messageId) { 312 this.messageId = messageId; 313 } 314 315 /** 316 * @openwire:property version=1 cache=true 317 */ 318 public TransactionId getOriginalTransactionId() { 319 return originalTransactionId; 320 } 321 322 public void setOriginalTransactionId(TransactionId transactionId) { 323 this.originalTransactionId = transactionId; 324 } 325 326 /** 327 * @openwire:property version=1 328 */ 329 @Override 330 public String getGroupID() { 331 return groupID; 332 } 333 334 public void setGroupID(String groupID) { 335 this.groupID = groupID; 336 } 337 338 /** 339 * @openwire:property version=1 340 */ 341 @Override 342 public int getGroupSequence() { 343 return groupSequence; 344 } 345 346 public void setGroupSequence(int groupSequence) { 347 this.groupSequence = groupSequence; 348 } 349 350 /** 351 * @openwire:property version=1 352 */ 353 public String getCorrelationId() { 354 return correlationId; 355 } 356 357 public void setCorrelationId(String correlationId) { 358 this.correlationId = correlationId; 359 } 360 361 /** 362 * @openwire:property version=1 363 */ 364 @Override 365 public boolean isPersistent() { 366 return persistent; 367 } 368 369 public void setPersistent(boolean deliveryMode) { 370 this.persistent = deliveryMode; 371 } 372 373 /** 374 * @openwire:property version=1 375 */ 376 @Override 377 public long getExpiration() { 378 return expiration; 379 } 380 381 public void setExpiration(long expiration) { 382 this.expiration = expiration; 383 } 384 385 /** 386 * @openwire:property version=1 387 */ 388 public byte getPriority() { 389 return priority; 390 } 391 392 public void setPriority(byte priority) { 393 if (priority < 0) { 394 this.priority = 0; 395 } else if (priority > 9) { 396 this.priority = 9; 397 } else { 398 this.priority = priority; 399 } 400 } 401 402 /** 403 * @openwire:property version=1 404 */ 405 public ActiveMQDestination getReplyTo() { 406 return replyTo; 407 } 408 409 public void setReplyTo(ActiveMQDestination replyTo) { 410 this.replyTo = replyTo; 411 } 412 413 /** 414 * @openwire:property version=1 415 */ 416 public long getTimestamp() { 417 return timestamp; 418 } 419 420 public void setTimestamp(long timestamp) { 421 this.timestamp = timestamp; 422 } 423 424 /** 425 * @openwire:property version=1 426 */ 427 public String getType() { 428 return type; 429 } 430 431 public void setType(String type) { 432 this.type = type; 433 } 434 435 /** 436 * @openwire:property version=1 437 */ 438 public ByteSequence getContent() { 439 return content; 440 } 441 442 public void setContent(ByteSequence content) { 443 this.content = content; 444 } 445 446 /** 447 * @openwire:property version=1 448 */ 449 public ByteSequence getMarshalledProperties() { 450 return marshalledProperties; 451 } 452 453 public void setMarshalledProperties(ByteSequence marshalledProperties) { 454 this.marshalledProperties = marshalledProperties; 455 } 456 457 /** 458 * @openwire:property version=1 459 */ 460 public DataStructure getDataStructure() { 461 return dataStructure; 462 } 463 464 public void setDataStructure(DataStructure data) { 465 this.dataStructure = data; 466 } 467 468 /** 469 * Can be used to route the message to a specific consumer. Should be null 470 * to allow the broker use normal JMS routing semantics. If the target 471 * consumer id is an active consumer on the broker, the message is dropped. 472 * Used by the AdvisoryBroker to replay advisory messages to a specific 473 * consumer. 474 * 475 * @openwire:property version=1 cache=true 476 */ 477 @Override 478 public ConsumerId getTargetConsumerId() { 479 return targetConsumerId; 480 } 481 482 public void setTargetConsumerId(ConsumerId targetConsumerId) { 483 this.targetConsumerId = targetConsumerId; 484 } 485 486 @Override 487 public boolean isExpired() { 488 long expireTime = getExpiration(); 489 return expireTime > 0 && System.currentTimeMillis() > expireTime; 490 } 491 492 @Override 493 public boolean isAdvisory() { 494 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 495 } 496 497 /** 498 * @openwire:property version=1 499 */ 500 public boolean isCompressed() { 501 return compressed; 502 } 503 504 public void setCompressed(boolean compressed) { 505 this.compressed = compressed; 506 } 507 508 public boolean isRedelivered() { 509 return redeliveryCounter > 0; 510 } 511 512 public void setRedelivered(boolean redelivered) { 513 if (redelivered) { 514 if (!isRedelivered()) { 515 setRedeliveryCounter(1); 516 } 517 } else { 518 if (isRedelivered()) { 519 setRedeliveryCounter(0); 520 } 521 } 522 } 523 524 @Override 525 public void incrementRedeliveryCounter() { 526 redeliveryCounter++; 527 } 528 529 /** 530 * @openwire:property version=1 531 */ 532 @Override 533 public int getRedeliveryCounter() { 534 return redeliveryCounter; 535 } 536 537 public void setRedeliveryCounter(int deliveryCounter) { 538 this.redeliveryCounter = deliveryCounter; 539 } 540 541 /** 542 * The route of brokers the command has moved through. 543 * 544 * @openwire:property version=1 cache=true 545 */ 546 public BrokerId[] getBrokerPath() { 547 return brokerPath; 548 } 549 550 public void setBrokerPath(BrokerId[] brokerPath) { 551 this.brokerPath = brokerPath; 552 } 553 554 public boolean isReadOnlyProperties() { 555 return readOnlyProperties; 556 } 557 558 public void setReadOnlyProperties(boolean readOnlyProperties) { 559 this.readOnlyProperties = readOnlyProperties; 560 } 561 562 public boolean isReadOnlyBody() { 563 return readOnlyBody; 564 } 565 566 public void setReadOnlyBody(boolean readOnlyBody) { 567 this.readOnlyBody = readOnlyBody; 568 } 569 570 public ActiveMQConnection getConnection() { 571 return this.connection; 572 } 573 574 public void setConnection(ActiveMQConnection connection) { 575 this.connection = connection; 576 } 577 578 /** 579 * Used to schedule the arrival time of a message to a broker. The broker 580 * will not dispatch a message to a consumer until it's arrival time has 581 * elapsed. 582 * 583 * @openwire:property version=1 584 */ 585 public long getArrival() { 586 return arrival; 587 } 588 589 public void setArrival(long arrival) { 590 this.arrival = arrival; 591 } 592 593 /** 594 * Only set by the broker and defines the userID of the producer connection 595 * who sent this message. This is an optional field, it needs to be enabled 596 * on the broker to have this field populated. 597 * 598 * @openwire:property version=1 599 */ 600 public String getUserID() { 601 return userID; 602 } 603 604 public void setUserID(String jmsxUserID) { 605 this.userID = jmsxUserID; 606 } 607 608 @Override 609 public int getReferenceCount() { 610 return referenceCount; 611 } 612 613 @Override 614 public Message getMessageHardRef() { 615 return this; 616 } 617 618 @Override 619 public Message getMessage() { 620 return this; 621 } 622 623 public void setRegionDestination(MessageDestination destination) { 624 this.regionDestination = destination; 625 if(this.memoryUsage==null) { 626 this.memoryUsage=destination.getMemoryUsage(); 627 } 628 } 629 630 @Override 631 public MessageDestination getRegionDestination() { 632 return regionDestination; 633 } 634 635 public MemoryUsage getMemoryUsage() { 636 return this.memoryUsage; 637 } 638 639 public void setMemoryUsage(MemoryUsage usage) { 640 this.memoryUsage=usage; 641 } 642 643 @Override 644 public boolean isMarshallAware() { 645 return true; 646 } 647 648 @Override 649 public int incrementReferenceCount() { 650 int rc; 651 int size; 652 synchronized (this) { 653 rc = ++referenceCount; 654 size = getSize(); 655 } 656 657 if (rc == 1 && getMemoryUsage() != null) { 658 getMemoryUsage().increaseUsage(size); 659 //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 660 661 } 662 663 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 664 return rc; 665 } 666 667 @Override 668 public int decrementReferenceCount() { 669 int rc; 670 int size; 671 synchronized (this) { 672 rc = --referenceCount; 673 size = getSize(); 674 } 675 676 if (rc == 0 && getMemoryUsage() != null) { 677 getMemoryUsage().decreaseUsage(size); 678 //Thread.dumpStack(); 679 //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 680 } 681 682 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 683 684 return rc; 685 } 686 687 @Override 688 public int getSize() { 689 int minimumMessageSize = getMinimumMessageSize(); 690 if (size < minimumMessageSize || size == 0) { 691 size = minimumMessageSize; 692 if (marshalledProperties != null) { 693 size += marshalledProperties.getLength(); 694 } 695 if (content != null) { 696 size += content.getLength(); 697 } 698 } 699 return size; 700 } 701 702 protected int getMinimumMessageSize() { 703 int result = DEFAULT_MINIMUM_MESSAGE_SIZE; 704 //let destination override 705 MessageDestination dest = regionDestination; 706 if (dest != null) { 707 result=dest.getMinimumMessageSize(); 708 } 709 return result; 710 } 711 712 /** 713 * @openwire:property version=1 714 * @return Returns the recievedByDFBridge. 715 */ 716 public boolean isRecievedByDFBridge() { 717 return recievedByDFBridge; 718 } 719 720 /** 721 * @param recievedByDFBridge The recievedByDFBridge to set. 722 */ 723 public void setRecievedByDFBridge(boolean recievedByDFBridge) { 724 this.recievedByDFBridge = recievedByDFBridge; 725 } 726 727 public void onMessageRolledBack() { 728 incrementRedeliveryCounter(); 729 } 730 731 /** 732 * @openwire:property version=2 cache=true 733 */ 734 public boolean isDroppable() { 735 return droppable; 736 } 737 738 public void setDroppable(boolean droppable) { 739 this.droppable = droppable; 740 } 741 742 /** 743 * If a message is stored in multiple nodes on a cluster, all the cluster 744 * members will be listed here. Otherwise, it will be null. 745 * 746 * @openwire:property version=3 cache=true 747 */ 748 public BrokerId[] getCluster() { 749 return cluster; 750 } 751 752 public void setCluster(BrokerId[] cluster) { 753 this.cluster = cluster; 754 } 755 756 @Override 757 public boolean isMessage() { 758 return true; 759 } 760 761 /** 762 * @openwire:property version=3 763 */ 764 public long getBrokerInTime() { 765 return this.brokerInTime; 766 } 767 768 public void setBrokerInTime(long brokerInTime) { 769 this.brokerInTime = brokerInTime; 770 } 771 772 /** 773 * @openwire:property version=3 774 */ 775 public long getBrokerOutTime() { 776 return this.brokerOutTime; 777 } 778 779 public void setBrokerOutTime(long brokerOutTime) { 780 this.brokerOutTime = brokerOutTime; 781 } 782 783 @Override 784 public boolean isDropped() { 785 return false; 786 } 787 788 /** 789 * @openwire:property version=10 790 */ 791 public boolean isJMSXGroupFirstForConsumer() { 792 return jmsXGroupFirstForConsumer; 793 } 794 795 public void setJMSXGroupFirstForConsumer(boolean val) { 796 jmsXGroupFirstForConsumer = val; 797 } 798 799 public void compress() throws IOException { 800 if (!isCompressed()) { 801 storeContent(); 802 if (!isCompressed() && getContent() != null) { 803 doCompress(); 804 } 805 } 806 } 807 808 protected void doCompress() throws IOException { 809 compressed = true; 810 ByteSequence bytes = getContent(); 811 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 812 OutputStream os = new DeflaterOutputStream(bytesOut); 813 os.write(bytes.data, bytes.offset, bytes.length); 814 os.close(); 815 setContent(bytesOut.toByteSequence()); 816 } 817 818 @Override 819 public String toString() { 820 return toString(null); 821 } 822 823 @Override 824 public String toString(Map<String, Object>overrideFields) { 825 try { 826 getProperties(); 827 } catch (IOException e) { 828 } 829 return super.toString(overrideFields); 830 } 831}