001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.net.SocketException;
022import java.net.URI;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Properties;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038
039import javax.transaction.xa.XAResource;
040
041import org.apache.activemq.advisory.AdvisorySupport;
042import org.apache.activemq.broker.region.ConnectionStatistics;
043import org.apache.activemq.broker.region.RegionBroker;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.command.BrokerInfo;
046import org.apache.activemq.command.Command;
047import org.apache.activemq.command.CommandTypes;
048import org.apache.activemq.command.ConnectionControl;
049import org.apache.activemq.command.ConnectionError;
050import org.apache.activemq.command.ConnectionId;
051import org.apache.activemq.command.ConnectionInfo;
052import org.apache.activemq.command.ConsumerControl;
053import org.apache.activemq.command.ConsumerId;
054import org.apache.activemq.command.ConsumerInfo;
055import org.apache.activemq.command.ControlCommand;
056import org.apache.activemq.command.DataArrayResponse;
057import org.apache.activemq.command.DestinationInfo;
058import org.apache.activemq.command.ExceptionResponse;
059import org.apache.activemq.command.FlushCommand;
060import org.apache.activemq.command.IntegerResponse;
061import org.apache.activemq.command.KeepAliveInfo;
062import org.apache.activemq.command.Message;
063import org.apache.activemq.command.MessageAck;
064import org.apache.activemq.command.MessageDispatch;
065import org.apache.activemq.command.MessageDispatchNotification;
066import org.apache.activemq.command.MessagePull;
067import org.apache.activemq.command.ProducerAck;
068import org.apache.activemq.command.ProducerId;
069import org.apache.activemq.command.ProducerInfo;
070import org.apache.activemq.command.RemoveInfo;
071import org.apache.activemq.command.RemoveSubscriptionInfo;
072import org.apache.activemq.command.Response;
073import org.apache.activemq.command.SessionId;
074import org.apache.activemq.command.SessionInfo;
075import org.apache.activemq.command.ShutdownInfo;
076import org.apache.activemq.command.TransactionId;
077import org.apache.activemq.command.TransactionInfo;
078import org.apache.activemq.command.WireFormatInfo;
079import org.apache.activemq.network.DemandForwardingBridge;
080import org.apache.activemq.network.MBeanNetworkListener;
081import org.apache.activemq.network.NetworkBridgeConfiguration;
082import org.apache.activemq.network.NetworkBridgeFactory;
083import org.apache.activemq.security.MessageAuthorizationPolicy;
084import org.apache.activemq.state.CommandVisitor;
085import org.apache.activemq.state.ConnectionState;
086import org.apache.activemq.state.ConsumerState;
087import org.apache.activemq.state.ProducerState;
088import org.apache.activemq.state.SessionState;
089import org.apache.activemq.state.TransactionState;
090import org.apache.activemq.thread.Task;
091import org.apache.activemq.thread.TaskRunner;
092import org.apache.activemq.thread.TaskRunnerFactory;
093import org.apache.activemq.transaction.Transaction;
094import org.apache.activemq.transport.DefaultTransportListener;
095import org.apache.activemq.transport.ResponseCorrelator;
096import org.apache.activemq.transport.TransmitCallback;
097import org.apache.activemq.transport.Transport;
098import org.apache.activemq.transport.TransportDisposedIOException;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.MarshallingSupport;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103import org.slf4j.MDC;
104
105public class TransportConnection implements Connection, Task, CommandVisitor {
106    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
107    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
108    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
109    // Keeps track of the broker and connector that created this connection.
110    protected final Broker broker;
111    protected final TransportConnector connector;
112    // Keeps track of the state of the connections.
113    // protected final ConcurrentHashMap localConnectionStates=new
114    // ConcurrentHashMap();
115    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
116    // The broker and wireformat info that was exchanged.
117    protected BrokerInfo brokerInfo;
118    protected final List<Command> dispatchQueue = new LinkedList<Command>();
119    protected TaskRunner taskRunner;
120    protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
121    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
122    private final Transport transport;
123    private MessageAuthorizationPolicy messageAuthorizationPolicy;
124    private WireFormatInfo wireFormatInfo;
125    // Used to do async dispatch.. this should perhaps be pushed down into the
126    // transport layer..
127    private boolean inServiceException;
128    private final ConnectionStatistics statistics = new ConnectionStatistics();
129    private boolean manageable;
130    private boolean slow;
131    private boolean markedCandidate;
132    private boolean blockedCandidate;
133    private boolean blocked;
134    private boolean connected;
135    private boolean active;
136    private boolean starting;
137    private boolean pendingStop;
138    private long timeStamp;
139    private final AtomicBoolean stopping = new AtomicBoolean(false);
140    private final CountDownLatch stopped = new CountDownLatch(1);
141    private final AtomicBoolean asyncException = new AtomicBoolean(false);
142    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
143    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
144    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
145    private ConnectionContext context;
146    private boolean networkConnection;
147    private boolean faultTolerantConnection;
148    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
149    private DemandForwardingBridge duplexBridge;
150    private final TaskRunnerFactory taskRunnerFactory;
151    private final TaskRunnerFactory stopTaskRunnerFactory;
152    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
153    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
154    private String duplexNetworkConnectorId;
155
156    /**
157     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
158     *                          else commands are sent async.
159     * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
160     */
161    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
162                               TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
163        this.connector = connector;
164        this.broker = broker;
165        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
166        brokerConnectionStates = rb.getConnectionStates();
167        if (connector != null) {
168            this.statistics.setParent(connector.getStatistics());
169            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
170        }
171        this.taskRunnerFactory = taskRunnerFactory;
172        this.stopTaskRunnerFactory = stopTaskRunnerFactory;
173        this.transport = transport;
174        final BrokerService brokerService = this.broker.getBrokerService();
175        if( this.transport instanceof BrokerServiceAware ) {
176            ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
177        }
178        this.transport.setTransportListener(new DefaultTransportListener() {
179            @Override
180            public void onCommand(Object o) {
181                serviceLock.readLock().lock();
182                try {
183                    if (!(o instanceof Command)) {
184                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
185                    }
186                    Command command = (Command) o;
187                    if (!brokerService.isStopping()) {
188                        Response response = service(command);
189                        if (response != null && !brokerService.isStopping()) {
190                            dispatchSync(response);
191                        }
192                    } else {
193                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
194                    }
195                } finally {
196                    serviceLock.readLock().unlock();
197                }
198            }
199
200            @Override
201            public void onException(IOException exception) {
202                serviceLock.readLock().lock();
203                try {
204                    serviceTransportException(exception);
205                } finally {
206                    serviceLock.readLock().unlock();
207                }
208            }
209        });
210        connected = true;
211    }
212
213    /**
214     * Returns the number of messages to be dispatched to this connection
215     *
216     * @return size of dispatch queue
217     */
218    @Override
219    public int getDispatchQueueSize() {
220        synchronized (dispatchQueue) {
221            return dispatchQueue.size();
222        }
223    }
224
225    public void serviceTransportException(IOException e) {
226        BrokerService bService = connector.getBrokerService();
227        if (bService.isShutdownOnSlaveFailure()) {
228            if (brokerInfo != null) {
229                if (brokerInfo.isSlaveBroker()) {
230                    LOG.error("Slave has exception: {} shutting down master now.", e.getMessage(), e);
231                    try {
232                        doStop();
233                        bService.stop();
234                    } catch (Exception ex) {
235                        LOG.warn("Failed to stop the master", ex);
236                    }
237                }
238            }
239        }
240        if (!stopping.get() && !pendingStop) {
241            transportException.set(e);
242            if (TRANSPORTLOG.isDebugEnabled()) {
243                TRANSPORTLOG.debug(this + " failed: " + e, e);
244            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
245                TRANSPORTLOG.warn(this + " failed: " + e);
246            }
247            stopAsync(e);
248        }
249    }
250
251    private boolean expected(IOException e) {
252        return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
253    }
254
255    private boolean isStomp() {
256        URI uri = connector.getUri();
257        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
258    }
259
260    /**
261     * Calls the serviceException method in an async thread. Since handling a
262     * service exception closes a socket, we should not tie up broker threads
263     * since client sockets may hang or cause deadlocks.
264     */
265    @Override
266    public void serviceExceptionAsync(final IOException e) {
267        if (asyncException.compareAndSet(false, true)) {
268            new Thread("Async Exception Handler") {
269                @Override
270                public void run() {
271                    serviceException(e);
272                }
273            }.start();
274        }
275    }
276
277    /**
278     * Closes a clients connection due to a detected error. Errors are ignored
279     * if: the client is closing or broker is closing. Otherwise, the connection
280     * error transmitted to the client before stopping it's transport.
281     */
282    @Override
283    public void serviceException(Throwable e) {
284        // are we a transport exception such as not being able to dispatch
285        // synchronously to a transport
286        if (e instanceof IOException) {
287            serviceTransportException((IOException) e);
288        } else if (e.getClass() == BrokerStoppedException.class) {
289            // Handle the case where the broker is stopped
290            // But the client is still connected.
291            if (!stopping.get()) {
292                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
293                ConnectionError ce = new ConnectionError();
294                ce.setException(e);
295                dispatchSync(ce);
296                // Record the error that caused the transport to stop
297                transportException.set(e);
298                // Wait a little bit to try to get the output buffer to flush
299                // the exception notification to the client.
300                try {
301                    Thread.sleep(500);
302                } catch (InterruptedException ie) {
303                    Thread.currentThread().interrupt();
304                }
305                // Worst case is we just kill the connection before the
306                // notification gets to him.
307                stopAsync();
308            }
309        } else if (!stopping.get() && !inServiceException) {
310            inServiceException = true;
311            try {
312                if (SERVICELOG.isDebugEnabled()) {
313                    SERVICELOG.debug("Async error occurred: " + e, e);
314                } else {
315                    SERVICELOG.warn("Async error occurred: " + e);
316                }
317                ConnectionError ce = new ConnectionError();
318                ce.setException(e);
319                if (pendingStop) {
320                    dispatchSync(ce);
321                } else {
322                    dispatchAsync(ce);
323                }
324            } finally {
325                inServiceException = false;
326            }
327        }
328    }
329
330    @Override
331    public Response service(Command command) {
332        MDC.put("activemq.connector", connector.getUri().toString());
333        Response response = null;
334        boolean responseRequired = command.isResponseRequired();
335        int commandId = command.getCommandId();
336        try {
337            if (!pendingStop) {
338                response = command.visit(this);
339            } else {
340                response = new ExceptionResponse(transportException.get());
341            }
342        } catch (Throwable e) {
343            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
344                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
345                        + " command: " + command + ", exception: " + e, e);
346            }
347
348            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
349                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
350                responseRequired = false;
351            }
352
353            if (responseRequired) {
354                if (e instanceof SecurityException || e.getCause() instanceof SecurityException) {
355                    SERVICELOG.warn("Security Error occurred on connection to: {}, {}",
356                            transport.getRemoteAddress(), e.getMessage());
357                }
358                response = new ExceptionResponse(e);
359            } else {
360                serviceException(e);
361            }
362        }
363        if (responseRequired) {
364            if (response == null) {
365                response = new Response();
366            }
367            response.setCorrelationId(commandId);
368        }
369        // The context may have been flagged so that the response is not
370        // sent.
371        if (context != null) {
372            if (context.isDontSendReponse()) {
373                context.setDontSendReponse(false);
374                response = null;
375            }
376            context = null;
377        }
378        MDC.remove("activemq.connector");
379        return response;
380    }
381
382    @Override
383    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
384        return null;
385    }
386
387    @Override
388    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
389        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
390        return null;
391    }
392
393    @Override
394    public Response processWireFormat(WireFormatInfo info) throws Exception {
395        wireFormatInfo = info;
396        protocolVersion.set(info.getVersion());
397        return null;
398    }
399
400    @Override
401    public Response processShutdown(ShutdownInfo info) throws Exception {
402        stopAsync();
403        return null;
404    }
405
406    @Override
407    public Response processFlush(FlushCommand command) throws Exception {
408        return null;
409    }
410
411    @Override
412    public Response processBeginTransaction(TransactionInfo info) throws Exception {
413        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
414        context = null;
415        if (cs != null) {
416            context = cs.getContext();
417        }
418        if (cs == null) {
419            throw new NullPointerException("Context is null");
420        }
421        // Avoid replaying dup commands
422        if (cs.getTransactionState(info.getTransactionId()) == null) {
423            cs.addTransactionState(info.getTransactionId());
424            broker.beginTransaction(context, info.getTransactionId());
425        }
426        return null;
427    }
428
429    @Override
430    public int getActiveTransactionCount() {
431        int rc = 0;
432        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
433            Collection<TransactionState> transactions = cs.getTransactionStates();
434            for (TransactionState transaction : transactions) {
435                rc++;
436            }
437        }
438        return rc;
439    }
440
441    @Override
442    public Long getOldestActiveTransactionDuration() {
443        TransactionState oldestTX = null;
444        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
445            Collection<TransactionState> transactions = cs.getTransactionStates();
446            for (TransactionState transaction : transactions) {
447                if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) {
448                    oldestTX = transaction;
449                }
450            }
451        }
452        if( oldestTX == null ) {
453            return null;
454        }
455        return System.currentTimeMillis() - oldestTX.getCreatedAt();
456    }
457
458    @Override
459    public Response processEndTransaction(TransactionInfo info) throws Exception {
460        // No need to do anything. This packet is just sent by the client
461        // make sure he is synced with the server as commit command could
462        // come from a different connection.
463        return null;
464    }
465
466    @Override
467    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
468        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
469        context = null;
470        if (cs != null) {
471            context = cs.getContext();
472        }
473        if (cs == null) {
474            throw new NullPointerException("Context is null");
475        }
476        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
477        if (transactionState == null) {
478            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
479                    + info.getTransactionId());
480        }
481        // Avoid dups.
482        if (!transactionState.isPrepared()) {
483            transactionState.setPrepared(true);
484            int result = broker.prepareTransaction(context, info.getTransactionId());
485            transactionState.setPreparedResult(result);
486            if (result == XAResource.XA_RDONLY) {
487                // we are done, no further rollback or commit from TM
488                cs.removeTransactionState(info.getTransactionId());
489            }
490            IntegerResponse response = new IntegerResponse(result);
491            return response;
492        } else {
493            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
494            return response;
495        }
496    }
497
498    @Override
499    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
500        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
501        context = cs.getContext();
502        cs.removeTransactionState(info.getTransactionId());
503        broker.commitTransaction(context, info.getTransactionId(), true);
504        return null;
505    }
506
507    @Override
508    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
509        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
510        context = cs.getContext();
511        cs.removeTransactionState(info.getTransactionId());
512        broker.commitTransaction(context, info.getTransactionId(), false);
513        return null;
514    }
515
516    @Override
517    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
518        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
519        context = cs.getContext();
520        cs.removeTransactionState(info.getTransactionId());
521        broker.rollbackTransaction(context, info.getTransactionId());
522        return null;
523    }
524
525    @Override
526    public Response processForgetTransaction(TransactionInfo info) throws Exception {
527        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
528        context = cs.getContext();
529        broker.forgetTransaction(context, info.getTransactionId());
530        return null;
531    }
532
533    @Override
534    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
535        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
536        context = cs.getContext();
537        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
538        return new DataArrayResponse(preparedTransactions);
539    }
540
541    @Override
542    public Response processMessage(Message messageSend) throws Exception {
543        ProducerId producerId = messageSend.getProducerId();
544        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
545        if (producerExchange.canDispatch(messageSend)) {
546            broker.send(producerExchange, messageSend);
547        }
548        return null;
549    }
550
551    @Override
552    public Response processMessageAck(MessageAck ack) throws Exception {
553        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
554        if (consumerExchange != null) {
555            broker.acknowledge(consumerExchange, ack);
556        } else if (ack.isInTransaction()) {
557            LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
558        }
559        return null;
560    }
561
562    @Override
563    public Response processMessagePull(MessagePull pull) throws Exception {
564        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
565    }
566
567    @Override
568    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
569        broker.processDispatchNotification(notification);
570        return null;
571    }
572
573    @Override
574    public Response processAddDestination(DestinationInfo info) throws Exception {
575        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
576        broker.addDestinationInfo(cs.getContext(), info);
577        if (info.getDestination().isTemporary()) {
578            cs.addTempDestination(info);
579        }
580        return null;
581    }
582
583    @Override
584    public Response processRemoveDestination(DestinationInfo info) throws Exception {
585        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
586        broker.removeDestinationInfo(cs.getContext(), info);
587        if (info.getDestination().isTemporary()) {
588            cs.removeTempDestination(info.getDestination());
589        }
590        return null;
591    }
592
593    @Override
594    public Response processAddProducer(ProducerInfo info) throws Exception {
595        SessionId sessionId = info.getProducerId().getParentId();
596        ConnectionId connectionId = sessionId.getParentId();
597        TransportConnectionState cs = lookupConnectionState(connectionId);
598        if (cs == null) {
599            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
600                    + connectionId);
601        }
602        SessionState ss = cs.getSessionState(sessionId);
603        if (ss == null) {
604            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
605                    + sessionId);
606        }
607        // Avoid replaying dup commands
608        if (!ss.getProducerIds().contains(info.getProducerId())) {
609            ActiveMQDestination destination = info.getDestination();
610            // Do not check for null here as it would cause the count of max producers to exclude
611            // anonymous producers.  The isAdvisoryTopic method checks for null so it is safe to
612            // call it from here with a null Destination value.
613            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
614                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
615                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
616                }
617            }
618            broker.addProducer(cs.getContext(), info);
619            try {
620                ss.addProducer(info);
621            } catch (IllegalStateException e) {
622                broker.removeProducer(cs.getContext(), info);
623            }
624
625        }
626        return null;
627    }
628
629    @Override
630    public Response processRemoveProducer(ProducerId id) throws Exception {
631        SessionId sessionId = id.getParentId();
632        ConnectionId connectionId = sessionId.getParentId();
633        TransportConnectionState cs = lookupConnectionState(connectionId);
634        SessionState ss = cs.getSessionState(sessionId);
635        if (ss == null) {
636            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
637                    + sessionId);
638        }
639        ProducerState ps = ss.removeProducer(id);
640        if (ps == null) {
641            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
642        }
643        removeProducerBrokerExchange(id);
644        broker.removeProducer(cs.getContext(), ps.getInfo());
645        return null;
646    }
647
648    @Override
649    public Response processAddConsumer(ConsumerInfo info) throws Exception {
650        SessionId sessionId = info.getConsumerId().getParentId();
651        ConnectionId connectionId = sessionId.getParentId();
652        TransportConnectionState cs = lookupConnectionState(connectionId);
653        if (cs == null) {
654            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
655                    + connectionId);
656        }
657        SessionState ss = cs.getSessionState(sessionId);
658        if (ss == null) {
659            throw new IllegalStateException(broker.getBrokerName()
660                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
661        }
662        // Avoid replaying dup commands
663        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
664            ActiveMQDestination destination = info.getDestination();
665            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
666                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
667                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
668                }
669            }
670
671            broker.addConsumer(cs.getContext(), info);
672            try {
673                ss.addConsumer(info);
674                addConsumerBrokerExchange(info.getConsumerId());
675            } catch (IllegalStateException e) {
676                broker.removeConsumer(cs.getContext(), info);
677            }
678
679        }
680        return null;
681    }
682
683    @Override
684    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
685        SessionId sessionId = id.getParentId();
686        ConnectionId connectionId = sessionId.getParentId();
687        TransportConnectionState cs = lookupConnectionState(connectionId);
688        if (cs == null) {
689            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
690                    + connectionId);
691        }
692        SessionState ss = cs.getSessionState(sessionId);
693        if (ss == null) {
694            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
695                    + sessionId);
696        }
697        ConsumerState consumerState = ss.removeConsumer(id);
698        if (consumerState == null) {
699            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
700        }
701        ConsumerInfo info = consumerState.getInfo();
702        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
703        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
704        removeConsumerBrokerExchange(id);
705        return null;
706    }
707
708    @Override
709    public Response processAddSession(SessionInfo info) throws Exception {
710        ConnectionId connectionId = info.getSessionId().getParentId();
711        TransportConnectionState cs = lookupConnectionState(connectionId);
712        // Avoid replaying dup commands
713        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
714            broker.addSession(cs.getContext(), info);
715            try {
716                cs.addSession(info);
717            } catch (IllegalStateException e) {
718                LOG.warn("Failed to add session: {}", info.getSessionId(), e);
719                broker.removeSession(cs.getContext(), info);
720            }
721        }
722        return null;
723    }
724
725    @Override
726    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
727        ConnectionId connectionId = id.getParentId();
728        TransportConnectionState cs = lookupConnectionState(connectionId);
729        if (cs == null) {
730            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
731        }
732        SessionState session = cs.getSessionState(id);
733        if (session == null) {
734            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
735        }
736        // Don't let new consumers or producers get added while we are closing
737        // this down.
738        session.shutdown();
739        // Cascade the connection stop to the consumers and producers.
740        for (ConsumerId consumerId : session.getConsumerIds()) {
741            try {
742                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
743            } catch (Throwable e) {
744                LOG.warn("Failed to remove consumer: {}", consumerId, e);
745            }
746        }
747        for (ProducerId producerId : session.getProducerIds()) {
748            try {
749                processRemoveProducer(producerId);
750            } catch (Throwable e) {
751                LOG.warn("Failed to remove producer: {}", producerId, e);
752            }
753        }
754        cs.removeSession(id);
755        broker.removeSession(cs.getContext(), session.getInfo());
756        return null;
757    }
758
759    @Override
760    public Response processAddConnection(ConnectionInfo info) throws Exception {
761        // Older clients should have been defaulting this field to true.. but
762        // they were not.
763        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
764            info.setClientMaster(true);
765        }
766        TransportConnectionState state;
767        // Make sure 2 concurrent connections by the same ID only generate 1
768        // TransportConnectionState object.
769        synchronized (brokerConnectionStates) {
770            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
771            if (state == null) {
772                state = new TransportConnectionState(info, this);
773                brokerConnectionStates.put(info.getConnectionId(), state);
774            }
775            state.incrementReference();
776        }
777        // If there are 2 concurrent connections for the same connection id,
778        // then last one in wins, we need to sync here
779        // to figure out the winner.
780        synchronized (state.getConnectionMutex()) {
781            if (state.getConnection() != this) {
782                LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress());
783                state.getConnection().stop();
784                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress());
785                state.setConnection(this);
786                state.reset(info);
787            }
788        }
789        registerConnectionState(info.getConnectionId(), state);
790        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
791        this.faultTolerantConnection = info.isFaultTolerant();
792        // Setup the context.
793        String clientId = info.getClientId();
794        context = new ConnectionContext();
795        context.setBroker(broker);
796        context.setClientId(clientId);
797        context.setClientMaster(info.isClientMaster());
798        context.setConnection(this);
799        context.setConnectionId(info.getConnectionId());
800        context.setConnector(connector);
801        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
802        context.setNetworkConnection(networkConnection);
803        context.setFaultTolerant(faultTolerantConnection);
804        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
805        context.setUserName(info.getUserName());
806        context.setWireFormatInfo(wireFormatInfo);
807        context.setReconnect(info.isFailoverReconnect());
808        this.manageable = info.isManageable();
809        context.setConnectionState(state);
810        state.setContext(context);
811        state.setConnection(this);
812        if (info.getClientIp() == null) {
813            info.setClientIp(getRemoteAddress());
814        }
815
816        try {
817            broker.addConnection(context, info);
818        } catch (Exception e) {
819            synchronized (brokerConnectionStates) {
820                brokerConnectionStates.remove(info.getConnectionId());
821            }
822            unregisterConnectionState(info.getConnectionId());
823            LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e);
824            if (e instanceof SecurityException) {
825                // close this down - in case the peer of this transport doesn't play nice
826                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
827            }
828            throw e;
829        }
830        if (info.isManageable()) {
831            // send ConnectionCommand
832            ConnectionControl command = this.connector.getConnectionControl();
833            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
834            if (info.isFailoverReconnect()) {
835                command.setRebalanceConnection(false);
836            }
837            dispatchAsync(command);
838        }
839        return null;
840    }
841
842    @Override
843    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
844            throws InterruptedException {
845        LOG.debug("remove connection id: {}", id);
846        TransportConnectionState cs = lookupConnectionState(id);
847        if (cs != null) {
848            // Don't allow things to be added to the connection state while we
849            // are shutting down.
850            cs.shutdown();
851            // Cascade the connection stop to the sessions.
852            for (SessionId sessionId : cs.getSessionIds()) {
853                try {
854                    processRemoveSession(sessionId, lastDeliveredSequenceId);
855                } catch (Throwable e) {
856                    SERVICELOG.warn("Failed to remove session {}", sessionId, e);
857                }
858            }
859            // Cascade the connection stop to temp destinations.
860            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
861                DestinationInfo di = iter.next();
862                try {
863                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
864                } catch (Throwable e) {
865                    SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e);
866                }
867                iter.remove();
868            }
869            try {
870                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
871            } catch (Throwable e) {
872                SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
873            }
874            TransportConnectionState state = unregisterConnectionState(id);
875            if (state != null) {
876                synchronized (brokerConnectionStates) {
877                    // If we are the last reference, we should remove the state
878                    // from the broker.
879                    if (state.decrementReference() == 0) {
880                        brokerConnectionStates.remove(id);
881                    }
882                }
883            }
884        }
885        return null;
886    }
887
888    @Override
889    public Response processProducerAck(ProducerAck ack) throws Exception {
890        // A broker should not get ProducerAck messages.
891        return null;
892    }
893
894    @Override
895    public Connector getConnector() {
896        return connector;
897    }
898
899    @Override
900    public void dispatchSync(Command message) {
901        try {
902            processDispatch(message);
903        } catch (IOException e) {
904            serviceExceptionAsync(e);
905        }
906    }
907
908    @Override
909    public void dispatchAsync(Command message) {
910        if (!stopping.get()) {
911            if (taskRunner == null) {
912                dispatchSync(message);
913            } else {
914                synchronized (dispatchQueue) {
915                    dispatchQueue.add(message);
916                }
917                try {
918                    taskRunner.wakeup();
919                } catch (InterruptedException e) {
920                    Thread.currentThread().interrupt();
921                }
922            }
923        } else {
924            if (message.isMessageDispatch()) {
925                MessageDispatch md = (MessageDispatch) message;
926                TransmitCallback sub = md.getTransmitCallback();
927                broker.postProcessDispatch(md);
928                if (sub != null) {
929                    sub.onFailure();
930                }
931            }
932        }
933    }
934
935    protected void processDispatch(Command command) throws IOException {
936        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
937        try {
938            if (!stopping.get()) {
939                if (messageDispatch != null) {
940                    try {
941                        broker.preProcessDispatch(messageDispatch);
942                    } catch (RuntimeException convertToIO) {
943                        throw new IOException(convertToIO);
944                    }
945                }
946                dispatch(command);
947            }
948        } catch (IOException e) {
949            if (messageDispatch != null) {
950                TransmitCallback sub = messageDispatch.getTransmitCallback();
951                broker.postProcessDispatch(messageDispatch);
952                if (sub != null) {
953                    sub.onFailure();
954                }
955                messageDispatch = null;
956                throw e;
957            }
958        } finally {
959            if (messageDispatch != null) {
960                TransmitCallback sub = messageDispatch.getTransmitCallback();
961                broker.postProcessDispatch(messageDispatch);
962                if (sub != null) {
963                    sub.onSuccess();
964                }
965            }
966        }
967    }
968
969    @Override
970    public boolean iterate() {
971        try {
972            if (pendingStop || stopping.get()) {
973                if (dispatchStopped.compareAndSet(false, true)) {
974                    if (transportException.get() == null) {
975                        try {
976                            dispatch(new ShutdownInfo());
977                        } catch (Throwable ignore) {
978                        }
979                    }
980                    dispatchStoppedLatch.countDown();
981                }
982                return false;
983            }
984            if (!dispatchStopped.get()) {
985                Command command = null;
986                synchronized (dispatchQueue) {
987                    if (dispatchQueue.isEmpty()) {
988                        return false;
989                    }
990                    command = dispatchQueue.remove(0);
991                }
992                processDispatch(command);
993                return true;
994            }
995            return false;
996        } catch (IOException e) {
997            if (dispatchStopped.compareAndSet(false, true)) {
998                dispatchStoppedLatch.countDown();
999            }
1000            serviceExceptionAsync(e);
1001            return false;
1002        }
1003    }
1004
1005    /**
1006     * Returns the statistics for this connection
1007     */
1008    @Override
1009    public ConnectionStatistics getStatistics() {
1010        return statistics;
1011    }
1012
1013    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1014        return messageAuthorizationPolicy;
1015    }
1016
1017    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1018        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1019    }
1020
1021    @Override
1022    public boolean isManageable() {
1023        return manageable;
1024    }
1025
1026    @Override
1027    public void start() throws Exception {
1028        try {
1029            synchronized (this) {
1030                starting = true;
1031                if (taskRunnerFactory != null) {
1032                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
1033                            + getRemoteAddress());
1034                } else {
1035                    taskRunner = null;
1036                }
1037                transport.start();
1038                active = true;
1039                BrokerInfo info = connector.getBrokerInfo().copy();
1040                if (connector.isUpdateClusterClients()) {
1041                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
1042                } else {
1043                    info.setPeerBrokerInfos(null);
1044                }
1045                dispatchAsync(info);
1046
1047                connector.onStarted(this);
1048            }
1049        } catch (Exception e) {
1050            // Force clean up on an error starting up.
1051            pendingStop = true;
1052            throw e;
1053        } finally {
1054            // stop() can be called from within the above block,
1055            // but we want to be sure start() completes before
1056            // stop() runs, so queue the stop until right now:
1057            setStarting(false);
1058            if (isPendingStop()) {
1059                LOG.debug("Calling the delayed stop() after start() {}", this);
1060                stop();
1061            }
1062        }
1063    }
1064
1065    @Override
1066    public void stop() throws Exception {
1067        // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
1068        // as their lifecycle is handled elsewhere
1069
1070        stopAsync();
1071        while (!stopped.await(5, TimeUnit.SECONDS)) {
1072            LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress());
1073        }
1074    }
1075
1076    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
1077        if (waitTime > 0) {
1078            synchronized (this) {
1079                pendingStop = true;
1080                transportException.set(cause);
1081            }
1082            try {
1083                stopTaskRunnerFactory.execute(new Runnable() {
1084                    @Override
1085                    public void run() {
1086                        try {
1087                            Thread.sleep(waitTime);
1088                            stopAsync();
1089                            LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason);
1090                        } catch (InterruptedException e) {
1091                        }
1092                    }
1093                });
1094            } catch (Throwable t) {
1095                LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
1096            }
1097        }
1098    }
1099
1100    public void stopAsync(Throwable cause) {
1101        transportException.set(cause);
1102        stopAsync();
1103    }
1104
1105    public void stopAsync() {
1106        // If we're in the middle of starting then go no further... for now.
1107        synchronized (this) {
1108            pendingStop = true;
1109            if (starting) {
1110                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
1111                return;
1112            }
1113        }
1114        if (stopping.compareAndSet(false, true)) {
1115            // Let all the connection contexts know we are shutting down
1116            // so that in progress operations can notice and unblock.
1117            List<TransportConnectionState> connectionStates = listConnectionStates();
1118            for (TransportConnectionState cs : connectionStates) {
1119                ConnectionContext connectionContext = cs.getContext();
1120                if (connectionContext != null) {
1121                    connectionContext.getStopping().set(true);
1122                }
1123            }
1124            try {
1125                stopTaskRunnerFactory.execute(new Runnable() {
1126                    @Override
1127                    public void run() {
1128                        serviceLock.writeLock().lock();
1129                        try {
1130                            doStop();
1131                        } catch (Throwable e) {
1132                            LOG.debug("Error occurred while shutting down a connection {}", this, e);
1133                        } finally {
1134                            stopped.countDown();
1135                            serviceLock.writeLock().unlock();
1136                        }
1137                    }
1138                });
1139            } catch (Throwable t) {
1140                LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1141                stopped.countDown();
1142            }
1143        }
1144    }
1145
1146    @Override
1147    public String toString() {
1148        return "Transport Connection to: " + transport.getRemoteAddress();
1149    }
1150
1151    protected void doStop() throws Exception {
1152        LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1153        connector.onStopped(this);
1154        try {
1155            synchronized (this) {
1156                if (duplexBridge != null) {
1157                    duplexBridge.stop();
1158                }
1159            }
1160        } catch (Exception ignore) {
1161            LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1162        }
1163        try {
1164            transport.stop();
1165            LOG.debug("Stopped transport: {}", transport.getRemoteAddress());
1166        } catch (Exception e) {
1167            LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e);
1168        }
1169        if (taskRunner != null) {
1170            taskRunner.shutdown(1);
1171            taskRunner = null;
1172        }
1173        active = false;
1174        // Run the MessageDispatch callbacks so that message references get
1175        // cleaned up.
1176        synchronized (dispatchQueue) {
1177            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1178                Command command = iter.next();
1179                if (command.isMessageDispatch()) {
1180                    MessageDispatch md = (MessageDispatch) command;
1181                    TransmitCallback sub = md.getTransmitCallback();
1182                    broker.postProcessDispatch(md);
1183                    if (sub != null) {
1184                        sub.onFailure();
1185                    }
1186                }
1187            }
1188            dispatchQueue.clear();
1189        }
1190        //
1191        // Remove all logical connection associated with this connection
1192        // from the broker.
1193        if (!broker.isStopped()) {
1194            List<TransportConnectionState> connectionStates = listConnectionStates();
1195            connectionStates = listConnectionStates();
1196            for (TransportConnectionState cs : connectionStates) {
1197                cs.getContext().getStopping().set(true);
1198                try {
1199                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1200                    processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
1201                } catch (Throwable ignore) {
1202                    LOG.debug("Exception caught removing connection {}. This exception is ignored.", cs.getInfo().getConnectionId(), ignore);
1203                }
1204            }
1205        }
1206        LOG.debug("Connection Stopped: {}", getRemoteAddress());
1207    }
1208
1209    /**
1210     * @return Returns the blockedCandidate.
1211     */
1212    public boolean isBlockedCandidate() {
1213        return blockedCandidate;
1214    }
1215
1216    /**
1217     * @param blockedCandidate The blockedCandidate to set.
1218     */
1219    public void setBlockedCandidate(boolean blockedCandidate) {
1220        this.blockedCandidate = blockedCandidate;
1221    }
1222
1223    /**
1224     * @return Returns the markedCandidate.
1225     */
1226    public boolean isMarkedCandidate() {
1227        return markedCandidate;
1228    }
1229
1230    /**
1231     * @param markedCandidate The markedCandidate to set.
1232     */
1233    public void setMarkedCandidate(boolean markedCandidate) {
1234        this.markedCandidate = markedCandidate;
1235        if (!markedCandidate) {
1236            timeStamp = 0;
1237            blockedCandidate = false;
1238        }
1239    }
1240
1241    /**
1242     * @param slow The slow to set.
1243     */
1244    public void setSlow(boolean slow) {
1245        this.slow = slow;
1246    }
1247
1248    /**
1249     * @return true if the Connection is slow
1250     */
1251    @Override
1252    public boolean isSlow() {
1253        return slow;
1254    }
1255
1256    /**
1257     * @return true if the Connection is potentially blocked
1258     */
1259    public boolean isMarkedBlockedCandidate() {
1260        return markedCandidate;
1261    }
1262
1263    /**
1264     * Mark the Connection, so we can deem if it's collectable on the next sweep
1265     */
1266    public void doMark() {
1267        if (timeStamp == 0) {
1268            timeStamp = System.currentTimeMillis();
1269        }
1270    }
1271
1272    /**
1273     * @return if after being marked, the Connection is still writing
1274     */
1275    @Override
1276    public boolean isBlocked() {
1277        return blocked;
1278    }
1279
1280    /**
1281     * @return true if the Connection is connected
1282     */
1283    @Override
1284    public boolean isConnected() {
1285        return connected;
1286    }
1287
1288    /**
1289     * @param blocked The blocked to set.
1290     */
1291    public void setBlocked(boolean blocked) {
1292        this.blocked = blocked;
1293    }
1294
1295    /**
1296     * @param connected The connected to set.
1297     */
1298    public void setConnected(boolean connected) {
1299        this.connected = connected;
1300    }
1301
1302    /**
1303     * @return true if the Connection is active
1304     */
1305    @Override
1306    public boolean isActive() {
1307        return active;
1308    }
1309
1310    /**
1311     * @param active The active to set.
1312     */
1313    public void setActive(boolean active) {
1314        this.active = active;
1315    }
1316
1317    /**
1318     * @return true if the Connection is starting
1319     */
1320    public synchronized boolean isStarting() {
1321        return starting;
1322    }
1323
1324    @Override
1325    public synchronized boolean isNetworkConnection() {
1326        return networkConnection;
1327    }
1328
1329    @Override
1330    public boolean isFaultTolerantConnection() {
1331        return this.faultTolerantConnection;
1332    }
1333
1334    protected synchronized void setStarting(boolean starting) {
1335        this.starting = starting;
1336    }
1337
1338    /**
1339     * @return true if the Connection needs to stop
1340     */
1341    public synchronized boolean isPendingStop() {
1342        return pendingStop;
1343    }
1344
1345    protected synchronized void setPendingStop(boolean pendingStop) {
1346        this.pendingStop = pendingStop;
1347    }
1348
1349    @Override
1350    public Response processBrokerInfo(BrokerInfo info) {
1351        if (info.isSlaveBroker()) {
1352            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1353        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1354            // so this TransportConnection is the rear end of a network bridge
1355            // We have been requested to create a two way pipe ...
1356            try {
1357                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1358                Map<String, String> props = createMap(properties);
1359                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1360                IntrospectionSupport.setProperties(config, props, "");
1361                config.setBrokerName(broker.getBrokerName());
1362
1363                // check for existing duplex connection hanging about
1364
1365                // We first look if existing network connection already exists for the same broker Id and network connector name
1366                // It's possible in case of brief network fault to have this transport connector side of the connection always active
1367                // and the duplex network connector side wanting to open a new one
1368                // In this case, the old connection must be broken
1369                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1370                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1371                synchronized (connections) {
1372                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1373                        TransportConnection c = iter.next();
1374                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1375                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
1376                            c.stopAsync();
1377                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1378                            c.getStopped().await(1, TimeUnit.SECONDS);
1379                        }
1380                    }
1381                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1382                }
1383                Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
1384                Transport remoteBridgeTransport = transport;
1385                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
1386                    // the vm transport case is already wrapped
1387                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
1388                }
1389                String duplexName = localTransport.toString();
1390                if (duplexName.contains("#")) {
1391                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1392                }
1393                MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), config, broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
1394                listener.setCreatedByDuplex(true);
1395                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1396                duplexBridge.setBrokerService(broker.getBrokerService());
1397                // now turn duplex off this side
1398                info.setDuplexConnection(false);
1399                duplexBridge.setCreatedByDuplex(true);
1400                duplexBridge.duplexStart(this, brokerInfo, info);
1401                LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
1402                return null;
1403            } catch (TransportDisposedIOException e) {
1404                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
1405                return null;
1406            } catch (Exception e) {
1407                LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
1408                return null;
1409            }
1410        }
1411        // We only expect to get one broker info command per connection
1412        if (this.brokerInfo != null) {
1413            LOG.warn("Unexpected extra broker info command received: {}", info);
1414        }
1415        this.brokerInfo = info;
1416        networkConnection = true;
1417        List<TransportConnectionState> connectionStates = listConnectionStates();
1418        for (TransportConnectionState cs : connectionStates) {
1419            cs.getContext().setNetworkConnection(true);
1420        }
1421        return null;
1422    }
1423
1424    @SuppressWarnings({"unchecked", "rawtypes"})
1425    private HashMap<String, String> createMap(Properties properties) {
1426        return new HashMap(properties);
1427    }
1428
1429    protected void dispatch(Command command) throws IOException {
1430        try {
1431            setMarkedCandidate(true);
1432            transport.oneway(command);
1433        } finally {
1434            setMarkedCandidate(false);
1435        }
1436    }
1437
1438    @Override
1439    public String getRemoteAddress() {
1440        return transport.getRemoteAddress();
1441    }
1442
1443    public Transport getTransport() {
1444        return transport;
1445    }
1446
1447    @Override
1448    public String getConnectionId() {
1449        List<TransportConnectionState> connectionStates = listConnectionStates();
1450        for (TransportConnectionState cs : connectionStates) {
1451            if (cs.getInfo().getClientId() != null) {
1452                return cs.getInfo().getClientId();
1453            }
1454            return cs.getInfo().getConnectionId().toString();
1455        }
1456        return null;
1457    }
1458
1459    @Override
1460    public void updateClient(ConnectionControl control) {
1461        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1462                && this.wireFormatInfo.getVersion() >= 6) {
1463            dispatchAsync(control);
1464        }
1465    }
1466
1467    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
1468        ProducerBrokerExchange result = null;
1469        if (producerInfo != null && producerInfo.getProducerId() != null){
1470            synchronized (producerExchanges){
1471                result = producerExchanges.get(producerInfo.getProducerId());
1472            }
1473        }
1474        return result;
1475    }
1476
1477    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1478        ProducerBrokerExchange result = producerExchanges.get(id);
1479        if (result == null) {
1480            synchronized (producerExchanges) {
1481                result = new ProducerBrokerExchange();
1482                TransportConnectionState state = lookupConnectionState(id);
1483                context = state.getContext();
1484                result.setConnectionContext(context);
1485                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1486                    result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1487                }
1488                SessionState ss = state.getSessionState(id.getParentId());
1489                if (ss != null) {
1490                    result.setProducerState(ss.getProducerState(id));
1491                    ProducerState producerState = ss.getProducerState(id);
1492                    if (producerState != null && producerState.getInfo() != null) {
1493                        ProducerInfo info = producerState.getInfo();
1494                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1495                    }
1496                }
1497                producerExchanges.put(id, result);
1498            }
1499        } else {
1500            context = result.getConnectionContext();
1501        }
1502        return result;
1503    }
1504
1505    private void removeProducerBrokerExchange(ProducerId id) {
1506        synchronized (producerExchanges) {
1507            producerExchanges.remove(id);
1508        }
1509    }
1510
1511    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1512        ConsumerBrokerExchange result = consumerExchanges.get(id);
1513        return result;
1514    }
1515
1516    private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
1517        ConsumerBrokerExchange result = consumerExchanges.get(id);
1518        if (result == null) {
1519            synchronized (consumerExchanges) {
1520                result = new ConsumerBrokerExchange();
1521                TransportConnectionState state = lookupConnectionState(id);
1522                context = state.getContext();
1523                result.setConnectionContext(context);
1524                SessionState ss = state.getSessionState(id.getParentId());
1525                if (ss != null) {
1526                    ConsumerState cs = ss.getConsumerState(id);
1527                    if (cs != null) {
1528                        ConsumerInfo info = cs.getInfo();
1529                        if (info != null) {
1530                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1531                                result.setWildcard(true);
1532                            }
1533                        }
1534                    }
1535                }
1536                consumerExchanges.put(id, result);
1537            }
1538        }
1539        return result;
1540    }
1541
1542    private void removeConsumerBrokerExchange(ConsumerId id) {
1543        synchronized (consumerExchanges) {
1544            consumerExchanges.remove(id);
1545        }
1546    }
1547
1548    public int getProtocolVersion() {
1549        return protocolVersion.get();
1550    }
1551
1552    @Override
1553    public Response processControlCommand(ControlCommand command) throws Exception {
1554        return null;
1555    }
1556
1557    @Override
1558    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1559        return null;
1560    }
1561
1562    @Override
1563    public Response processConnectionControl(ConnectionControl control) throws Exception {
1564        if (control != null) {
1565            faultTolerantConnection = control.isFaultTolerant();
1566        }
1567        return null;
1568    }
1569
1570    @Override
1571    public Response processConnectionError(ConnectionError error) throws Exception {
1572        return null;
1573    }
1574
1575    @Override
1576    public Response processConsumerControl(ConsumerControl control) throws Exception {
1577        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1578        broker.processConsumerControl(consumerExchange, control);
1579        return null;
1580    }
1581
1582    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1583                                                                            TransportConnectionState state) {
1584        TransportConnectionState cs = null;
1585        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1586            // swap implementations
1587            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1588            newRegister.intialize(connectionStateRegister);
1589            connectionStateRegister = newRegister;
1590        }
1591        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1592        return cs;
1593    }
1594
1595    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1596        return connectionStateRegister.unregisterConnectionState(connectionId);
1597    }
1598
1599    protected synchronized List<TransportConnectionState> listConnectionStates() {
1600        return connectionStateRegister.listConnectionStates();
1601    }
1602
1603    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1604        return connectionStateRegister.lookupConnectionState(connectionId);
1605    }
1606
1607    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1608        return connectionStateRegister.lookupConnectionState(id);
1609    }
1610
1611    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1612        return connectionStateRegister.lookupConnectionState(id);
1613    }
1614
1615    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1616        return connectionStateRegister.lookupConnectionState(id);
1617    }
1618
1619    // public only for testing
1620    public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1621        return connectionStateRegister.lookupConnectionState(connectionId);
1622    }
1623
1624    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1625        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1626    }
1627
1628    protected synchronized String getDuplexNetworkConnectorId() {
1629        return this.duplexNetworkConnectorId;
1630    }
1631
1632    public boolean isStopping() {
1633        return stopping.get();
1634    }
1635
1636    protected CountDownLatch getStopped() {
1637        return stopped;
1638    }
1639
1640    private int getProducerCount(ConnectionId connectionId) {
1641        int result = 0;
1642        TransportConnectionState cs = lookupConnectionState(connectionId);
1643        if (cs != null) {
1644            for (SessionId sessionId : cs.getSessionIds()) {
1645                SessionState sessionState = cs.getSessionState(sessionId);
1646                if (sessionState != null) {
1647                    result += sessionState.getProducerIds().size();
1648                }
1649            }
1650        }
1651        return result;
1652    }
1653
1654    private int getConsumerCount(ConnectionId connectionId) {
1655        int result = 0;
1656        TransportConnectionState cs = lookupConnectionState(connectionId);
1657        if (cs != null) {
1658            for (SessionId sessionId : cs.getSessionIds()) {
1659                SessionState sessionState = cs.getSessionState(sessionId);
1660                if (sessionState != null) {
1661                    result += sessionState.getConsumerIds().size();
1662                }
1663            }
1664        }
1665        return result;
1666    }
1667
1668    public WireFormatInfo getRemoteWireFormatInfo() {
1669        return wireFormatInfo;
1670    }
1671}