001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import org.apache.activemq.filter.BooleanExpression;
020import org.apache.activemq.filter.MessageEvaluationContext;
021import org.apache.activemq.util.JMSExceptionSupport;
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import javax.jms.JMSException;
026import java.io.IOException;
027import java.util.Arrays;
028
029/**
030 * @openwire:marshaller code="91"
031 * 
032 */
033public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
034
035    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
036    static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
037
038    protected BrokerId networkBrokerId;
039    protected int networkTTL;
040
041    public NetworkBridgeFilter() {
042    }
043
044    public NetworkBridgeFilter(BrokerId networkBrokerId, int networkTTL) {
045        this.networkBrokerId = networkBrokerId;
046        this.networkTTL = networkTTL;
047    }
048
049    public byte getDataStructureType() {
050        return DATA_STRUCTURE_TYPE;
051    }
052
053    public boolean isMarshallAware() {
054        return false;
055    }
056
057    public boolean matches(MessageEvaluationContext mec) throws JMSException {
058        try {
059            // for Queues - the message can be acknowledged and dropped whilst
060            // still
061            // in the dispatch loop
062            // so need to get the reference to it
063            Message message = mec.getMessage();
064            return message != null && matchesForwardingFilter(message, mec);
065        } catch (IOException e) {
066            throw JMSExceptionSupport.create(e);
067        }
068    }
069
070    public Object evaluate(MessageEvaluationContext message) throws JMSException {
071        return matches(message) ? Boolean.TRUE : Boolean.FALSE;
072    }
073
074    protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {
075
076        if (contains(message.getBrokerPath(), networkBrokerId)) {
077            if (LOG.isTraceEnabled()) {
078                LOG.trace("Message all ready routed once through target broker ("
079                        + networkBrokerId + "), path: "
080                        + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
081            }
082            return false;
083        }
084
085        int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
086
087        if (hops >= networkTTL) {
088            if (LOG.isTraceEnabled()) {
089                LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
090            }
091            return false;
092        }
093
094        if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
095            ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
096            hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
097            if (hops >= networkTTL) {
098                if (LOG.isTraceEnabled()) {
099                    LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
100                }
101                return false;
102            }
103
104            if (contains(info.getBrokerPath(), networkBrokerId)) {
105                LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
106                        + networkBrokerId + "), path: "
107                        + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
108                return false;
109            }
110        }
111        return true;
112    }
113
114    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
115        if (brokerPath != null && brokerId != null) {
116            for (int i = 0; i < brokerPath.length; i++) {
117                if (brokerId.equals(brokerPath[i])) {
118                    return true;
119                }
120            }
121        }
122        return false;
123    }
124
125    /**
126     * @openwire:property version=1
127     */
128    public int getNetworkTTL() {
129        return networkTTL;
130    }
131
132    public void setNetworkTTL(int networkTTL) {
133        this.networkTTL = networkTTL;
134    }
135
136    /**
137     * @openwire:property version=1 cache=true
138     */
139    public BrokerId getNetworkBrokerId() {
140        return networkBrokerId;
141    }
142
143    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
144        this.networkBrokerId = remoteBrokerPath;
145    }
146
147}