View Javadoc

1   /*
2    * Copyright (c) 2002 Peter Antman, Teknik i Media  <peter.antman@tim.se>
3    *
4    * $Id: JmsBase.java,v 1.2 2004/05/25 11:22:47 pra Exp $
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version
10   * 
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   * 
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, write to the Free Software
18   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19   */
20  package org.backsource.adaptor.jms;
21  import javax.jms.*;
22  
23  import javax.naming.Context;
24  import javax.naming.InitialContext;
25  import javax.naming.Name;
26  import javax.naming.NamingException;
27  
28  import javax.management.ObjectName;
29  import javax.management.MBeanServer;
30  import javax.management.MalformedObjectNameException;
31  
32  import org.jboss.jms.jndi.JMSProviderAdapter;
33  
34  import org.backsource.jmx.ServiceMBeanSupport;
35  import org.backsource.jmx.ObjectNameFactory;
36  /***
37   * Base MBean for JMS MBeans.
38   *
39   * <p>This baseclass handles the common attributes; ands setups of connectiona dn topic. One instance of an MBean of this type, handles only one connection and one destination.</p>
40   *
41   * @author <a href="mailto:pra@tim.se">Peter Antman</a>
42   * @version $Revision: 1.2 $
43   * @jmx:mbean name="jms:service=Jms,name=default" extends="org.backsource.jmx.ServiceMBean"
44   */
45  
46  public class JmsBase extends ServiceMBeanSupport
47     implements org.backsource.adaptor.jms.JmsBaseMBean{
48     public static final ObjectName OBJECT_NAME = ObjectNameFactory.create("jms:service=Jms,name=default");
49  
50     protected ObjectName name = OBJECT_NAME;
51     protected MBeanServer server;
52     
53     //-- All config attributes
54     private String providerAdapterJNDI;
55     private String destinationFactoryJNDI;
56     private String destinationType;
57     private String destinationJndiName;
58     private String user;
59     private String passwd;
60     private String clientId;
61  
62     private long reconnectInterval=1000*5;//5 sec
63  
64     protected boolean failsafe = true;
65     protected boolean stopOnFail = true;
66     protected boolean isTopic = true;
67     protected boolean transacted = false;
68     protected int ackMode = Session.AUTO_ACKNOWLEDGE;
69     
70     protected Connection connection;
71     protected Destination destination;
72     protected ExceptionListenerImpl exListener;
73     volatile protected boolean connectionException = false;
74  
75     public JmsBase (){
76        
77     }
78     protected ObjectName getObjectName(MBeanServer server, ObjectName name)
79        throws MalformedObjectNameException
80     {
81        this.server = server;
82        if ( name != null) {
83           this.name = name;
84        } // end of if ()
85        
86        return this.name;
87     }
88  
89      /***
90      * @jmx:managed-attribute
91      */
92     public String getJMSProviderAdapterJNDI() {
93        return providerAdapterJNDI;
94     }
95     /***
96      * @jmx:managed-attribute
97      */
98     public void setJMSProviderAdapterJNDI(String providerAdapterJNDI) {
99        if ( providerAdapterJNDI != null) {
100          if (!providerAdapterJNDI.startsWith("java:/"))
101             {
102                providerAdapterJNDI = "java:/" + providerAdapterJNDI;
103             }
104       } // end of if ()
105       this.providerAdapterJNDI = providerAdapterJNDI;
106    }
107    /***
108     * @jmx:managed-attribute
109     */
110    public String getDestinationFactory() {
111       return destinationFactoryJNDI;
112    }
113    /***
114     * @jmx:managed-attribute
115     */
116    public void setDestinationFactory(String destinationFactoryJNDI) {
117       this.destinationFactoryJNDI = destinationFactoryJNDI;
118    }
119    /***
120     * @jmx:managed-attribute
121     */ 
122    public String getDestinationType()
123    {
124       return destinationType;
125    }
126    /***
127     * @jmx:managed-attribute
128     */
129    public void setDestinationType(String destinationType) {
130       if ( destinationType != null) {
131          isTopic = "javax.jms.topic".equals(destinationType.toLowerCase());
132       } // end of if ()
133       
134       this.destinationType = destinationType;
135    }
136    /***
137     * @jmx:managed-attribute
138     */
139    public String getDestinationJndiName()
140    {
141       return destinationJndiName;
142    }
143    /***
144     * @jmx:managed-attribute
145     */
146    public void setDestinationJndiName(String destinationJndiName) {
147       this.destinationJndiName = destinationJndiName;
148    }
149    /***
150     * @jmx:managed-attribute
151     */
152    public String getUsername()
153    {
154       return user;
155    }
156    /***
157     * @jmx:managed-attribute
158     */
159    public void setUsername(String user) {
160       this.user = user;
161    }
162    /***
163     * @jmx:managed-attribute
164     */
165    public String getPassword()
166    {
167       return passwd;
168    }
169    /***
170     * @jmx:managed-attribute
171     */
172    public void setPassword(String passwd) {
173       this.passwd = passwd;
174    }
175    /***
176     * @jmx:managed-attribute
177     */
178    public String getClientId()
179    {
180       return clientId;
181    }
182    /***
183     * @jmx:managed-attribute
184     */
185    public void setClientId(String clientId) {
186       this.clientId = clientId;
187    }
188       /***
189     * @jmx:managed-attribute
190     */
191    public boolean getFailsafe() {
192       return failsafe;
193    }
194    /***
195     * @jmx:managed-attribute
196     */
197    public void setFailsafe(boolean failsafe) {
198       this.failsafe = failsafe;
199    }
200    /***
201     * @jmx:managed-attribute
202     */
203    public boolean getStopOnFail() {
204       return stopOnFail;
205    }
206    /***
207     * @jmx:managed-attribute
208     */
209    public void setStopOnFail(boolean stopOnFail) {
210       this.stopOnFail = stopOnFail;
211    }
212 
213    /***
214     * @jmx:managed-attribute
215     */
216    public long getReconnectInterval() {
217       return reconnectInterval;
218    }
219    /***
220     * @jmx:managed-attribute
221     */
222    public void setReconnectInterval(long reconnectInterval) {
223       this.reconnectInterval = reconnectInterval;
224    }
225 
226    // ---- MBean lifecykle methods ----------------
227    protected void createService() throws Exception
228    {      
229       exListener = new ExceptionListenerImpl(this);
230 
231    }
232 
233    protected void startService() throws Exception {
234       
235       innerCreate();
236       if (connection != null)
237          {
238             if ( failsafe) {
239                connection.setExceptionListener(exListener);
240             } // end of if ()
241             
242             
243             connection.start();
244          }
245    }
246 
247    protected void stopService() throws Exception
248    {
249       log.info("Stopping JMS service");
250       // Silence the exception listener
251       if (exListener != null)
252       {
253          exListener.stop();
254       }
255       
256       innerStop();
257       
258    }
259    
260    protected void destroyService() throws Exception
261    {  
262       // close the connection
263       if (connection != null)
264          {
265             try
266                {
267                   log.info("Closing down connection");
268                   // Dont close - for example sonic on exception
269                   if ( connectionException && !stopOnFail) {
270                      log.warn("ConnectionException and stop on fail is false; skip closing connection");
271                   }else {
272                      connection.close();
273                   } // end of else
274                   
275                   connection = null;
276                   
277                }
278             catch (Exception e)
279                {
280                   log.error("Failed to close connection: "+e);
281                }
282          }
283    }
284 
285    //---------------- JMS related helper methods ----------------------
286    protected void innerCreate() throws JMSException {
287       if ( isTopic) {
288          startTopic();
289       } else {
290          startQueue();
291       } // end of else
292       connectionException = false;
293    }
294    /***
295     * Stop done from inside, we should not stop the
296     * exceptionListener in inner stop.
297     */
298    protected void innerStop() throws JMSException
299    {
300       if (connection != null)
301       {
302          connection.setExceptionListener(null);
303          log.debug("Unset exception listener");
304          if ( connectionException && !stopOnFail) {
305             log.warn("ConnectionException and stop on fail is false; skip closing connection");
306          }else {
307             try {
308                connection.stop();
309             } catch (Exception e) {
310                log.error("Could not stop connection: " +e);
311             } // end of try-catch
312 
313          } // end of else
314          
315          log.debug("Connection stopped");
316       }
317    }
318    protected void startTopic() throws JMSException {
319       TopicConnectionFactory factory = (TopicConnectionFactory)getFactory();
320       TopicConnection  myconnection ;
321       if (user == null) {
322          myconnection =
323             factory.createTopicConnection();
324       }
325       else {
326          myconnection =factory.createTopicConnection(user, passwd);
327       }
328       connection = myconnection;
329       log.debug("Using connection: " + connection);
330 
331       log.debug("Using client id: " + clientId);
332       if (clientId != null && clientId.length() > 0)
333          connection.setClientID(clientId);
334 
335       destination = getDestination();
336             
337    }
338 
339    protected void startQueue() throws JMSException {
340       QueueConnectionFactory factory = (QueueConnectionFactory)getFactory();
341       QueueConnection myconnection; 
342          if (user == null) {
343             myconnection =
344                factory.createQueueConnection();
345          }
346          else {
347             myconnection =factory.createQueueConnection(user, passwd);
348          }
349       connection = myconnection;
350       log.debug("Using connection: " + connection);
351 
352       log.debug("Using client id: " + clientId);
353       if (clientId != null && clientId.length() > 0)
354          connection.setClientID(clientId);
355 
356       destination = (Queue)getDestination();
357       
358    }
359 
360       
361    protected ConnectionFactory getFactory() throws JMSException {
362       try {
363 
364       String factoryRef = null;
365       if ( providerAdapterJNDI  != null) {
366          JMSProviderAdapter adapter = getJMSProviderAdapter();
367          if ( isTopic) {
368             factoryRef = adapter.getTopicFactoryRef();
369          } else {
370              factoryRef = adapter.getQueueFactoryRef();
371          } // end of else
372          
373          
374       } else if (destinationFactoryJNDI != null ) {
375          factoryRef = destinationFactoryJNDI;
376       } else {
377          
378          throw new JMSException("No factory name specifyed");
379       } // end of else
380  
381       return (ConnectionFactory)getContext().lookup(factoryRef);
382           
383       } catch (NamingException e) {
384          throw new JMSException("Could no lookup connection factory: " +e);
385       } // end of try-catch
386       
387    }
388 
389    protected Destination getDestination() throws JMSException {
390       // Lookit up.
391       try {
392                 
393          return (Destination)getContext().lookup(destinationJndiName);
394       } catch (NamingException e) {
395          throw new JMSException("Could no lookup connection destination: " +e);
396       } // end of try-catch
397 
398 
399    }
400 
401    protected Context getContext() throws JMSException {
402       try {                
403          if ( providerAdapterJNDI != null) {
404             return getJMSProviderAdapter().getInitialContext();
405          } else {         
406             return new InitialContext();
407          } // end of else
408       } catch ( NamingException e) {
409          throw new JMSException("Could get an initial context:  " + e);
410       } // end of try-catch
411       
412    }
413    
414    /***
415     * Return the JMSProviderAdapter that should be used.
416     *
417     * @return  The JMSProviderAdapter to use.
418     */
419    protected JMSProviderAdapter getJMSProviderAdapter()
420       throws JMSException
421    {
422       Context context = null;
423       try
424          {
425             context = new InitialContext();
426             log.debug("Looking up provider adapter: " + providerAdapterJNDI);
427             return (JMSProviderAdapter)context.lookup(providerAdapterJNDI);
428          }
429       catch ( NamingException e) {
430          throw new JMSException("Could no lookup provider adapeter: " +e);
431       } // end of catch
432       
433       finally
434          {
435             if ( context != null) {
436                try {
437                   context.close();
438                } catch (NamingException e) {
439                   
440                } // end of try-catch
441             } // end of if ()
442             
443          }
444    }
445 
446      /***
447     * ExceptionListener for failover handling.
448     */
449    class ExceptionListenerImpl
450       implements ExceptionListener
451    {
452       JmsBase invoker;
453       Thread currentThread;
454       volatile boolean running = true;
455       
456       ExceptionListenerImpl(final JmsBase invoker)
457       {
458          this.invoker = invoker;
459       }
460       
461       public void onException(JMSException ex)
462       {
463          connectionException = true;
464          currentThread = Thread.currentThread();
465          
466          log.warn("JMS provider failure detected: ", ex);
467          boolean restartInvoker = true;
468          
469          while (restartInvoker && running)
470             {
471             log.info("Trying to reconnect to JMS provider");
472             try
473                {
474                   try
475                      {
476                         Thread.sleep(reconnectInterval);
477                      }
478                   catch (InterruptedException ie)
479                      {
480                         return;
481                      }
482                   
483                   // Reboot container
484                   invoker.innerStop();
485                   invoker.destroyService();
486                   invoker.innerCreate();
487                   invoker.startService();
488                   
489                   // If we get this far the container is rebooted
490                   restartInvoker = false;
491                   connectionException = false;//Just to be safe, is set in innerCreate to!
492                   log.info("Reconnected to JMS provider");
493                }
494             catch (Exception e)
495                {
496                   log.error("Reconnect failed: JMS provider failure detected", e);
497                }
498             }
499          
500          currentThread = null;
501       }
502       
503       void stop()
504       {
505          log.debug("Stop requested");
506          
507          running = false;
508          if (currentThread != null)
509          {
510             currentThread.interrupt();
511             log.debug("Current thread interrupted");
512          }
513       }
514    }
515 
516 }// JmsJMX