001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.mqtt;
019
020import java.io.IOException;
021import java.util.Timer;
022import java.util.concurrent.SynchronousQueue;
023import java.util.concurrent.ThreadFactory;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030import org.apache.activemq.command.KeepAliveInfo;
031import org.apache.activemq.thread.SchedulerTimerTask;
032import org.apache.activemq.transport.AbstractInactivityMonitor;
033import org.apache.activemq.transport.InactivityIOException;
034import org.apache.activemq.transport.Transport;
035import org.apache.activemq.transport.TransportFilter;
036import org.apache.activemq.wireformat.WireFormat;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040public class MQTTInactivityMonitor extends TransportFilter {
041
042    private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
043
044    private static ThreadPoolExecutor ASYNC_TASKS;
045    private static int CHECKER_COUNTER;
046    private static long DEFAULT_CHECK_TIME_MILLS = 30000;
047    private static Timer READ_CHECK_TIMER;
048
049    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
050
051    private final AtomicBoolean commandSent = new AtomicBoolean(false);
052    private final AtomicBoolean inSend = new AtomicBoolean(false);
053    private final AtomicBoolean failed = new AtomicBoolean(false);
054
055    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
056    private final AtomicBoolean inReceive = new AtomicBoolean(false);
057    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
058
059    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
060    private SchedulerTimerTask readCheckerTask;
061
062    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
063    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
064    private boolean keepAliveResponseRequired;
065    private MQTTProtocolConverter protocolConverter;
066
067
068    private final Runnable readChecker = new Runnable() {
069        long lastRunTime;
070
071        public void run() {
072            long now = System.currentTimeMillis();
073            long elapsed = (now - lastRunTime);
074
075            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
076                LOG.debug("" + elapsed + " ms elapsed since last read check.");
077            }
078
079            // Perhaps the timer executed a read check late.. and then executes
080            // the next read check on time which causes the time elapsed between
081            // read checks to be small..
082
083            // If less than 90% of the read check Time elapsed then abort this readcheck.
084            if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
085                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
086                return;
087            }
088
089            lastRunTime = now;
090            readCheck();
091        }
092    };
093
094    private boolean allowReadCheck(long elapsed) {
095        return elapsed > (readCheckTime * 9 / 10);
096    }
097
098
099    public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
100        super(next);
101    }
102
103    public void start() throws Exception {
104        next.start();
105        startMonitorThread();
106    }
107
108    public void stop() throws Exception {
109        stopMonitorThread();
110        next.stop();
111    }
112
113
114    final void readCheck() {
115        int currentCounter = next.getReceiveCounter();
116        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
117        if (inReceive.get() || currentCounter != previousCounter) {
118            if (LOG.isTraceEnabled()) {
119                LOG.trace("A receive is in progress");
120            }
121            return;
122        }
123        if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
124            if (LOG.isDebugEnabled()) {
125                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
126            }
127            ASYNC_TASKS.execute(new Runnable() {
128                public void run() {
129                    if (protocolConverter != null) {
130                        protocolConverter.onTransportError();
131                    }
132                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
133                }
134
135                ;
136            });
137        } else {
138            if (LOG.isTraceEnabled()) {
139                LOG.trace("Message received since last read check, resetting flag: ");
140            }
141        }
142        commandReceived.set(false);
143    }
144
145
146    public void onCommand(Object command) {
147        commandReceived.set(true);
148        inReceive.set(true);
149        try {
150            if (command.getClass() == KeepAliveInfo.class) {
151                KeepAliveInfo info = (KeepAliveInfo) command;
152                if (info.isResponseRequired()) {
153                    sendLock.readLock().lock();
154                    try {
155                        info.setResponseRequired(false);
156                        oneway(info);
157                    } catch (IOException e) {
158                        onException(e);
159                    } finally {
160                        sendLock.readLock().unlock();
161                    }
162                }
163            } else {
164                transportListener.onCommand(command);
165            }
166        } finally {
167            inReceive.set(false);
168        }
169    }
170
171    public void oneway(Object o) throws IOException {
172        // To prevent the inactivity monitor from sending a message while we
173        // are performing a send we take a read lock.  The inactivity monitor
174        // sends its Heart-beat commands under a write lock.  This means that
175        // the MutexTransport is still responsible for synchronizing sends
176        this.sendLock.readLock().lock();
177        inSend.set(true);
178        try {
179            doOnewaySend(o);
180        } finally {
181            commandSent.set(true);
182            inSend.set(false);
183            this.sendLock.readLock().unlock();
184        }
185    }
186
187    // Must be called under lock, either read or write on sendLock.
188    private void doOnewaySend(Object command) throws IOException {
189        if (failed.get()) {
190            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
191        }
192        next.oneway(command);
193    }
194
195    public void onException(IOException error) {
196        if (failed.compareAndSet(false, true)) {
197            stopMonitorThread();
198            transportListener.onException(error);
199        }
200    }
201
202
203    public long getReadCheckTime() {
204        return readCheckTime;
205    }
206
207    public void setReadCheckTime(long readCheckTime) {
208        this.readCheckTime = readCheckTime;
209    }
210
211
212    public long getInitialDelayTime() {
213        return initialDelayTime;
214    }
215
216    public void setInitialDelayTime(long initialDelayTime) {
217        this.initialDelayTime = initialDelayTime;
218    }
219
220    public boolean isKeepAliveResponseRequired() {
221        return this.keepAliveResponseRequired;
222    }
223
224    public void setKeepAliveResponseRequired(boolean value) {
225        this.keepAliveResponseRequired = value;
226    }
227
228    public boolean isMonitorStarted() {
229        return this.monitorStarted.get();
230    }
231
232    public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
233        this.protocolConverter = protocolConverter;
234    }
235
236    public MQTTProtocolConverter getProtocolConverter() {
237        return protocolConverter;
238    }
239
240    synchronized void startMonitorThread() {
241        if (monitorStarted.get()) {
242            return;
243        }
244
245
246        if (readCheckTime > 0) {
247            readCheckerTask = new SchedulerTimerTask(readChecker);
248        }
249
250
251        if (readCheckTime > 0) {
252            monitorStarted.set(true);
253            synchronized (AbstractInactivityMonitor.class) {
254                if (CHECKER_COUNTER == 0) {
255                    ASYNC_TASKS = createExecutor();
256                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
257                }
258                CHECKER_COUNTER++;
259                if (readCheckTime > 0) {
260                    READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
261                }
262            }
263        }
264    }
265
266
267    synchronized void stopMonitorThread() {
268        if (monitorStarted.compareAndSet(true, false)) {
269            if (readCheckerTask != null) {
270                readCheckerTask.cancel();
271            }
272
273            synchronized (AbstractInactivityMonitor.class) {
274                READ_CHECK_TIMER.purge();
275                CHECKER_COUNTER--;
276                if (CHECKER_COUNTER == 0) {
277                    READ_CHECK_TIMER.cancel();
278                    READ_CHECK_TIMER = null;
279                    ASYNC_TASKS.shutdown();
280                    ASYNC_TASKS = null;
281                }
282            }
283        }
284    }
285
286    private ThreadFactory factory = new ThreadFactory() {
287        public Thread newThread(Runnable runnable) {
288            Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
289            thread.setDaemon(true);
290            return thread;
291        }
292    };
293
294    private ThreadPoolExecutor createExecutor() {
295        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
296        exec.allowCoreThreadTimeOut(true);
297        return exec;
298    }
299}
300