001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.List; 022import java.util.Map; 023import java.util.Map.Entry; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.atomic.AtomicBoolean; 026 027import org.apache.activemq.broker.Broker; 028import org.apache.activemq.broker.Connection; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.broker.region.Destination; 031import org.apache.activemq.broker.region.Subscription; 032import org.apache.activemq.command.ConsumerControl; 033import org.apache.activemq.command.RemoveInfo; 034import org.apache.activemq.state.CommandVisitor; 035import org.apache.activemq.thread.Scheduler; 036import org.apache.activemq.transport.InactivityIOException; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds 042 * 043 * @org.apache.xbean.XBean 044 */ 045public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable { 046 047 private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class); 048 049 protected String name = "AbortSlowConsumerStrategy@" + hashCode(); 050 protected Scheduler scheduler; 051 protected Broker broker; 052 protected final AtomicBoolean taskStarted = new AtomicBoolean(false); 053 protected final Map<Subscription, SlowConsumerEntry> slowConsumers = 054 new ConcurrentHashMap<Subscription, SlowConsumerEntry>(); 055 056 private long maxSlowCount = -1; 057 private long maxSlowDuration = 30*1000; 058 private long checkPeriod = 30*1000; 059 private boolean abortConnection = false; 060 private boolean ignoreNetworkConsumers = true; 061 062 @Override 063 public void setBrokerService(Broker broker) { 064 this.scheduler = broker.getScheduler(); 065 this.broker = broker; 066 } 067 068 @Override 069 public void slowConsumer(ConnectionContext context, Subscription subs) { 070 if (maxSlowCount < 0 && maxSlowDuration < 0) { 071 // nothing to do 072 LOG.info("no limits set, slowConsumer strategy has nothing to do"); 073 return; 074 } 075 076 if (taskStarted.compareAndSet(false, true)) { 077 scheduler.executePeriodically(this, checkPeriod); 078 } 079 080 if (!slowConsumers.containsKey(subs)) { 081 slowConsumers.put(subs, new SlowConsumerEntry(context)); 082 } else if (maxSlowCount > 0) { 083 slowConsumers.get(subs).slow(); 084 } 085 } 086 087 @Override 088 public void run() { 089 if (maxSlowDuration > 0) { 090 // mark 091 for (SlowConsumerEntry entry : slowConsumers.values()) { 092 entry.mark(); 093 } 094 } 095 096 HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>(); 097 for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) { 098 Subscription subscription = entry.getKey(); 099 if (isIgnoreNetworkSubscriptions() && subscription.getConsumerInfo().isNetworkSubscription()) { 100 if (slowConsumers.remove(subscription) != null) { 101 LOG.info("network sub: {} is no longer slow", subscription.getConsumerInfo().getConsumerId()); 102 } 103 continue; 104 } 105 106 if (entry.getKey().isSlowConsumer()) { 107 if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod >= maxSlowDuration) 108 || maxSlowCount > 0 && entry.getValue().slowCount >= maxSlowCount) { 109 toAbort.put(entry.getKey(), entry.getValue()); 110 slowConsumers.remove(entry.getKey()); 111 } 112 } else { 113 LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow"); 114 slowConsumers.remove(entry.getKey()); 115 } 116 } 117 118 abortSubscription(toAbort, abortConnection); 119 } 120 121 protected void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) { 122 123 Map<Connection, List<Subscription>> abortMap = new HashMap<Connection, List<Subscription>>(); 124 125 for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) { 126 ConnectionContext connectionContext = entry.getValue().context; 127 if (connectionContext == null) { 128 continue; 129 } 130 131 Connection connection = connectionContext.getConnection(); 132 if (connection == null) { 133 LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext); 134 } 135 136 if (!abortMap.containsKey(connection)) { 137 abortMap.put(connection, new ArrayList<Subscription>()); 138 } 139 140 abortMap.get(connection).add(entry.getKey()); 141 } 142 143 for (Entry<Connection, List<Subscription>> entry : abortMap.entrySet()) { 144 final Connection connection = entry.getKey(); 145 final List<Subscription> subscriptions = entry.getValue(); 146 147 if (abortSubscriberConnection) { 148 149 LOG.info("aborting connection:{} with {} slow consumers", 150 connection.getConnectionId(), subscriptions.size()); 151 152 if (LOG.isTraceEnabled()) { 153 for (Subscription subscription : subscriptions) { 154 LOG.trace("Connection {} being aborted because of slow consumer: {} on destination: {}", 155 new Object[] { connection.getConnectionId(), 156 subscription.getConsumerInfo().getConsumerId(), 157 subscription.getActiveMQDestination() }); 158 } 159 } 160 161 try { 162 scheduler.executeAfterDelay(new Runnable() { 163 @Override 164 public void run() { 165 connection.serviceException(new InactivityIOException( 166 subscriptions.size() + " Consumers was slow too often (>" 167 + maxSlowCount + ") or too long (>" 168 + maxSlowDuration + "): ")); 169 }}, 0l); 170 } catch (Exception e) { 171 LOG.info("exception on aborting connection {} with {} slow consumers", 172 connection.getConnectionId(), subscriptions.size()); 173 } 174 } else { 175 // just abort each consumer 176 for (Subscription subscription : subscriptions) { 177 final Subscription subToClose = subscription; 178 LOG.info("aborting slow consumer: {} for destination:{}", 179 subscription.getConsumerInfo().getConsumerId(), 180 subscription.getActiveMQDestination()); 181 182 // tell the remote consumer to close 183 try { 184 ConsumerControl stopConsumer = new ConsumerControl(); 185 stopConsumer.setConsumerId(subscription.getConsumerInfo().getConsumerId()); 186 stopConsumer.setClose(true); 187 connection.dispatchAsync(stopConsumer); 188 } catch (Exception e) { 189 LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e); 190 } 191 192 // force a local remove in case remote is unresponsive 193 try { 194 scheduler.executeAfterDelay(new Runnable() { 195 @Override 196 public void run() { 197 try { 198 RemoveInfo removeCommand = subToClose.getConsumerInfo().createRemoveCommand(); 199 if (connection instanceof CommandVisitor) { 200 // avoid service exception handling and logging 201 removeCommand.visit((CommandVisitor) connection); 202 } else { 203 connection.service(removeCommand); 204 } 205 } catch (IllegalStateException ignoredAsRemoteHasDoneTheJob) { 206 } catch (Exception e) { 207 LOG.info("exception on local remove of slow consumer: {}", subToClose.getConsumerInfo().getConsumerId(), e); 208 } 209 }}, 1000l); 210 211 } catch (Exception e) { 212 LOG.info("exception on local remove of slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e); 213 } 214 } 215 } 216 } 217 } 218 219 public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) { 220 if (sub != null) { 221 SlowConsumerEntry entry = slowConsumers.remove(sub); 222 if (entry != null) { 223 Map<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>(); 224 toAbort.put(sub, entry); 225 abortSubscription(toAbort, abortSubscriberConnection); 226 } else { 227 LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub); 228 } 229 } 230 } 231 232 public long getMaxSlowCount() { 233 return maxSlowCount; 234 } 235 236 /** 237 * number of times a subscription can be deemed slow before triggering abort 238 * effect depends on dispatch rate as slow determination is done on dispatch 239 */ 240 public void setMaxSlowCount(long maxSlowCount) { 241 this.maxSlowCount = maxSlowCount; 242 } 243 244 public long getMaxSlowDuration() { 245 return maxSlowDuration; 246 } 247 248 /** 249 * time in milliseconds that a sub can remain slow before triggering 250 * an abort. 251 * @param maxSlowDuration 252 */ 253 public void setMaxSlowDuration(long maxSlowDuration) { 254 this.maxSlowDuration = maxSlowDuration; 255 } 256 257 public long getCheckPeriod() { 258 return checkPeriod; 259 } 260 261 /** 262 * time in milliseconds between checks for slow subscriptions 263 * @param checkPeriod 264 */ 265 public void setCheckPeriod(long checkPeriod) { 266 this.checkPeriod = checkPeriod; 267 } 268 269 public boolean isAbortConnection() { 270 return abortConnection; 271 } 272 273 /** 274 * abort the consumers connection rather than sending a stop command to the remote consumer 275 * @param abortConnection 276 */ 277 public void setAbortConnection(boolean abortConnection) { 278 this.abortConnection = abortConnection; 279 } 280 281 /** 282 * Returns whether the strategy is configured to ignore subscriptions that are from a network 283 * connection. 284 * 285 * @return true if the strategy will ignore network connection subscriptions when looking 286 * for slow consumers. 287 */ 288 public boolean isIgnoreNetworkSubscriptions() { 289 return ignoreNetworkConsumers; 290 } 291 292 /** 293 * Sets whether the strategy is configured to ignore consumers that are part of a network 294 * connection to another broker. 295 * 296 * When configured to not ignore idle consumers this strategy acts not only on consumers 297 * that are actually slow but also on any consumer that has not received any messages for 298 * the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also 299 * aborting slow consumers however for a network subscription this can create a lot of 300 * unnecessary churn and if the abort connection option is also enabled this can result 301 * in the entire network connection being torn down and rebuilt for no reason. 302 * 303 * @param ignoreNetworkConsumers 304 * Should this strategy ignore subscriptions made by a network connector. 305 */ 306 public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) { 307 this.ignoreNetworkConsumers = ignoreNetworkConsumers; 308 } 309 310 public void setName(String name) { 311 this.name = name; 312 } 313 314 public String getName() { 315 return name; 316 } 317 318 public Map<Subscription, SlowConsumerEntry> getSlowConsumers() { 319 return slowConsumers; 320 } 321 322 @Override 323 public void addDestination(Destination destination) { 324 // Not needed for this strategy. 325 } 326}