001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * <p/> 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * <p/> 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import org.apache.activemq.broker.BrokerService; 020import org.apache.activemq.broker.jmx.AnnotatedMBean; 021import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 022import org.apache.activemq.broker.jmx.NetworkBridgeView; 023import org.apache.activemq.broker.jmx.NetworkDestinationView; 024import org.apache.activemq.command.ActiveMQDestination; 025import org.apache.activemq.command.Message; 026import org.apache.activemq.thread.Scheduler; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import javax.management.ObjectName; 031import java.util.Iterator; 032import java.util.Map; 033import java.util.concurrent.ConcurrentHashMap; 034 035public class MBeanBridgeDestination { 036 private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class); 037 private final BrokerService brokerService; 038 private final NetworkBridge bridge; 039 private final NetworkBridgeView networkBridgeView; 040 private final NetworkBridgeConfiguration networkBridgeConfiguration; 041 private final Scheduler scheduler; 042 private final Runnable purgeInactiveDestinationViewTask; 043 private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap = new ConcurrentHashMap<>(); 044 private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap = new ConcurrentHashMap<>(); 045 046 public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) { 047 this.brokerService = brokerService; 048 this.networkBridgeConfiguration = networkBridgeConfiguration; 049 this.bridge = bridge; 050 this.networkBridgeView = networkBridgeView; 051 this.scheduler = brokerService.getScheduler(); 052 purgeInactiveDestinationViewTask = new Runnable() { 053 public void run() { 054 purgeInactiveDestinationViews(); 055 } 056 }; 057 } 058 059 060 public void onOutboundMessage(Message message) { 061 ActiveMQDestination destination = message.getDestination(); 062 NetworkDestinationContainer networkDestinationContainer; 063 064 if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) { 065 ObjectName bridgeObjectName = bridge.getMbeanObjectName(); 066 try { 067 ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination); 068 NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); 069 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); 070 071 networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName); 072 outboundDestinationViewMap.put(destination, networkDestinationContainer); 073 networkDestinationView.messageSent(); 074 } catch (Exception e) { 075 LOG.warn("Failed to register " + destination, e); 076 } 077 } else { 078 networkDestinationContainer.view.messageSent(); 079 } 080 } 081 082 083 public void onInboundMessage(Message message) { 084 ActiveMQDestination destination = message.getDestination(); 085 NetworkDestinationContainer networkDestinationContainer; 086 087 if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) { 088 ObjectName bridgeObjectName = bridge.getMbeanObjectName(); 089 try { 090 ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination); 091 NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); 092 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); 093 094 networkBridgeView.addNetworkDestinationView(networkDestinationView); 095 networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName); 096 inboundDestinationViewMap.put(destination, networkDestinationContainer); 097 networkDestinationView.messageSent(); 098 } catch (Exception e) { 099 LOG.warn("Failed to register " + destination, e); 100 } 101 } else { 102 networkDestinationContainer.view.messageSent(); 103 } 104 } 105 106 public void start() { 107 if (networkBridgeConfiguration.isGcDestinationViews()) { 108 long period = networkBridgeConfiguration.getGcSweepTime(); 109 if (period > 0) { 110 scheduler.executePeriodically(purgeInactiveDestinationViewTask, period); 111 } 112 } 113 } 114 115 public void stop() { 116 if (!brokerService.isUseJmx()) { 117 return; 118 } 119 120 scheduler.cancel(purgeInactiveDestinationViewTask); 121 for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) { 122 try { 123 brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName); 124 } catch (Exception e) { 125 LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); 126 } 127 } 128 for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) { 129 try { 130 brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName); 131 } catch (Exception e) { 132 LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); 133 } 134 } 135 inboundDestinationViewMap.clear(); 136 outboundDestinationViewMap.clear(); 137 } 138 139 private void purgeInactiveDestinationViews() { 140 if (!brokerService.isUseJmx()) { 141 return; 142 } 143 purgeInactiveDestinationView(inboundDestinationViewMap); 144 purgeInactiveDestinationView(outboundDestinationViewMap); 145 } 146 147 private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer> map) { 148 long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime(); 149 for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>> it = map.entrySet().iterator(); it.hasNext(); ) { 150 Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next(); 151 if (entry.getValue().view.getLastAccessTime() <= time) { 152 ObjectName objectName = entry.getValue().objectName; 153 if (objectName != null) { 154 try { 155 brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName); 156 } catch (Throwable e) { 157 LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); 158 } 159 } 160 entry.getValue().view.close(); 161 it.remove(); 162 } 163 } 164 } 165 166 private static class NetworkDestinationContainer { 167 private final NetworkDestinationView view; 168 private final ObjectName objectName; 169 170 private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) { 171 this.view = view; 172 this.objectName = objectName; 173 } 174 } 175}