001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.io.PrintWriter;
020import java.util.List;
021import java.util.concurrent.CopyOnWriteArrayList;
022import javax.jms.Connection;
023import javax.jms.ExceptionListener;
024import javax.jms.JMSException;
025import javax.resource.ResourceException;
026import javax.resource.spi.ConnectionEvent;
027import javax.resource.spi.ConnectionEventListener;
028import javax.resource.spi.ConnectionRequestInfo;
029import javax.resource.spi.LocalTransaction;
030import javax.resource.spi.ManagedConnection;
031import javax.resource.spi.ManagedConnectionMetaData;
032import javax.security.auth.Subject;
033import javax.transaction.xa.XAResource;
034import org.apache.activemq.ActiveMQConnection;
035import org.apache.activemq.LocalTransactionEventListener;
036import org.apache.activemq.TransactionContext;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * ActiveMQManagedConnection maps to real physical connection to the server.
042 * Since a ManagedConnection has to provide a transaction managment interface to
043 * the physical connection, and sessions are the objects implement transaction
044 * managment interfaces in the JMS API, this object also maps to a singe
045 * physical JMS session. <p/> The side-effect is that JMS connection the
046 * application gets will allways create the same session object. This is good if
047 * running in an app server since the sessions are elisted in the context
048 * transaction. This is bad if used outside of an app server since the user may
049 * be trying to create 2 different sessions to coordinate 2 different uow.
050 * 
051 * 
052 */
053public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO:
054                                                                                            // ,
055                                                                                            // DissociatableManagedConnection
056                                                                                            // {
057
058    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQManagedConnection.class);
059
060    private PrintWriter logWriter;
061
062    private final ActiveMQConnection physicalConnection;
063    private final TransactionContext transactionContext;
064    private final List<ManagedConnectionProxy> proxyConnections = new CopyOnWriteArrayList<ManagedConnectionProxy>();
065    private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
066    private final LocalAndXATransaction localAndXATransaction;
067
068    private Subject subject;
069    private ActiveMQConnectionRequestInfo info;
070    private boolean destroyed;
071
072    public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException {
073        try {
074            this.subject = subject;
075            this.info = info;
076            this.physicalConnection = physicalConnection;
077            this.transactionContext = new TransactionContext(physicalConnection);
078
079            this.localAndXATransaction = new LocalAndXATransaction(transactionContext) {
080                public void setInManagedTx(boolean inManagedTx) throws JMSException {
081                    super.setInManagedTx(inManagedTx);
082                    for (ManagedConnectionProxy proxy:proxyConnections) {
083                        proxy.setUseSharedTxContext(inManagedTx);
084                    }
085                }
086            };
087
088            this.transactionContext.setLocalTransactionEventListener(new LocalTransactionEventListener() {
089                public void beginEvent() {
090                    fireBeginEvent();
091                }
092
093                public void commitEvent() {
094                    fireCommitEvent();
095                }
096
097                public void rollbackEvent() {
098                    fireRollbackEvent();
099                }
100            });
101
102            physicalConnection.setExceptionListener(this);
103        } catch (JMSException e) {
104            throw new ResourceException("Could not create a new connection: " + e.getMessage(), e);
105        }
106    }
107
108    public boolean isInManagedTx() {
109        return localAndXATransaction.isInManagedTx();
110    }
111
112    public static boolean matches(Object x, Object y) {
113        if (x == null ^ y == null) {
114            return false;
115        }
116        if (x != null && !x.equals(y)) {
117            return false;
118        }
119        return true;
120    }
121
122    public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException {
123
124        // Do we need to change the associated userid/password
125        if (!matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword())) {
126            physicalConnection.changeUserInfo(info.getUserName(), info.getPassword());
127        }
128
129        // Do we need to set the clientId?
130        if (info.getClientid() != null && info.getClientid().length() > 0) {
131            physicalConnection.setClientID(info.getClientid());
132        }
133
134        this.subject = subject;
135        this.info = info;
136    }
137
138    public Connection getPhysicalConnection() {
139        return physicalConnection;
140    }
141
142    private void fireBeginEvent() {
143        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
144        for(ConnectionEventListener l:listeners) {
145            l.localTransactionStarted(event);
146        }
147    }
148
149    private void fireCommitEvent() {
150        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
151        for(ConnectionEventListener l:listeners) {
152            l.localTransactionCommitted(event);
153        }
154    }
155
156    private void fireRollbackEvent() {
157        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
158        for(ConnectionEventListener l:listeners) {
159            l.localTransactionRolledback(event);
160        }
161    }
162
163    private void fireCloseEvent(ManagedConnectionProxy proxy) {
164        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_CLOSED);
165        event.setConnectionHandle(proxy);
166
167        for(ConnectionEventListener l:listeners) {
168            l.connectionClosed(event);
169        }
170    }
171
172    private void fireErrorOccurredEvent(Exception error) {
173        ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, error);
174        for(ConnectionEventListener l:listeners) {
175            l.connectionErrorOccurred(event);
176        }
177    }
178
179    /**
180     * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
181     *      javax.resource.spi.ConnectionRequestInfo)
182     */
183    public Object getConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException {
184        ManagedConnectionProxy proxy = new ManagedConnectionProxy(this, info);
185        proxyConnections.add(proxy);
186        return proxy;
187    }
188
189    private boolean isDestroyed() {
190        return destroyed;
191    }
192
193    /**
194     * Close down the physical connection to the server.
195     * 
196     * @see javax.resource.spi.ManagedConnection#destroy()
197     */
198    public void destroy() throws ResourceException {
199        // Have we already been destroyed??
200        if (isDestroyed()) {
201            return;
202        }
203
204        try {
205            cleanup();
206        } finally {
207            try {
208                physicalConnection.close();
209                destroyed = true;
210            } catch (JMSException e) {
211                LOG.trace("Error occurred during close of a JMS connection.", e);
212            }
213        }
214    }
215
216    /**
217     * Cleans up all proxy handles attached to this physical connection so that
218     * they cannot be used anymore.
219     * 
220     * @see javax.resource.spi.ManagedConnection#cleanup()
221     */
222    public void cleanup() throws ResourceException {
223
224        // Have we already been destroyed??
225        if (isDestroyed()) {
226            return;
227        }
228
229        for (ManagedConnectionProxy proxy:proxyConnections) {
230            proxy.cleanup();
231        }
232        proxyConnections.clear();
233
234        try {
235            physicalConnection.cleanup();
236        } catch (JMSException e) {
237            throw new ResourceException("Could not cleanup the ActiveMQ connection: " + e, e);
238        } finally {
239            // defer transaction cleanup till after close so that close is aware of the current tx
240            localAndXATransaction.cleanup();
241        }
242    }
243
244    /**
245     * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
246     */
247    public void associateConnection(Object connection) throws ResourceException {
248        if (connection instanceof ManagedConnectionProxy) {
249            ManagedConnectionProxy proxy = (ManagedConnectionProxy)connection;
250            proxyConnections.add(proxy);
251        } else {
252            throw new ResourceException("Not supported : associating connection instance of " + connection.getClass().getName());
253        }
254    }
255
256    /**
257     * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
258     */
259    public void addConnectionEventListener(ConnectionEventListener listener) {
260        listeners.add(listener);
261    }
262
263    /**
264     * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
265     */
266    public void removeConnectionEventListener(ConnectionEventListener listener) {
267        listeners.remove(listener);
268    }
269
270    /**
271     * @see javax.resource.spi.ManagedConnection#getXAResource()
272     */
273    public XAResource getXAResource() throws ResourceException {
274        return localAndXATransaction;
275    }
276
277    /**
278     * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
279     */
280    public LocalTransaction getLocalTransaction() throws ResourceException {
281        return localAndXATransaction;
282    }
283
284    /**
285     * @see javax.resource.spi.ManagedConnection#getMetaData()
286     */
287    public ManagedConnectionMetaData getMetaData() throws ResourceException {
288        return new ManagedConnectionMetaData() {
289
290            public String getEISProductName() throws ResourceException {
291                if (physicalConnection == null) {
292                    throw new ResourceException("Not connected.");
293                }
294                try {
295                    return physicalConnection.getMetaData().getJMSProviderName();
296                } catch (JMSException e) {
297                    throw new ResourceException("Error accessing provider.", e);
298                }
299            }
300
301            public String getEISProductVersion() throws ResourceException {
302                if (physicalConnection == null) {
303                    throw new ResourceException("Not connected.");
304                }
305                try {
306                    return physicalConnection.getMetaData().getProviderVersion();
307                } catch (JMSException e) {
308                    throw new ResourceException("Error accessing provider.", e);
309                }
310            }
311
312            public int getMaxConnections() throws ResourceException {
313                if (physicalConnection == null) {
314                    throw new ResourceException("Not connected.");
315                }
316                return Integer.MAX_VALUE;
317            }
318
319            public String getUserName() throws ResourceException {
320                if (physicalConnection == null) {
321                    throw new ResourceException("Not connected.");
322                }
323                try {
324                    return physicalConnection.getClientID();
325                } catch (JMSException e) {
326                    throw new ResourceException("Error accessing provider.", e);
327                }
328            }
329        };
330    }
331
332    /**
333     * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
334     */
335    public void setLogWriter(PrintWriter logWriter) throws ResourceException {
336        this.logWriter = logWriter;
337    }
338
339    /**
340     * @see javax.resource.spi.ManagedConnection#getLogWriter()
341     */
342    public PrintWriter getLogWriter() throws ResourceException {
343        return logWriter;
344    }
345
346    /**
347     * @param subject subject to match
348     * @param info cri to match
349     * @return whether the subject and cri match sufficiently to allow using this connection under the new circumstances
350     */
351    public boolean matches(Subject subject, ConnectionRequestInfo info) {
352        // Check to see if it is our info class
353        if (info == null) {
354            return false;
355        }
356        if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
357            return false;
358        }
359
360        // Do the subjects match?
361        if (subject == null ^ this.subject == null) {
362            return false;
363        }
364        if (subject != null && !subject.equals(this.subject)) {
365            return false;
366        }
367
368        // Does the info match?
369        return info.equals(this.info);
370    }
371
372    /**
373     * When a proxy is closed this cleans up the proxy and notifies the
374     * ConnectionEventListeners that a connection closed.
375     * 
376     * @param proxy
377     */
378    public void proxyClosedEvent(ManagedConnectionProxy proxy) {
379        proxyConnections.remove(proxy);
380        proxy.cleanup();
381        fireCloseEvent(proxy);
382    }
383
384    public void onException(JMSException e) {
385        LOG.warn("Connection failed: " + e);
386        LOG.debug("Cause: ", e);
387
388        for (ManagedConnectionProxy proxy:proxyConnections) {
389            proxy.onException(e);
390        }
391        // Let the container know that the error occurred.
392        fireErrorOccurredEvent(e);
393    }
394
395    /**
396     * @return Returns the transactionContext.
397     */
398    public TransactionContext getTransactionContext() {
399        return transactionContext;
400    }
401
402    @Override
403    public String toString() {
404        return "[" + super.toString() + "," + physicalConnection +"]";
405    }
406
407}