001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.File; 020import java.io.IOException; 021import java.net.URI; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.NoSuchElementException; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import javax.management.ObjectName; 028 029import org.apache.activemq.ActiveMQConnectionMetaData; 030import org.apache.activemq.broker.BrokerService; 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.broker.TransportConnector; 033import org.apache.activemq.broker.region.Subscription; 034import org.apache.activemq.command.ActiveMQQueue; 035import org.apache.activemq.command.ActiveMQTopic; 036import org.apache.activemq.command.ConsumerId; 037import org.apache.activemq.command.ConsumerInfo; 038import org.apache.activemq.command.RemoveSubscriptionInfo; 039import org.apache.activemq.network.NetworkConnector; 040import org.apache.activemq.util.BrokerSupport; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * 046 */ 047public class BrokerView implements BrokerViewMBean { 048 private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class); 049 ManagedRegionBroker broker; 050 private final BrokerService brokerService; 051 private final AtomicInteger sessionIdCounter = new AtomicInteger(0); 052 private ObjectName jmsJobScheduler; 053 054 public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { 055 this.brokerService = brokerService; 056 this.broker = managedBroker; 057 } 058 059 public ManagedRegionBroker getBroker() { 060 return broker; 061 } 062 063 public void setBroker(ManagedRegionBroker broker) { 064 this.broker = broker; 065 } 066 067 @Override 068 public String getBrokerId() { 069 return safeGetBroker().getBrokerId().toString(); 070 } 071 072 @Override 073 public String getBrokerName() { 074 return safeGetBroker().getBrokerName(); 075 } 076 077 @Override 078 public String getBrokerVersion() { 079 return ActiveMQConnectionMetaData.PROVIDER_VERSION; 080 } 081 082 @Override 083 public String getUptime() { 084 return brokerService.getUptime(); 085 } 086 087 @Override 088 public long getUptimeMillis() { 089 return brokerService.getUptimeMillis(); 090 } 091 092 @Override 093 public int getCurrentConnectionsCount() { 094 return brokerService.getCurrentConnections(); 095 } 096 097 @Override 098 public long getTotalConnectionsCount() { 099 return brokerService.getTotalConnections(); 100 } 101 102 @Override 103 public void gc() throws Exception { 104 brokerService.getBroker().gc(); 105 try { 106 brokerService.getPersistenceAdapter().checkpoint(true); 107 } catch (IOException e) { 108 LOG.error("Failed to checkpoint persistence adapter on gc request", e); 109 } 110 } 111 112 @Override 113 public void start() throws Exception { 114 brokerService.start(); 115 } 116 117 @Override 118 public void stop() throws Exception { 119 brokerService.stop(); 120 } 121 122 @Override 123 public void restart() throws Exception { 124 if (brokerService.isRestartAllowed()) { 125 brokerService.requestRestart(); 126 brokerService.stop(); 127 } else { 128 throw new Exception("Restart is not allowed"); 129 } 130 } 131 132 @Override 133 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) 134 throws Exception { 135 brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval); 136 } 137 138 @Override 139 public long getTotalEnqueueCount() { 140 return safeGetBroker().getDestinationStatistics().getEnqueues().getCount(); 141 } 142 143 @Override 144 public long getTotalDequeueCount() { 145 return safeGetBroker().getDestinationStatistics().getDequeues().getCount(); 146 } 147 148 @Override 149 public long getTotalConsumerCount() { 150 return safeGetBroker().getDestinationStatistics().getConsumers().getCount(); 151 } 152 153 @Override 154 public long getTotalProducerCount() { 155 return safeGetBroker().getDestinationStatistics().getProducers().getCount(); 156 } 157 158 @Override 159 public long getTotalMessageCount() { 160 return safeGetBroker().getDestinationStatistics().getMessages().getCount(); 161 } 162 163 /** 164 * @return the average size of a message (bytes) 165 */ 166 @Override 167 public long getAverageMessageSize() { 168 // we are okay with the size without decimals so cast to long 169 return (long) safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize(); 170 } 171 172 /** 173 * @return the max size of a message (bytes) 174 */ 175 @Override 176 public long getMaxMessageSize() { 177 return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize(); 178 } 179 180 /** 181 * @return the min size of a message (bytes) 182 */ 183 @Override 184 public long getMinMessageSize() { 185 return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize(); 186 } 187 188 public long getTotalMessagesCached() { 189 return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount(); 190 } 191 192 @Override 193 public int getMemoryPercentUsage() { 194 return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage(); 195 } 196 197 @Override 198 public long getMemoryLimit() { 199 return brokerService.getSystemUsage().getMemoryUsage().getLimit(); 200 } 201 202 @Override 203 public void setMemoryLimit(long limit) { 204 brokerService.getSystemUsage().getMemoryUsage().setLimit(limit); 205 } 206 207 @Override 208 public long getStoreLimit() { 209 return brokerService.getSystemUsage().getStoreUsage().getLimit(); 210 } 211 212 @Override 213 public int getStorePercentUsage() { 214 return brokerService.getSystemUsage().getStoreUsage().getPercentUsage(); 215 } 216 217 @Override 218 public long getTempLimit() { 219 return brokerService.getSystemUsage().getTempUsage().getLimit(); 220 } 221 222 @Override 223 public int getTempPercentUsage() { 224 return brokerService.getSystemUsage().getTempUsage().getPercentUsage(); 225 } 226 227 @Override 228 public long getJobSchedulerStoreLimit() { 229 return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit(); 230 } 231 232 @Override 233 public int getJobSchedulerStorePercentUsage() { 234 return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage(); 235 } 236 237 @Override 238 public void setStoreLimit(long limit) { 239 brokerService.getSystemUsage().getStoreUsage().setLimit(limit); 240 } 241 242 @Override 243 public void setTempLimit(long limit) { 244 brokerService.getSystemUsage().getTempUsage().setLimit(limit); 245 } 246 247 @Override 248 public void setJobSchedulerStoreLimit(long limit) { 249 brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit); 250 } 251 252 @Override 253 public void resetStatistics() { 254 safeGetBroker().getDestinationStatistics().reset(); 255 } 256 257 @Override 258 public void enableStatistics() { 259 safeGetBroker().getDestinationStatistics().setEnabled(true); 260 } 261 262 @Override 263 public void disableStatistics() { 264 safeGetBroker().getDestinationStatistics().setEnabled(false); 265 } 266 267 @Override 268 public boolean isStatisticsEnabled() { 269 return safeGetBroker().getDestinationStatistics().isEnabled(); 270 } 271 272 @Override 273 public boolean isPersistent() { 274 return brokerService.isPersistent(); 275 } 276 277 @Override 278 public void terminateJVM(int exitCode) { 279 System.exit(exitCode); 280 } 281 282 @Override 283 public ObjectName[] getTopics() { 284 return safeGetBroker().getTopicsNonSuppressed(); 285 } 286 287 @Override 288 public ObjectName[] getQueues() { 289 return safeGetBroker().getQueuesNonSuppressed(); 290 } 291 292 @Override 293 public ObjectName[] getTemporaryTopics() { 294 return safeGetBroker().getTemporaryTopicsNonSuppressed(); 295 } 296 297 @Override 298 public ObjectName[] getTemporaryQueues() { 299 return safeGetBroker().getTemporaryQueuesNonSuppressed(); 300 } 301 302 @Override 303 public ObjectName[] getTopicSubscribers() { 304 return safeGetBroker().getTopicSubscribersNonSuppressed(); 305 } 306 307 @Override 308 public ObjectName[] getDurableTopicSubscribers() { 309 return safeGetBroker().getDurableTopicSubscribersNonSuppressed(); 310 } 311 312 @Override 313 public ObjectName[] getQueueSubscribers() { 314 return safeGetBroker().getQueueSubscribersNonSuppressed(); 315 } 316 317 @Override 318 public ObjectName[] getTemporaryTopicSubscribers() { 319 return safeGetBroker().getTemporaryTopicSubscribersNonSuppressed(); 320 } 321 322 @Override 323 public ObjectName[] getTemporaryQueueSubscribers() { 324 return safeGetBroker().getTemporaryQueueSubscribersNonSuppressed(); 325 } 326 327 @Override 328 public ObjectName[] getInactiveDurableTopicSubscribers() { 329 return safeGetBroker().getInactiveDurableTopicSubscribersNonSuppressed(); 330 } 331 332 @Override 333 public ObjectName[] getTopicProducers() { 334 return safeGetBroker().getTopicProducersNonSuppressed(); 335 } 336 337 @Override 338 public ObjectName[] getQueueProducers() { 339 return safeGetBroker().getQueueProducersNonSuppressed(); 340 } 341 342 @Override 343 public ObjectName[] getTemporaryTopicProducers() { 344 return safeGetBroker().getTemporaryTopicProducersNonSuppressed(); 345 } 346 347 @Override 348 public ObjectName[] getTemporaryQueueProducers() { 349 return safeGetBroker().getTemporaryQueueProducersNonSuppressed(); 350 } 351 352 @Override 353 public ObjectName[] getDynamicDestinationProducers() { 354 return safeGetBroker().getDynamicDestinationProducersNonSuppressed(); 355 } 356 357 @Override 358 public String addConnector(String discoveryAddress) throws Exception { 359 TransportConnector connector = brokerService.addConnector(discoveryAddress); 360 if (connector == null) { 361 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress); 362 } 363 brokerService.startTransportConnector(connector); 364 return connector.getName(); 365 } 366 367 @Override 368 public String addNetworkConnector(String discoveryAddress) throws Exception { 369 NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress); 370 if (connector == null) { 371 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress); 372 } 373 brokerService.registerNetworkConnectorMBean(connector); 374 connector.start(); 375 return connector.getName(); 376 } 377 378 @Override 379 public boolean removeConnector(String connectorName) throws Exception { 380 TransportConnector connector = brokerService.getConnectorByName(connectorName); 381 if (connector == null) { 382 throw new NoSuchElementException("Not connector matched the given name: " + connectorName); 383 } 384 connector.stop(); 385 return brokerService.removeConnector(connector); 386 } 387 388 @Override 389 public boolean removeNetworkConnector(String connectorName) throws Exception { 390 NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName); 391 if (connector == null) { 392 throw new NoSuchElementException("Not connector matched the given name: " + connectorName); 393 } 394 connector.stop(); 395 return brokerService.removeNetworkConnector(connector); 396 } 397 398 @Override 399 public void addTopic(String name) throws Exception { 400 safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true); 401 } 402 403 @Override 404 public void addQueue(String name) throws Exception { 405 safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true); 406 } 407 408 @Override 409 public void removeTopic(String name) throws Exception { 410 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000); 411 } 412 413 @Override 414 public void removeQueue(String name) throws Exception { 415 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000); 416 } 417 418 @Override 419 public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, 420 String selector) throws Exception { 421 ConnectionContext context = getConnectionContext(); 422 context.setBroker(safeGetBroker()); 423 context.setClientId(clientId); 424 ConsumerInfo info = new ConsumerInfo(); 425 ConsumerId consumerId = new ConsumerId(); 426 consumerId.setConnectionId(clientId); 427 consumerId.setSessionId(sessionIdCounter.incrementAndGet()); 428 consumerId.setValue(0); 429 info.setConsumerId(consumerId); 430 info.setDestination(new ActiveMQTopic(topicName)); 431 info.setSubscriptionName(subscriberName); 432 info.setSelector(selector); 433 Subscription subscription = safeGetBroker().addConsumer(context, info); 434 safeGetBroker().removeConsumer(context, info); 435 if (subscription != null) { 436 return subscription.getObjectName(); 437 } 438 return null; 439 } 440 441 @Override 442 public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception { 443 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 444 info.setClientId(clientId); 445 info.setSubscriptionName(subscriberName); 446 ConnectionContext context = getConnectionContext(); 447 context.setBroker(safeGetBroker()); 448 context.setClientId(clientId); 449 brokerService.getBroker().removeSubscription(context, info); 450 } 451 452 @Override 453 public void reloadLog4jProperties() throws Throwable { 454 Log4JConfigView.doReloadLog4jProperties(); 455 } 456 457 @Override 458 public Map<String, String> getTransportConnectors() { 459 Map<String, String> answer = new HashMap<String, String>(); 460 try { 461 for (TransportConnector connector : brokerService.getTransportConnectors()) { 462 answer.put(connector.getName(), connector.getConnectUri().toString()); 463 } 464 } catch (Exception e) { 465 LOG.debug("Failed to read URI to build transport connectors map", e); 466 } 467 return answer; 468 } 469 470 @Override 471 public String getTransportConnectorByType(String type) { 472 return brokerService.getTransportConnectorURIsAsMap().get(type); 473 } 474 475 @Override 476 @Deprecated 477 /** 478 * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)} 479 */ 480 public String getOpenWireURL() { 481 String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp"); 482 return answer != null ? answer : ""; 483 } 484 485 @Override 486 @Deprecated 487 /** 488 * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)} 489 */ 490 public String getStompURL() { 491 String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp"); 492 return answer != null ? answer : ""; 493 } 494 495 @Override 496 @Deprecated 497 /** 498 * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)} 499 */ 500 public String getSslURL() { 501 String answer = brokerService.getTransportConnectorURIsAsMap().get("ssl"); 502 return answer != null ? answer : ""; 503 } 504 505 @Override 506 @Deprecated 507 /** 508 * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)} 509 */ 510 public String getStompSslURL() { 511 String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl"); 512 return answer != null ? answer : ""; 513 } 514 515 @Override 516 public String getVMURL() { 517 URI answer = brokerService.getVmConnectorURI(); 518 return answer != null ? answer.toString() : ""; 519 } 520 521 @Override 522 public String getDataDirectory() { 523 File file = brokerService.getDataDirectoryFile(); 524 try { 525 return file != null ? file.getCanonicalPath():""; 526 } catch (IOException e) { 527 return ""; 528 } 529 } 530 531 @Override 532 public ObjectName getJMSJobScheduler() { 533 return this.jmsJobScheduler; 534 } 535 536 public void setJMSJobScheduler(ObjectName name) { 537 this.jmsJobScheduler=name; 538 } 539 540 @Override 541 public boolean isSlave() { 542 return brokerService.isSlave(); 543 } 544 545 private ManagedRegionBroker safeGetBroker() { 546 if (broker == null) { 547 throw new IllegalStateException("Broker is not yet started."); 548 } 549 550 return broker; 551 } 552 553 private ConnectionContext getConnectionContext() { 554 ConnectionContext context; 555 if(broker == null) { 556 context = new ConnectionContext(); 557 558 } 559 else { 560 ConnectionContext sharedContext = BrokerSupport.getConnectionContext(broker.getContextBroker()); 561 //Make a local copy of the sharedContext. We do this because we do not want to set a clientId on the 562 //global sharedContext. Taking a copy of the sharedContext is a good way to make sure that we are not 563 //messing up the shared context 564 context = sharedContext.copy(); 565 } 566 567 return context; 568 } 569}