001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.io.IOException;
020import java.io.UnsupportedEncodingException;
021import java.util.Enumeration;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Vector;
027import javax.jms.DeliveryMode;
028import javax.jms.Destination;
029import javax.jms.JMSException;
030import javax.jms.MessageFormatException;
031import javax.jms.MessageNotWriteableException;
032import org.apache.activemq.ActiveMQConnection;
033import org.apache.activemq.ScheduledMessage;
034import org.apache.activemq.broker.scheduler.CronParser;
035import org.apache.activemq.filter.PropertyExpression;
036import org.apache.activemq.state.CommandVisitor;
037import org.apache.activemq.util.Callback;
038import org.apache.activemq.util.JMSExceptionSupport;
039import org.apache.activemq.util.TypeConversionSupport;
040
041/**
042 * 
043 * @openwire:marshaller code="23"
044 */
045public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage {
046    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
047    public static final String DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
048    private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
049
050    protected transient Callback acknowledgeCallback;
051
052    public byte getDataStructureType() {
053        return DATA_STRUCTURE_TYPE;
054    }
055
056
057    @Override
058    public Message copy() {
059        ActiveMQMessage copy = new ActiveMQMessage();
060        copy(copy);
061        return copy;
062    }
063
064    protected void copy(ActiveMQMessage copy) {
065        super.copy(copy);
066        copy.acknowledgeCallback = acknowledgeCallback;
067    }
068
069    @Override
070    public int hashCode() {
071        MessageId id = getMessageId();
072        if (id != null) {
073            return id.hashCode();
074        } else {
075            return super.hashCode();
076        }
077    }
078
079    @Override
080    public boolean equals(Object o) {
081        if (this == o) {
082            return true;
083        }
084        if (o == null || o.getClass() != getClass()) {
085            return false;
086        }
087
088        ActiveMQMessage msg = (ActiveMQMessage) o;
089        MessageId oMsg = msg.getMessageId();
090        MessageId thisMsg = this.getMessageId();
091        return thisMsg != null && oMsg != null && oMsg.equals(thisMsg);
092    }
093
094    public void acknowledge() throws JMSException {
095        if (acknowledgeCallback != null) {
096            try {
097                acknowledgeCallback.execute();
098            } catch (JMSException e) {
099                throw e;
100            } catch (Throwable e) {
101                throw JMSExceptionSupport.create(e);
102            }
103        }
104    }
105
106    @Override
107    public void clearBody() throws JMSException {
108        setContent(null);
109        readOnlyBody = false;
110    }
111
112    public String getJMSMessageID() {
113        MessageId messageId = this.getMessageId();
114        if (messageId == null) {
115            return null;
116        }
117        return messageId.toString();
118    }
119
120    /**
121     * Seems to be invalid because the parameter doesn't initialize MessageId
122     * instance variables ProducerId and ProducerSequenceId
123     *
124     * @param value
125     * @throws JMSException
126     */
127    public void setJMSMessageID(String value) throws JMSException {
128        if (value != null) {
129            try {
130                MessageId id = new MessageId(value);
131                this.setMessageId(id);
132            } catch (NumberFormatException e) {
133                // we must be some foreign JMS provider or strange user-supplied
134                // String
135                // so lets set the IDs to be 1
136                MessageId id = new MessageId();
137                id.setTextView(value);
138                this.setMessageId(messageId);
139            }
140        } else {
141            this.setMessageId(null);
142        }
143    }
144
145    /**
146     * This will create an object of MessageId. For it to be valid, the instance
147     * variable ProducerId and producerSequenceId must be initialized.
148     *
149     * @param producerId
150     * @param producerSequenceId
151     * @throws JMSException
152     */
153    public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws JMSException {
154        MessageId id = null;
155        try {
156            id = new MessageId(producerId, producerSequenceId);
157            this.setMessageId(id);
158        } catch (Throwable e) {
159            throw JMSExceptionSupport.create("Invalid message id '" + id + "', reason: " + e.getMessage(), e);
160        }
161    }
162
163    public long getJMSTimestamp() {
164        return this.getTimestamp();
165    }
166
167    public void setJMSTimestamp(long timestamp) {
168        this.setTimestamp(timestamp);
169    }
170
171    public String getJMSCorrelationID() {
172        return this.getCorrelationId();
173    }
174
175    public void setJMSCorrelationID(String correlationId) {
176        this.setCorrelationId(correlationId);
177    }
178
179    public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
180        return encodeString(this.getCorrelationId());
181    }
182
183    public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException {
184        this.setCorrelationId(decodeString(correlationId));
185    }
186
187    public String getJMSXMimeType() {
188        return "jms/message";
189    }
190
191    protected static String decodeString(byte[] data) throws JMSException {
192        try {
193            if (data == null) {
194                return null;
195            }
196            return new String(data, "UTF-8");
197        } catch (UnsupportedEncodingException e) {
198            throw new JMSException("Invalid UTF-8 encoding: " + e.getMessage());
199        }
200    }
201
202    protected static byte[] encodeString(String data) throws JMSException {
203        try {
204            if (data == null) {
205                return null;
206            }
207            return data.getBytes("UTF-8");
208        } catch (UnsupportedEncodingException e) {
209            throw new JMSException("Invalid UTF-8 encoding: " + e.getMessage());
210        }
211    }
212
213    public Destination getJMSReplyTo() {
214        return this.getReplyTo();
215    }
216
217    public void setJMSReplyTo(Destination destination) throws JMSException {
218        this.setReplyTo(ActiveMQDestination.transform(destination));
219    }
220
221    public Destination getJMSDestination() {
222        return this.getDestination();
223    }
224
225    public void setJMSDestination(Destination destination) throws JMSException {
226        this.setDestination(ActiveMQDestination.transform(destination));
227    }
228
229    public int getJMSDeliveryMode() {
230        return this.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
231    }
232
233    public void setJMSDeliveryMode(int mode) {
234        this.setPersistent(mode == DeliveryMode.PERSISTENT);
235    }
236
237    public boolean getJMSRedelivered() {
238        return this.isRedelivered();
239    }
240
241    public void setJMSRedelivered(boolean redelivered) {
242        this.setRedelivered(redelivered);
243    }
244
245    public String getJMSType() {
246        return this.getType();
247    }
248
249    public void setJMSType(String type) {
250        this.setType(type);
251    }
252
253    public long getJMSExpiration() {
254        return this.getExpiration();
255    }
256
257    public void setJMSExpiration(long expiration) {
258        this.setExpiration(expiration);
259    }
260
261    public int getJMSPriority() {
262        return this.getPriority();
263    }
264
265    public void setJMSPriority(int priority) {
266        this.setPriority((byte) priority);
267    }
268
269    @Override
270    public void clearProperties() {
271        super.clearProperties();
272        readOnlyProperties = false;
273    }
274
275    public boolean propertyExists(String name) throws JMSException {
276        try {
277            return (this.getProperties().containsKey(name) || getObjectProperty(name)!= null);
278        } catch (IOException e) {
279            throw JMSExceptionSupport.create(e);
280        }
281    }
282
283    public Enumeration getPropertyNames() throws JMSException {
284        try {
285            Vector<String> result = new Vector<String>(this.getProperties().keySet());
286            return result.elements();
287        } catch (IOException e) {
288            throw JMSExceptionSupport.create(e);
289        }
290    }
291
292    /**
293     * return all property names, including standard JMS properties and JMSX properties
294     * @return  Enumeration of all property names on this message
295     * @throws JMSException
296     */
297    public Enumeration getAllPropertyNames() throws JMSException {
298        try {
299            Vector<String> result = new Vector<String>(this.getProperties().keySet());
300            result.addAll(JMS_PROPERTY_SETERS.keySet());
301            return result.elements();
302        } catch (IOException e) {
303            throw JMSExceptionSupport.create(e);
304        }
305    }
306
307    interface PropertySetter {
308
309        void set(Message message, Object value) throws MessageFormatException;
310    }
311
312    static {
313        JMS_PROPERTY_SETERS.put("JMSXDeliveryCount", new PropertySetter() {
314            public void set(Message message, Object value) throws MessageFormatException {
315                Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
316                if (rc == null) {
317                    throw new MessageFormatException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + ".");
318                }
319                message.setRedeliveryCounter(rc.intValue() - 1);
320            }
321        });
322        JMS_PROPERTY_SETERS.put("JMSXGroupID", new PropertySetter() {
323            public void set(Message message, Object value) throws MessageFormatException {
324                String rc = (String) TypeConversionSupport.convert(value, String.class);
325                if (rc == null) {
326                    throw new MessageFormatException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
327                }
328                message.setGroupID(rc);
329            }
330        });
331        JMS_PROPERTY_SETERS.put("JMSXGroupSeq", new PropertySetter() {
332            public void set(Message message, Object value) throws MessageFormatException {
333                Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
334                if (rc == null) {
335                    throw new MessageFormatException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + ".");
336                }
337                message.setGroupSequence(rc.intValue());
338            }
339        });
340        JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() {
341            public void set(Message message, Object value) throws MessageFormatException {
342                String rc = (String) TypeConversionSupport.convert(value, String.class);
343                if (rc == null) {
344                    throw new MessageFormatException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
345                }
346                ((ActiveMQMessage) message).setJMSCorrelationID(rc);
347            }
348        });
349        JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() {
350            public void set(Message message, Object value) throws MessageFormatException {
351                Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
352                if (rc == null) {
353                    Boolean bool = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
354                    if (bool == null) {
355                        throw new MessageFormatException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
356                    }
357                    else {
358                        rc = bool.booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
359                    }
360                }
361                ((ActiveMQMessage) message).setJMSDeliveryMode(rc);
362            }
363        });
364        JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() {
365            public void set(Message message, Object value) throws MessageFormatException {
366                Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
367                if (rc == null) {
368                    throw new MessageFormatException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
369                }
370                ((ActiveMQMessage) message).setJMSExpiration(rc.longValue());
371            }
372        });
373        JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() {
374            public void set(Message message, Object value) throws MessageFormatException {
375                Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
376                if (rc == null) {
377                    throw new MessageFormatException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
378                }
379                ((ActiveMQMessage) message).setJMSPriority(rc.intValue());
380            }
381        });
382        JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() {
383            public void set(Message message, Object value) throws MessageFormatException {
384                Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
385                if (rc == null) {
386                    throw new MessageFormatException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
387                }
388                ((ActiveMQMessage) message).setJMSRedelivered(rc.booleanValue());
389            }
390        });
391        JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() {
392            public void set(Message message, Object value) throws MessageFormatException {
393                ActiveMQDestination rc = (ActiveMQDestination) TypeConversionSupport.convert(value, ActiveMQDestination.class);
394                if (rc == null) {
395                    throw new MessageFormatException("Property JMSReplyTo cannot be set from a " + value.getClass().getName() + ".");
396                }
397                ((ActiveMQMessage) message).setReplyTo(rc);
398            }
399        });
400        JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() {
401            public void set(Message message, Object value) throws MessageFormatException {
402                Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
403                if (rc == null) {
404                    throw new MessageFormatException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
405                }
406                ((ActiveMQMessage) message).setJMSTimestamp(rc.longValue());
407            }
408        });
409        JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() {
410            public void set(Message message, Object value) throws MessageFormatException {
411                String rc = (String) TypeConversionSupport.convert(value, String.class);
412                if (rc == null) {
413                    throw new MessageFormatException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
414                }
415                ((ActiveMQMessage) message).setJMSType(rc);
416            }
417        });
418    }
419
420    public void setObjectProperty(String name, Object value) throws JMSException {
421        setObjectProperty(name, value, true);
422    }
423
424    public void setObjectProperty(String name, Object value, boolean checkReadOnly) throws JMSException {
425
426        if (checkReadOnly) {
427            checkReadOnlyProperties();
428        }
429        if (name == null || name.equals("")) {
430            throw new IllegalArgumentException("Property name cannot be empty or null");
431        }
432
433        checkValidObject(value);
434        value = convertScheduled(name, value);
435        PropertySetter setter = JMS_PROPERTY_SETERS.get(name);
436
437        if (setter != null && value != null) {
438            setter.set(this, value);
439        } else {
440            try {
441                this.setProperty(name, value);
442            } catch (IOException e) {
443                throw JMSExceptionSupport.create(e);
444            }
445        }
446    }
447
448    public void setProperties(Map properties) throws JMSException {
449        for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
450            Map.Entry entry = (Map.Entry) iter.next();
451
452            // Lets use the object property method as we may contain standard
453            // extension headers like JMSXGroupID
454            setObjectProperty((String) entry.getKey(), entry.getValue());
455        }
456    }
457
458    protected void checkValidObject(Object value) throws MessageFormatException {
459
460        boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long;
461        valid = valid || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null;
462
463        if (!valid) {
464
465            ActiveMQConnection conn = getConnection();
466            // conn is null if we are in the broker rather than a JMS client
467            if (conn == null || conn.isNestedMapAndListEnabled()) {
468                if (!(value instanceof Map || value instanceof List)) {
469                    throw new MessageFormatException("Only objectified primitive objects, String, Map and List types are allowed but was: " + value + " type: " + value.getClass());
470                }
471            } else {
472                throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
473            }
474        }
475    }
476    
477    protected void  checkValidScheduled(String name, Object value) throws MessageFormatException {
478        if (AMQ_SCHEDULED_DELAY.equals(name) || AMQ_SCHEDULED_PERIOD.equals(name) || AMQ_SCHEDULED_REPEAT.equals(name)) {
479            if (value instanceof Long == false && value instanceof Integer == false) {
480                throw new MessageFormatException(name + " should be long or int value");
481            }
482        }
483        if (AMQ_SCHEDULED_CRON.equals(name)) {
484            CronParser.validate(value.toString());
485        }
486    }
487    
488    protected Object  convertScheduled(String name, Object value) throws MessageFormatException {
489        Object result = value;
490        if (AMQ_SCHEDULED_DELAY.equals(name)){
491            result = TypeConversionSupport.convert(value, Long.class);
492        }
493        else if (AMQ_SCHEDULED_PERIOD.equals(name)){
494            result = TypeConversionSupport.convert(value, Long.class);
495        }
496        else if (AMQ_SCHEDULED_REPEAT.equals(name)){
497            result = TypeConversionSupport.convert(value, Integer.class);
498        }
499        return result;
500    }
501
502    public Object getObjectProperty(String name) throws JMSException {
503        if (name == null) {
504            throw new NullPointerException("Property name cannot be null");
505        }
506
507        // PropertyExpression handles converting message headers to properties.
508        PropertyExpression expression = new PropertyExpression(name);
509        return expression.evaluate(this);
510    }
511
512    public boolean getBooleanProperty(String name) throws JMSException {
513        Object value = getObjectProperty(name);
514        if (value == null) {
515            return false;
516        }
517        Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
518        if (rc == null) {
519            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
520        }
521        return rc.booleanValue();
522    }
523
524    public byte getByteProperty(String name) throws JMSException {
525        Object value = getObjectProperty(name);
526        if (value == null) {
527            throw new NumberFormatException("property " + name + " was null");
528        }
529        Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class);
530        if (rc == null) {
531            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
532        }
533        return rc.byteValue();
534    }
535
536    public short getShortProperty(String name) throws JMSException {
537        Object value = getObjectProperty(name);
538        if (value == null) {
539            throw new NumberFormatException("property " + name + " was null");
540        }
541        Short rc = (Short) TypeConversionSupport.convert(value, Short.class);
542        if (rc == null) {
543            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
544        }
545        return rc.shortValue();
546    }
547
548    public int getIntProperty(String name) throws JMSException {
549        Object value = getObjectProperty(name);
550        if (value == null) {
551            throw new NumberFormatException("property " + name + " was null");
552        }
553        Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
554        if (rc == null) {
555            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
556        }
557        return rc.intValue();
558    }
559
560    public long getLongProperty(String name) throws JMSException {
561        Object value = getObjectProperty(name);
562        if (value == null) {
563            throw new NumberFormatException("property " + name + " was null");
564        }
565        Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
566        if (rc == null) {
567            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
568        }
569        return rc.longValue();
570    }
571
572    public float getFloatProperty(String name) throws JMSException {
573        Object value = getObjectProperty(name);
574        if (value == null) {
575            throw new NullPointerException("property " + name + " was null");
576        }
577        Float rc = (Float) TypeConversionSupport.convert(value, Float.class);
578        if (rc == null) {
579            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
580        }
581        return rc.floatValue();
582    }
583
584    public double getDoubleProperty(String name) throws JMSException {
585        Object value = getObjectProperty(name);
586        if (value == null) {
587            throw new NullPointerException("property " + name + " was null");
588        }
589        Double rc = (Double) TypeConversionSupport.convert(value, Double.class);
590        if (rc == null) {
591            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
592        }
593        return rc.doubleValue();
594    }
595
596    public String getStringProperty(String name) throws JMSException {
597        Object value = null;
598        if (name.equals("JMSXUserID")) {
599            value = getUserID();
600            if (value == null) {
601                value = getObjectProperty(name);
602            }
603        } else {
604            value = getObjectProperty(name);
605        }
606        if (value == null) {
607            return null;
608        }
609        String rc = (String) TypeConversionSupport.convert(value, String.class);
610        if (rc == null) {
611            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String");
612        }
613        return rc;
614    }
615
616    public void setBooleanProperty(String name, boolean value) throws JMSException {
617        setBooleanProperty(name, value, true);
618    }
619
620    public void setBooleanProperty(String name, boolean value, boolean checkReadOnly) throws JMSException {
621        setObjectProperty(name, Boolean.valueOf(value), checkReadOnly);
622    }
623
624    public void setByteProperty(String name, byte value) throws JMSException {
625        setObjectProperty(name, Byte.valueOf(value));
626    }
627
628    public void setShortProperty(String name, short value) throws JMSException {
629        setObjectProperty(name, Short.valueOf(value));
630    }
631
632    public void setIntProperty(String name, int value) throws JMSException {
633        setObjectProperty(name, Integer.valueOf(value));
634    }
635
636    public void setLongProperty(String name, long value) throws JMSException {
637        setObjectProperty(name, Long.valueOf(value));
638    }
639
640    public void setFloatProperty(String name, float value) throws JMSException {
641        setObjectProperty(name, new Float(value));
642    }
643
644    public void setDoubleProperty(String name, double value) throws JMSException {
645        setObjectProperty(name, new Double(value));
646    }
647
648    public void setStringProperty(String name, String value) throws JMSException {
649        setObjectProperty(name, value);
650    }
651
652    private void checkReadOnlyProperties() throws MessageNotWriteableException {
653        if (readOnlyProperties) {
654            throw new MessageNotWriteableException("Message properties are read-only");
655        }
656    }
657
658    protected void checkReadOnlyBody() throws MessageNotWriteableException {
659        if (readOnlyBody) {
660            throw new MessageNotWriteableException("Message body is read-only");
661        }
662    }
663
664    public Callback getAcknowledgeCallback() {
665        return acknowledgeCallback;
666    }
667
668    public void setAcknowledgeCallback(Callback acknowledgeCallback) {
669        this.acknowledgeCallback = acknowledgeCallback;
670    }
671
672    /**
673     * Send operation event listener. Used to get the message ready to be sent.
674     */
675    public void onSend() throws JMSException {
676        setReadOnlyBody(true);
677        setReadOnlyProperties(true);
678    }
679
680    public Response visit(CommandVisitor visitor) throws Exception {
681        return visitor.processMessage(this);
682    }
683}