001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.view; 018 019import java.io.IOException; 020import java.io.PrintWriter; 021import java.util.Collection; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.Iterator; 025import java.util.Map; 026import java.util.Set; 027 028import javax.management.ObjectName; 029 030import org.apache.activemq.broker.Broker; 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.broker.ProducerBrokerExchange; 033import org.apache.activemq.broker.jmx.BrokerViewMBean; 034import org.apache.activemq.broker.jmx.SubscriptionViewMBean; 035import org.apache.activemq.broker.region.Subscription; 036import org.apache.activemq.command.ActiveMQDestination; 037import org.apache.activemq.command.ConsumerInfo; 038import org.apache.activemq.command.Message; 039import org.apache.activemq.command.ProducerId; 040import org.apache.activemq.command.ProducerInfo; 041import org.apache.activemq.filter.DestinationMapNode; 042 043/** 044 * 045 */ 046public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { 047 048 protected static final String ID_SEPARATOR = "_"; 049 050 private final boolean redrawOnRemove; 051 private boolean clearProducerCacheAfterRender; 052 private final String domain = "org.apache.activemq"; 053 private BrokerViewMBean brokerView; 054 055 // until we have some MBeans for producers, lets do it all ourselves 056 private final Map<ProducerId, ProducerInfo> producers = new HashMap<ProducerId, ProducerInfo>(); 057 private final Map<ProducerId, Set<ActiveMQDestination>> producerDestinations = new HashMap<ProducerId, Set<ActiveMQDestination>>(); 058 private final Object lock = new Object(); 059 060 public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) throws IOException { 061 super(next, file); 062 this.redrawOnRemove = redrawOnRemove; 063 064 } 065 066 @Override 067 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 068 Subscription answer = super.addConsumer(context, info); 069 generateFile(); 070 return answer; 071 } 072 073 @Override 074 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 075 super.addProducer(context, info); 076 ProducerId producerId = info.getProducerId(); 077 synchronized (lock) { 078 producers.put(producerId, info); 079 } 080 generateFile(); 081 } 082 083 @Override 084 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 085 super.removeConsumer(context, info); 086 if (redrawOnRemove) { 087 generateFile(); 088 } 089 } 090 091 @Override 092 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 093 super.removeProducer(context, info); 094 ProducerId producerId = info.getProducerId(); 095 if (redrawOnRemove) { 096 synchronized (lock) { 097 producerDestinations.remove(producerId); 098 producers.remove(producerId); 099 } 100 generateFile(); 101 } 102 } 103 104 @Override 105 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 106 super.send(producerExchange, messageSend); 107 ProducerId producerId = messageSend.getProducerId(); 108 ActiveMQDestination destination = messageSend.getDestination(); 109 synchronized (lock) { 110 Set<ActiveMQDestination> destinations = producerDestinations.get(producerId); 111 if (destinations == null) { 112 destinations = new HashSet<ActiveMQDestination>(); 113 } 114 producerDestinations.put(producerId, destinations); 115 destinations.add(destination); 116 } 117 } 118 119 @Override 120 protected void generateFile(PrintWriter writer) throws Exception { 121 122 writer.println("digraph \"ActiveMQ Connections\" {"); 123 writer.println(); 124 writer.println("label=\"ActiveMQ Broker: " + getBrokerView().getBrokerId() + "\"];"); 125 writer.println(); 126 writer.println("node [style = \"rounded,filled\", fillcolor = yellow, fontname=\"Helvetica-Oblique\"];"); 127 writer.println(); 128 129 Map<String, String> clients = new HashMap<String, String>(); 130 Map<String, String> queues = new HashMap<String, String>(); 131 Map<String, String> topics = new HashMap<String, String>(); 132 133 printSubscribers(writer, clients, queues, "queue_", getBrokerView().getQueueSubscribers()); 134 writer.println(); 135 136 printSubscribers(writer, clients, topics, "topic_", getBrokerView().getTopicSubscribers()); 137 writer.println(); 138 139 printProducers(writer, clients, queues, topics); 140 writer.println(); 141 142 writeLabels(writer, "green", "Client: ", clients); 143 writer.println(); 144 145 writeLabels(writer, "red", "Queue: ", queues); 146 writeLabels(writer, "blue", "Topic: ", topics); 147 writer.println("}"); 148 149 if (clearProducerCacheAfterRender) { 150 producerDestinations.clear(); 151 } 152 } 153 154 protected void printProducers(PrintWriter writer, Map<String, String> clients, Map<String, String> queues, Map<String, String> topics) { 155 synchronized (lock) { 156 for (Iterator iter = producerDestinations.entrySet().iterator(); iter.hasNext();) { 157 Map.Entry entry = (Map.Entry)iter.next(); 158 ProducerId producerId = (ProducerId)entry.getKey(); 159 Set destinationSet = (Set)entry.getValue(); 160 printProducers(writer, clients, queues, topics, producerId, destinationSet); 161 } 162 } 163 } 164 165 protected void printProducers(PrintWriter writer, Map<String, String> clients, Map<String, String> queues, Map<String, String> topics, ProducerId producerId, Set destinationSet) { 166 for (Iterator iter = destinationSet.iterator(); iter.hasNext();) { 167 ActiveMQDestination destination = (ActiveMQDestination)iter.next(); 168 169 // TODO use clientId one day 170 String clientId = producerId.getConnectionId(); 171 String safeClientId = asID(clientId); 172 clients.put(safeClientId, clientId); 173 174 String physicalName = destination.getPhysicalName(); 175 String safeDestinationId = asID(physicalName); 176 if (destination.isTopic()) { 177 safeDestinationId = "topic_" + safeDestinationId; 178 topics.put(safeDestinationId, physicalName); 179 } else { 180 safeDestinationId = "queue_" + safeDestinationId; 181 queues.put(safeDestinationId, physicalName); 182 } 183 184 String safeProducerId = asID(producerId.toString()); 185 186 // lets write out the links 187 188 writer.print(safeClientId); 189 writer.print(" -> "); 190 writer.print(safeProducerId); 191 writer.println(";"); 192 193 writer.print(safeProducerId); 194 writer.print(" -> "); 195 writer.print(safeDestinationId); 196 writer.println(";"); 197 198 // now lets write out the label 199 writer.print(safeProducerId); 200 writer.print(" [label = \""); 201 String label = "Producer: " + producerId.getSessionId() + "-" + producerId.getValue(); 202 writer.print(label); 203 writer.println("\"];"); 204 205 } 206 } 207 208 protected void printSubscribers(PrintWriter writer, Map<String, String> clients, Map<String, String> destinations, String type, ObjectName[] subscribers) { 209 for (int i = 0; i < subscribers.length; i++) { 210 ObjectName name = subscribers[i]; 211 SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true); 212 213 String clientId = subscriber.getClientId(); 214 String safeClientId = asID(clientId); 215 clients.put(safeClientId, clientId); 216 217 String destination = subscriber.getDestinationName(); 218 String safeDestinationId = type + asID(destination); 219 destinations.put(safeDestinationId, destination); 220 221 String selector = subscriber.getSelector(); 222 223 // lets write out the links 224 String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubscriptionId(); 225 226 writer.print(subscriberId); 227 writer.print(" -> "); 228 writer.print(safeClientId); 229 writer.println(";"); 230 231 writer.print(safeDestinationId); 232 writer.print(" -> "); 233 writer.print(subscriberId); 234 writer.println(";"); 235 236 // now lets write out the label 237 writer.print(subscriberId); 238 writer.print(" [label = \""); 239 String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubscriptionId(); 240 if (selector != null && selector.length() > 0) { 241 label = label + "\\nSelector: " + selector; 242 } 243 writer.print(label); 244 writer.println("\"];"); 245 } 246 } 247 248 protected void writeLabels(PrintWriter writer, String color, String prefix, Map<String, String> map) { 249 for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) { 250 Map.Entry entry = (Map.Entry)iter.next(); 251 String id = (String)entry.getKey(); 252 String label = (String)entry.getValue(); 253 254 writer.print(id); 255 writer.print(" [ fillcolor = "); 256 writer.print(color); 257 writer.print(", label = \""); 258 writer.print(prefix); 259 writer.print(label); 260 writer.println("\"];"); 261 } 262 } 263 264 /** 265 * Lets strip out any non supported characters 266 */ 267 protected String asID(String name) { 268 StringBuffer buffer = new StringBuffer(); 269 int size = name.length(); 270 for (int i = 0; i < size; i++) { 271 char ch = name.charAt(i); 272 if (Character.isLetterOrDigit(ch) || ch == '_') { 273 buffer.append(ch); 274 } else { 275 buffer.append('_'); 276 } 277 } 278 return buffer.toString(); 279 } 280 281 protected void printNodes(PrintWriter writer, DestinationMapNode node, String prefix) { 282 String path = getPath(node); 283 writer.print(" "); 284 writer.print(prefix); 285 writer.print(ID_SEPARATOR); 286 writer.print(path); 287 String label = path; 288 if (prefix.equals("topic")) { 289 label = "Topics"; 290 } else if (prefix.equals("queue")) { 291 label = "Queues"; 292 } 293 writer.print("[ label = \""); 294 writer.print(label); 295 writer.println("\" ];"); 296 297 Collection children = node.getChildren(); 298 for (Iterator iter = children.iterator(); iter.hasNext();) { 299 DestinationMapNode child = (DestinationMapNode)iter.next(); 300 printNodes(writer, child, prefix + ID_SEPARATOR + path); 301 } 302 } 303 304 protected void printNodeLinks(PrintWriter writer, DestinationMapNode node, String prefix) { 305 String path = getPath(node); 306 Collection children = node.getChildren(); 307 for (Iterator iter = children.iterator(); iter.hasNext();) { 308 DestinationMapNode child = (DestinationMapNode)iter.next(); 309 310 writer.print(" "); 311 writer.print(prefix); 312 writer.print(ID_SEPARATOR); 313 writer.print(path); 314 writer.print(" -> "); 315 writer.print(prefix); 316 writer.print(ID_SEPARATOR); 317 writer.print(path); 318 writer.print(ID_SEPARATOR); 319 writer.print(getPath(child)); 320 writer.println(";"); 321 322 printNodeLinks(writer, child, prefix + ID_SEPARATOR + path); 323 } 324 } 325 326 protected String getPath(DestinationMapNode node) { 327 String path = node.getPath(); 328 if (path.equals("*")) { 329 return "root"; 330 } 331 return path; 332 } 333 334 BrokerViewMBean getBrokerView() throws Exception { 335 if (this.brokerView == null) { 336 ObjectName brokerName = getBrokerService().getBrokerObjectName(); 337 this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName, 338 BrokerViewMBean.class, true); 339 } 340 return this.brokerView; 341 } 342}