View Javadoc

1   /*
2    * Copyright (c) 2002 Peter Antman, Teknik i Media  <peter.antman@tim.se>
3    *
4    * $Id: Producer.java,v 1.1.1.1 2004/05/19 12:26:42 pra Exp $
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version.
10   * 
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   * 
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, write to the Free Software
18   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19   */
20  package org.backsource.adaptor.jms;
21  import javax.jms.QueueConnection;
22  import javax.jms.TopicConnection;
23  import javax.jms.Session;
24  import javax.jms.QueueSession;
25  import javax.jms.TopicSession;
26  import javax.jms.QueueSender;
27  import javax.jms.TopicPublisher;
28  import javax.jms.Message;
29  import javax.jms.TextMessage;
30  import javax.jms.Destination;
31  import javax.jms.Queue;
32  import javax.jms.Topic;
33  import javax.jms.JMSException;
34  
35  import org.backsource.jmx.MBeanProxyFactory;
36  /***
37   * A JMS producer.
38   *
39   * <p>This class holds a connection and specific destination. Users get a MessageProducer from it. Currently this will create a session each time the getProducer is called, but it should always be left back when used, since failover handling will not work correct othervise! if given a ProducerJNDI an Mbean proxy will be bound at that name that implements {@link ProducerFactory}.</p>
40   *
41   * @author <a href="mailto:pra@tim.se">Peter Antman</a>
42   * @version $Revision: 1.1.1.1 $
43   * @jmx:mbean name="jms:service=JmsProducer,name=default" extends="org.backsource.adaptor.jms.JmsBaseMBean,org.backsource.adaptor.jms.ProducerFactory"
44   */
45  
46  public class Producer extends JmsBase implements org.backsource.adaptor.jms.ProducerMBean {
47     protected String producerJNDI;
48     protected ProducerFactory prodProxy;
49     
50     public Producer (){
51        
52     }
53     /***
54      * Set a JNDI name to bind an MBean proxy to this MBean that implements {@link ProducerFactory}.
55      *<p>if not set a startup, binding will not be done.</p>
56      * @jmx:managed-attribute
57      */
58     public void setProducerJNDI(String producerJNDI) {
59        this.producerJNDI = producerJNDI;
60     }
61     /***
62      * @jmx:managed-attribute
63      */
64     public String getProducerJNDI() {
65        return producerJNDI;
66     }
67     /***
68      * Get a MessageProducer.
69      *
70      * <p>Currently a new session will be created each time this is called. Even if this is the case; the recommended usage pattern is to get it at each invokation and close it; since pooling may be introduced.</p>
71      *
72      * @jmx:managed-attribute
73      */
74     public MessageProducer getProducer() throws JMSException {
75        if ( connectionException) {
76           throw new JMSException("Connection exception: no connection to provider");
77        } // end of if ()
78        if ( connection != null) {
79           Session sess = null;
80           if ( isTopic) {
81              sess = ((TopicConnection)connection).createTopicSession(transacted,
82                                                                      ackMode);
83           } else {
84              sess = ((QueueConnection)connection).createQueueSession(transacted,
85                                                                      ackMode);
86           } // end of else
87           return new MessageProducerImpl(this,sess,destination,isTopic);
88        } else {     
89           throw new JMSException("No connection");
90        } // end of else
91        
92        
93  
94  
95     }
96     /***
97      * Used by MessageProcucer to leave itself back to the factory.
98      *
99      * @jmx:managed-operation
100     */
101    public void leaveProducer(MessageProducer prod) {
102       try {
103 
104       if ( prod != null) {
105          Session s = prod.getSession();
106          if ( s != null) {
107             s.close();
108          } // end of if ()
109          
110       } // end of if ()
111                 
112       } catch ( Throwable e) {
113          log.error("Could not take back producer: " +e,e);
114       } // end of try-catch
115       
116    }
117    /***
118     * Publish message as a text message.
119     *
120     * @jmx:managed-operation
121     */
122    public void publish(String message) throws JMSException {
123       MessageProducer p = getProducer();
124       Session s = p.getSession();    
125       TextMessage m = s.createTextMessage(message);
126       log.debug("Sending message: " + message);
127       p.send(m);
128       p.close();
129    }
130    
131    //--- Lifecykle methods ----
132 
133    protected void startService() throws Exception {
134       super.startService();
135       if (producerJNDI != null ) {
136          log.info("Binding producer proxy into jndi: " + producerJNDI);
137          prodProxy  = (ProducerFactory)MBeanProxyFactory.create(ProducerFactory.class,
138                                                                 name.toString());
139          MBeanProxyFactory.rebind(producerJNDI, prodProxy);                        
140       } // end of if ()
141       
142    }
143    
144    protected void stopService() throws Exception {
145       if ( prodProxy != null) {
146          MBeanProxyFactory.unbind(producerJNDI); 
147          prodProxy = null;
148       } // end of if ()
149       super.stopService();
150    }
151 
152    /***
153     * An extended message procuder.
154     *
155     * <p>This class implements the jms 1.1 interface for 1.0.2 providers. It also ads the possibility to get the session; to have something to create messages from.</p>
156     */
157    static class MessageProducerImpl implements MessageProducer {
158       boolean isTopic = true;
159       Session session;
160       Producer factory;
161       javax.jms.MessageProducer producer;
162       Destination dest;
163 
164       MessageProducerImpl(Producer factory,Session session, Destination dest,boolean isTopic) throws JMSException{
165          this.factory = factory;
166          this.session = session;
167          this.isTopic = isTopic;
168          this.dest = dest;
169 
170          if ( isTopic) {
171             producer = ((TopicSession)session).createPublisher((Topic)dest);
172          } else {
173             producer = ((QueueSession)session).createSender((Queue)dest);
174          } // end of else
175          
176       }
177 
178 
179       public void setDisableMessageID(boolean value) throws JMSException {
180          producer.setDisableMessageID(value);
181       }
182       
183       public boolean getDisableMessageID() throws JMSException {
184          return producer.getDisableMessageID();
185       }
186 
187       public void setDisableMessageTimestamp(boolean value) throws JMSException {
188          producer.setDisableMessageTimestamp(value);
189       }
190 
191       public boolean getDisableMessageTimestamp() throws JMSException {
192          return producer.getDisableMessageTimestamp();
193       }
194 
195       public void setDeliveryMode(int deliveryMode) throws JMSException {
196          producer.setDeliveryMode(deliveryMode);
197       }
198 
199       public int getDeliveryMode() throws JMSException {
200          return producer.getDeliveryMode();
201       }
202 
203       public void setPriority(int defaultPriorityFromLow0High9) throws JMSException {
204          producer.setPriority(defaultPriorityFromLow0High9);
205       }
206 
207       public int getPriority() throws JMSException {
208          return producer.getPriority();
209       }
210 
211       public void setTimeToLive(long timeToLiveInMilliseconds) throws JMSException {
212          producer.setTimeToLive(timeToLiveInMilliseconds);
213       }
214 
215       public long getTimeToLive() throws JMSException {
216          return producer.getTimeToLive();
217       }
218 
219       public void close() throws JMSException {
220          producer.close();
221          factory.leaveProducer(this);
222       }
223 
224       public void send(Message message) throws JMSException {
225          if ( isTopic) {
226             ((TopicPublisher)producer).publish(message);
227          } else {
228              ((QueueSender)producer).send(message);
229          } // end of else
230          
231          
232       }
233 
234       public void send(
235                        Message message, 
236                        int deliveryMode, 
237                        int priority,
238                        long timeToLive
239                        )
240          throws JMSException {
241          if ( isTopic) {
242             ((TopicPublisher)producer).publish(message,deliveryMode,priority,timeToLive);
243          } else {
244              ((QueueSender)producer).send(message,deliveryMode,priority,timeToLive);
245          } // end of else
246          
247       }
248 
249       public void send(Destination destination, Message message) throws JMSException {
250          if ( isTopic) {
251             ((TopicPublisher)producer).publish((Topic)destination,message);
252          } else {
253              ((QueueSender)producer).send((Queue)destination,message);
254          } // end of else
255          
256 
257       }
258       public void send(
259                        Destination destination, 
260                        Message message, 
261                        int deliveryMode, 
262                        int priority,
263                        long timeToLive
264                        )
265          throws JMSException {
266          if ( isTopic) {
267             ((TopicPublisher)producer).publish((Topic)destination,message,deliveryMode,priority,timeToLive);
268          } else {
269              ((QueueSender)producer).send((Queue)destination,message,deliveryMode,priority,timeToLive);
270          } // end of else
271       }
272 
273       /***
274        * Get the underlying session; please:only use it to create new messages!
275        */
276       public Session getSession() {
277          return session;
278       }
279    }
280 }// ProducerJMX