View Javadoc

1   /*
2    * Copyright (c) 2002 Peter Antman, Teknik i Media  <peter.antman@tim.se>
3    *
4    * $Id: MessageDriven.java,v 1.1.1.1 2004/05/19 12:26:42 pra Exp $
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version
10   * 
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   * 
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, write to the Free Software
18   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19   */
20  package org.backsource.adaptor.jms;
21  import java.util.Date;
22  import javax.jms.*;
23  
24  import javax.naming.Context;
25  import javax.naming.InitialContext;
26  import javax.naming.Name;
27  import javax.naming.NamingException;
28  
29  import javax.management.ObjectName;
30  import javax.management.MBeanServer;
31  import javax.management.Notification;
32  import javax.management.NotificationBroadcaster;
33  import javax.management.NotificationFilter;
34  import javax.management.NotificationFilterSupport;
35  import javax.management.MalformedObjectNameException;
36  
37  import org.jboss.jms.jndi.JMSProviderAdapter;
38  
39  import org.backsource.jmx.ServiceMBeanSupport;
40  /***
41   *
42   * @author <a href="mailto:pra@tim.se">Peter Antman</a>
43   * @version $Revision: 1.1.1.1 $
44   * @jmx:mbean name="jms:service=MessageDriven,name=default" extends="org.backsource.adaptor.jms.JmsBaseMBean"
45   */
46  
47  public class MessageDriven extends JmsBase
48     implements org.backsource.adaptor.jms.MessageDrivenMBean,MessageListener {
49  
50  
51     //-- All config attributes
52     private boolean subscriptionDurability = false;
53     private String messageSelector;
54     private String subscriptionId;
55     private boolean  pullSubscriber= false;
56     private long pullTimeput = 1000*2;
57     
58     // All possible JMS stuff
59     MessageConsumer subscriber;
60     ReceiverImpl receiver;
61  
62     public MessageDriven (){
63        
64     }
65  
66     /***
67      * @jmx:managed-attribute
68      */
69     public boolean getDurable() {
70        return subscriptionDurability;
71     }
72     /***
73      * @jmx:managed-attribute
74      */
75     public void setDurable(boolean subscriptionDurability) {
76        this.subscriptionDurability = subscriptionDurability;
77     }
78     /***
79      * @jmx:managed-attribute
80      */
81     public String getMessageSelector()
82     {
83        return messageSelector;
84     }
85     /***
86      * @jmx:managed-attribute
87      */
88     public void setMessageSelector(String messageSelector) {
89        if ( messageSelector != null) {
90           //AS Check for Carriage Returns, remove them and trim the selector
91           int i = -1;
92           // Note this only works this way because the search and replace are distinct
93           while ((i = messageSelector.indexOf("\r\n")) >= 0)
94              {
95                 // Replace \r\n by a space
96                 messageSelector = (i == 0 ? "" : messageSelector.substring(0, i)) + " " +
97                    (i >= messageSelector.length() - 2 ? "" : messageSelector.substring(i + 2));
98              }
99           i = -1;
100          
101          while (( i = messageSelector.indexOf("\r")) >= 0)
102             {
103                // Replace \r by a space
104                messageSelector = (i == 0 ? "" : messageSelector.substring(0, i)) + " " +
105                   (i >= messageSelector.length() - 1 ? "" : messageSelector.substring(i + 1));
106             }
107          i = -1;
108          
109          while ((i = messageSelector.indexOf("\n")) >= 0)
110             {
111                // Replace \n by a space
112                messageSelector = (i == 0 ? "" : messageSelector.substring(0, i)) + " " +
113                   (i >= messageSelector.length() - 1 ? "" : messageSelector.substring(i + 1));
114             }
115          
116          // Finally trim it. This is here because only carriage returns and 
117          // linefeeds are transformed to spaces
118          messageSelector = messageSelector.trim();
119          if ("".equals(messageSelector))
120             {
121                messageSelector = null;
122             }
123       } // end of if ()
124       this.messageSelector = messageSelector;
125       
126    }
127 
128    /***
129     * @jmx:managed-attribute
130     */
131    public String getSubscriptionId()
132    {
133       return subscriptionId;
134    }
135    /***
136     * @jmx:managed-attribute
137     */
138    public void setSubscriptionId(String subscriptionId) {
139       this.subscriptionId = subscriptionId;
140    }
141 
142    /***
143     * @jmx:managed-attribute
144     */
145    public boolean getPullSubscription() {
146       return pullSubscriber;
147    }
148    /***
149     * @jmx:managed-attribute
150     */
151    public void setPullSubscription(boolean pullSubscriber) {
152       this.pullSubscriber = pullSubscriber;
153    }
154    /***
155     * @jmx:managed-attribute
156     */
157    public long getPullTimeout() {
158       return pullTimeput;
159    }
160    /***
161     * @jmx:managed-attribute
162     */
163    public void setPullTimeout(long pullTimeput) {
164       this.pullTimeput = pullTimeput;
165    }
166 
167    
168    protected void destroyService() throws Exception
169    {
170       if ( receiver !=null) {
171          receiver.stop();
172          receiver = null;
173       } // end of if ()
174       
175       // close the connection consumer
176       try
177       {
178          if (subscriber != null)
179          {
180             if ( stopOnFail) {
181                subscriber.close();
182             } // end of if ()
183             
184             subscriber= null;
185          }
186       }
187       catch (Exception e)
188          {
189             log.error("Failed to close connection consumer: "+e);
190          }
191       super.destroyService();
192    }
193    //---------------- JMS interface methods ---------------------------
194    public void onMessage(final Message message){
195       log.debug("Got message " + message);
196       Notification n = new Notification(null,//Type!
197                                         name,
198                                          new Date().getTime()
199                                         );
200       n.setUserData(message);
201       sendNotification(n);
202    }
203 
204    
205 
206    //---------------- JMS related helper methods ----------------------
207 
208    protected void startTopic() throws JMSException {
209       super.startTopic();
210       TopicSession session = ((TopicConnection)connection).createTopicSession(
211 							 // No transaction
212 							 false,
213 							 // Auto ack
214 							 Session.AUTO_ACKNOWLEDGE);
215       
216       if (subscriptionDurability ) {
217          subscriber = session.createDurableSubscriber((Topic)destination, subscriptionId,messageSelector, false);
218       } else {
219          subscriber = session.createSubscriber((Topic)destination, messageSelector, false); 
220       } // end of else
221       
222       if ( !pullSubscriber) {
223          subscriber.setMessageListener(this);
224       } else {
225           receive(subscriber);
226       } // end of else
227       
228    }
229 
230    protected void startQueue() throws JMSException {
231       super.startQueue();
232       
233       QueueSession session = ((QueueConnection)connection).createQueueSession(
234 							 // No transaction
235 							 false,
236 							 // Auto ack
237 							 Session.AUTO_ACKNOWLEDGE);
238       
239 
240       subscriber = session.createReceiver((Queue)destination, messageSelector); 
241       
242       
243       if ( !pullSubscriber) {
244          subscriber.setMessageListener(this);
245       } else {
246          receive(subscriber);
247       } // end of else
248    }
249 
250    
251    protected void receive( MessageConsumer consumer) throws JMSException {
252       // Run forever, until stoped
253       receiver = new ReceiverImpl(consumer);
254       new Thread("JMSContainerInvoker Create Recovery Thread") {
255             public void run()
256             {
257                
258                receiver.receive();
259             }
260       	 }.start();
261    }
262    
263 
264    class ReceiverImpl {
265       boolean running = true;
266       Thread currentThread;
267       MessageConsumer consumer;
268       public ReceiverImpl(MessageConsumer consumer) {
269          this.consumer = consumer;
270       }
271       public void receive() {
272          currentThread = Thread.currentThread();
273          // Run forever, until stoped
274          log.debug("Starting receiving");
275          while ( running) {
276             try {
277                Message msg = consumer.receive(pullTimeput);
278 
279                if ( msg != null) {
280                   onMessage(msg);
281                } // end of if ()
282             } catch (JMSException e) {
283                // Skip handling if not in running mode,
284                // may be stop() that interupts thread.
285                if ( running ) {
286                   log.error("Could no receive message: "+e,e);
287                   
288                   running = false;
289                   if (!failsafe) {
290                      // 
291                      exListener.onException( new JMSException("Could not receive: " + e) );
292                   } // end of if ()
293                }
294                
295             } // end of try-catch
296          } // end of while ()
297          currentThread = null;
298       }
299       void stop()
300       {
301          log.debug("Stop requested");
302          
303          running = false;
304          if (currentThread != null)
305          {
306             currentThread.interrupt();
307             log.debug("Current thread interrupted");
308          }
309       }
310    }
311 }// MessageDrivenJMX