/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.jmx;

import java.io.IOException;
import java.util.Set;

import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.util.IOExceptionSupport;

/**
 * JMX view of a single {@link Subscription}.
 * <p>
 * Every accessor delegates either to the wrapped subscription or to the
 * {@link ConsumerInfo} returned by {@link #getConsumerInfo()}. The wrapped
 * subscription may be {@code null}; accessors then return a neutral default
 * ({@code null}, {@code 0}, {@code false} or {@code "NOTSET"}) instead of
 * failing.
 */
public class SubscriptionView implements SubscriptionViewMBean {

    protected final Subscription subscription;
    protected final String clientId;
    protected final String userName;

    /**
     * Constructor
     *
     * @param clientId the client id reported by {@link #getClientId()}
     * @param userName the user name reported by {@link #getUserName()}
     * @param subs the subscription to expose; may be {@code null}
     */
    public SubscriptionView(String clientId, String userName, Subscription subs) {
        this.clientId = clientId;
        this.subscription = subs;
        this.userName = userName;
    }

    /**
     * @return the clientId
     */
    @Override
    public String getClientId() {
        return clientId;
    }

    /**
     * Looks up the connection MBean for this view's client id.
     *
     * @return the ObjectName of the Connection that created this
     *         subscription, or {@code null} when the client id, the
     *         subscription, its broker service or its management context is
     *         missing, when the query does not match exactly one name, or
     *         when the lookup fails
     */
    @Override
    public ObjectName getConnection() {
        if (clientId == null || subscription == null) {
            return null;
        }

        ConnectionContext ctx = subscription.getContext();
        if (ctx == null || ctx.getBroker() == null || ctx.getBroker().getBrokerService() == null) {
            return null;
        }

        BrokerService service = ctx.getBroker().getBrokerService();
        ManagementContext managementCtx = service.getManagementContext();
        if (managementCtx == null) {
            return null;
        }

        try {
            ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName());
            Set<ObjectName> names = managementCtx.queryNames(query, null);
            // Only an unambiguous match is reported.
            if (names.size() == 1) {
                return names.iterator().next();
            }
        } catch (Exception ignored) {
            // Best effort: this is a read-only management attribute, so a
            // failed lookup is reported as "no connection" rather than an error.
        }
        return null;
    }

    /**
     * Builds the JMX query used to find the connection MBean of this view's
     * client id.
     *
     * @param ctx the management context supplying the JMX domain name
     * @param brokerName the name of the broker to query
     * @return the query ObjectName
     * @throws IOException wrapping any failure raised while building the query
     */
    private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
        try {
            return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
        } catch (Throwable e) {
            throw IOExceptionSupport.create(e);
        }
    }

    /**
     * @return the id of the Connection the Subscription is on, or
     *         {@code "NOTSET"} when no consumer info is available
     */
    @Override
    public String getConnectionId() {
        ConsumerInfo info = getConsumerInfo();
        if (info != null) {
            return info.getConsumerId().getConnectionId();
        }
        return "NOTSET";
    }

    /**
     * @return the id of the Session the subscription is on, or {@code 0}
     *         when no consumer info is available
     */
    @Override
    public long getSessionId() {
        ConsumerInfo info = getConsumerInfo();
        if (info != null) {
            return info.getConsumerId().getSessionId();
        }
        return 0;
    }

    /**
     * @return the id of the Subscription
     * @deprecated misspelled name; use {@link #getSubscriptionId()} instead
     */
    @Deprecated
    @Override
    public long getSubcriptionId() {
        return getSubscriptionId();
    }

    /**
     * @return the id of the Subscription, or {@code 0} when no consumer info
     *         is available
     */
    @Override
    public long getSubscriptionId() {
        ConsumerInfo info = getConsumerInfo();
        if (info != null) {
            return info.getConsumerId().getValue();
        }
        return 0;
    }

    /**
     * @return the destination name, or {@code "NOTSET"} when no consumer
     *         destination is available
     */
    @Override
    public String getDestinationName() {
        ActiveMQDestination dest = getConsumerDestination();
        return dest != null ? dest.getPhysicalName() : "NOTSET";
    }

    /**
     * @return the selector of the wrapped subscription, or {@code null} when
     *         there is no subscription
     */
    @Override
    public String getSelector() {
        if (subscription != null) {
            return subscription.getSelector();
        }
        return null;
    }

    /**
     * Applies a new selector to the wrapped subscription.
     *
     * @param selector the selector to set
     * @throws InvalidSelectorException if the subscription rejects the selector
     * @throws UnsupportedOperationException if there is no subscription object
     */
    @Override
    public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
        if (subscription == null) {
            throw new UnsupportedOperationException("No subscription object");
        }
        subscription.setSelector(selector);
    }

    /**
     * @return true if the destination is a Queue
     */
    @Override
    public boolean isDestinationQueue() {
        ActiveMQDestination dest = getConsumerDestination();
        return dest != null && dest.isQueue();
    }

    /**
     * @return true if the destination is a Topic
     */
    @Override
    public boolean isDestinationTopic() {
        ActiveMQDestination dest = getConsumerDestination();
        return dest != null && dest.isTopic();
    }

    /**
     * @return true if the destination is temporary
     */
    @Override
    public boolean isDestinationTemporary() {
        ActiveMQDestination dest = getConsumerDestination();
        return dest != null && dest.isTemporary();
    }

    /**
     * @return true if the subscriber is active; this view always reports
     *         {@code true}
     */
    @Override
    public boolean isActive() {
        return true;
    }

    /**
     * @return whether the consumer info marks this as a network subscription;
     *         {@code false} when no consumer info is available
     */
    @Override
    public boolean isNetwork() {
        ConsumerInfo info = getConsumerInfo();
        return info != null && info.isNetworkSubscription();
    }

    /**
     * The subscription should release as many references as it can to help
     * the garbage collector reclaim memory.
     */
    public void gc() {
        if (subscription != null) {
            subscription.gc();
        }
    }

    /**
     * @return whether or not the subscriber is retroactive
     */
    @Override
    public boolean isRetroactive() {
        ConsumerInfo info = getConsumerInfo();
        return info != null && info.isRetroactive();
    }

    /**
     * @return whether or not the subscriber is an exclusive consumer
     */
    @Override
    public boolean isExclusive() {
        ConsumerInfo info = getConsumerInfo();
        return info != null && info.isExclusive();
    }

    /**
     * @return whether or not the subscriber is durable (persistent)
     */
    @Override
    public boolean isDurable() {
        ConsumerInfo info = getConsumerInfo();
        return info != null && info.isDurable();
    }

    /**
     * @return whether or not the subscriber ignores local messages
     */
    @Override
    public boolean isNoLocal() {
        ConsumerInfo info = getConsumerInfo();
        return info != null && info.isNoLocal();
    }

    /**
     * @return the maximum number of pending messages allowed in addition to the
     *         prefetch size. If enabled to a non-zero value then this will
     *         perform eviction of messages for slow consumers on non-durable
     *         topics.
     */
    @Override
    public int getMaximumPendingMessageLimit() {
        ConsumerInfo info = getConsumerInfo();
        return info != null ? info.getMaximumPendingMessageLimit() : 0;
    }

    /**
     * @return the consumer priority, or {@code 0} when no consumer info is
     *         available
     */
    @Override
    public byte getPriority() {
        ConsumerInfo info = getConsumerInfo();
        return info != null ? info.getPriority() : 0;
    }

    /**
     * @return the name of the consumer which is only used for durable
     *         consumers.
     * @deprecated misspelled name; use {@link #getSubscriptionName()} instead
     */
    @Deprecated
    @Override
    public String getSubcriptionName() {
        return getSubscriptionName();
    }

    /**
     * @return the name of the consumer which is only used for durable
     *         consumers.
     */
    @Override
    public String getSubscriptionName() {
        ConsumerInfo info = getConsumerInfo();
        return info != null ? info.getSubscriptionName() : null;
    }

    /**
     * @return number of messages pending delivery
     */
    @Override
    public int getPendingQueueSize() {
        return subscription != null ? subscription.getPendingQueueSize() : 0;
    }

    /**
     * @return number of messages dispatched
     */
    @Override
    public int getDispatchedQueueSize() {
        return subscription != null ? subscription.getDispatchedQueueSize() : 0;
    }

    /**
     * @return the same value as {@link #getDispatchedQueueSize()}
     */
    @Override
    public int getMessageCountAwaitingAcknowledge() {
        return getDispatchedQueueSize();
    }

    /**
     * @return the dispatched counter of the wrapped subscription, or
     *         {@code 0} when there is no subscription
     */
    @Override
    public long getDispatchedCounter() {
        return subscription != null ? subscription.getDispatchedCounter() : 0;
    }

    /**
     * @return the enqueue counter of the wrapped subscription, or {@code 0}
     *         when there is no subscription
     */
    @Override
    public long getEnqueueCounter() {
        return subscription != null ? subscription.getEnqueueCounter() : 0;
    }

    /**
     * @return the dequeue counter of the wrapped subscription, or {@code 0}
     *         when there is no subscription
     */
    @Override
    public long getDequeueCounter() {
        return subscription != null ? subscription.getDequeueCounter() : 0;
    }

    /**
     * @return the consumer info of the wrapped subscription, or {@code null}
     *         when there is no subscription
     */
    protected ConsumerInfo getConsumerInfo() {
        return subscription != null ? subscription.getConsumerInfo() : null;
    }

    /**
     * Resolves the destination through {@link #getConsumerInfo()}.
     *
     * @return the consumer's destination, or {@code null} when no consumer
     *         info is available
     */
    private ActiveMQDestination getConsumerDestination() {
        ConsumerInfo info = getConsumerInfo();
        return info != null ? info.getDestination() : null;
    }

    /**
     * @return pretty print
     */
    @Override
    public String toString() {
        return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
    }

    /**
     * @return the prefetch size of the wrapped subscription, or {@code 0}
     *         when there is no subscription
     */
    @Override
    public int getPrefetchSize() {
        return subscription != null ? subscription.getPrefetchSize() : 0;
    }

    /**
     * @param queueName the name of the queue to compare against
     * @return true if this subscription is on a queue destination that
     *         matches the named queue
     */
    @Override
    public boolean isMatchingQueue(String queueName) {
        if (isDestinationQueue()) {
            return matchesDestination(new ActiveMQQueue(queueName));
        }
        return false;
    }

    /**
     * @param topicName the name of the topic to compare against
     * @return true if this subscription is on a topic destination that
     *         matches the named topic
     */
    @Override
    public boolean isMatchingTopic(String topicName) {
        if (isDestinationTopic()) {
            return matchesDestination(new ActiveMQTopic(topicName));
        }
        return false;
    }

    /**
     * Return true if this subscription matches the given destination
     *
     * @param destination the destination to compare against
     * @return true if this subscription matches the given destination;
     *         {@code false} when the subscription's own destination cannot
     *         be determined
     */
    public boolean matchesDestination(ActiveMQDestination destination) {
        // Like the other accessors, tolerate a missing subscription: fall back
        // to the destination recorded in the consumer info instead of failing.
        ActiveMQDestination subscriptionDestination = subscription != null
            ? subscription.getActiveMQDestination()
            : getConsumerDestination();
        if (subscriptionDestination == null) {
            return false;
        }
        DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination);
        return filter.matches(destination);
    }

    /**
     * @return whether the wrapped subscription reports itself as a slow
     *         consumer; {@code false} when there is no subscription
     */
    @Override
    public boolean isSlowConsumer() {
        return subscription != null && subscription.isSlowConsumer();
    }

    /**
     * @return the user name supplied at construction
     */
    @Override
    public String getUserName() {
        return userName;
    }

    /**
     * Resets the statistics of the wrapped subscription; does nothing when
     * the subscription or its statistics are missing.
     */
    @Override
    public void resetStatistics() {
        if (subscription != null && subscription.getSubscriptionStatistics() != null) {
            subscription.getSubscriptionStatistics().reset();
        }
    }

    /**
     * @return the consumed count of the wrapped subscription, or {@code 0}
     *         when there is no subscription
     */
    @Override
    public long getConsumedCount() {
        return subscription != null ? subscription.getConsumedCount() : 0;
    }
}