001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import java.io.IOException;
020import java.io.PrintStream;
021import java.sql.Connection;
022import java.sql.PreparedStatement;
023import java.sql.ResultSet;
024import java.sql.ResultSetMetaData;
025import java.sql.SQLException;
026import java.sql.Statement;
027import java.util.ArrayList;
028import java.util.HashSet;
029import java.util.LinkedList;
030import java.util.Set;
031import java.util.concurrent.locks.ReadWriteLock;
032import java.util.concurrent.locks.ReentrantReadWriteLock;
033
034import org.apache.activemq.command.ActiveMQDestination;
035import org.apache.activemq.command.MessageId;
036import org.apache.activemq.command.ProducerId;
037import org.apache.activemq.command.SubscriptionInfo;
038import org.apache.activemq.store.jdbc.JDBCAdapter;
039import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
040import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
041import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
042import org.apache.activemq.store.jdbc.Statements;
043import org.apache.activemq.store.jdbc.TransactionContext;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
049 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
050 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
051 * The databases/JDBC drivers that use this adapter are:
052 * <ul>
053 * <li></li>
054 * </ul>
055 * 
056 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
057 * 
058 * 
059 */
060public class DefaultJDBCAdapter implements JDBCAdapter {
061    private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
062    public static final int MAX_ROWS = 10000;
063    protected Statements statements;
064    protected boolean batchStatments = true;
065    protected boolean prioritizedMessages;
066    protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
067    // needs to be min twice the prefetch for a durable sub and large enough for selector range
068    protected int maxRows = MAX_ROWS;
069
070    protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
071        s.setBytes(index, data);
072    }
073
074    protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
075        return rs.getBytes(index);
076    }
077
078    public void doCreateTables(TransactionContext c) throws SQLException, IOException {
079        Statement s = null;
080        cleanupExclusiveLock.writeLock().lock();
081        try {
082            // Check to see if the table already exists. If it does, then don't
083            // log warnings during startup.
084            // Need to run the scripts anyways since they may contain ALTER
085            // statements that upgrade a previous version
086            // of the table
087            boolean alreadyExists = false;
088            ResultSet rs = null;
089            try {
090                rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
091                        new String[] { "TABLE" });
092                alreadyExists = rs.next();
093            } catch (Throwable ignore) {
094            } finally {
095                close(rs);
096            }
097            s = c.getConnection().createStatement();
098            String[] createStatments = this.statements.getCreateSchemaStatements();
099            for (int i = 0; i < createStatments.length; i++) {
100                // This will fail usually since the tables will be
101                // created already.
102                try {
103                    LOG.debug("Executing SQL: " + createStatments[i]);
104                    s.execute(createStatments[i]);
105                } catch (SQLException e) {
106                    if (alreadyExists) {
107                        LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
108                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
109                                + " Vendor code: " + e.getErrorCode());
110                    } else {
111                        LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
112                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
113                                + " Vendor code: " + e.getErrorCode());
114                        JDBCPersistenceAdapter.log("Failure details: ", e);
115                    }
116                }
117            }
118            c.getConnection().commit();
119        } finally {
120            cleanupExclusiveLock.writeLock().unlock();
121            try {
122                s.close();
123            } catch (Throwable e) {
124            }
125        }
126    }
127
128    public void doDropTables(TransactionContext c) throws SQLException, IOException {
129        Statement s = null;
130        cleanupExclusiveLock.writeLock().lock();
131        try {
132            s = c.getConnection().createStatement();
133            String[] dropStatments = this.statements.getDropSchemaStatements();
134            for (int i = 0; i < dropStatments.length; i++) {
135                // This will fail usually since the tables will be
136                // created already.
137                try {
138                    LOG.debug("Executing SQL: " + dropStatments[i]);
139                    s.execute(dropStatments[i]);
140                } catch (SQLException e) {
141                    LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
142                            + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
143                            + e.getErrorCode());
144                    JDBCPersistenceAdapter.log("Failure details: ", e);
145                }
146            }
147            c.getConnection().commit();
148        } finally {
149            cleanupExclusiveLock.writeLock().unlock();
150            try {
151                s.close();
152            } catch (Throwable e) {
153            }
154        }
155    }
156
157    public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
158        PreparedStatement s = null;
159        ResultSet rs = null;
160        cleanupExclusiveLock.readLock().lock();
161        try {
162            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
163            rs = s.executeQuery();
164            long seq1 = 0;
165            if (rs.next()) {
166                seq1 = rs.getLong(1);
167            }
168            rs.close();
169            s.close();
170            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
171            rs = s.executeQuery();
172            long seq2 = 0;
173            if (rs.next()) {
174                seq2 = rs.getLong(1);
175            }
176            long seq = Math.max(seq1, seq2);
177            return seq;
178        } finally {
179            cleanupExclusiveLock.readLock().unlock();
180            close(rs);
181            close(s);
182        }
183    }
184    
185    public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
186        PreparedStatement s = null;
187        ResultSet rs = null;
188        cleanupExclusiveLock.readLock().lock();
189        try {
190            s = c.getConnection().prepareStatement(
191                    this.statements.getFindMessageByIdStatement());
192            s.setLong(1, storeSequenceId);
193            rs = s.executeQuery();
194            if (!rs.next()) {
195                return null;
196            }
197            return getBinaryData(rs, 1);
198        } finally {
199            cleanupExclusiveLock.readLock().unlock();
200            close(rs);
201            close(s);
202        }
203    }
204    
205
206    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
207            long expiration, byte priority) throws SQLException, IOException {
208        PreparedStatement s = c.getAddMessageStatement();
209        cleanupExclusiveLock.readLock().lock();
210        try {
211            if (s == null) {
212                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
213                if (this.batchStatments) {
214                    c.setAddMessageStatement(s);
215                }
216            }
217            s.setLong(1, sequence);
218            s.setString(2, messageID.getProducerId().toString());
219            s.setLong(3, messageID.getProducerSequenceId());
220            s.setString(4, destination.getQualifiedName());
221            s.setLong(5, expiration);
222            s.setLong(6, priority);
223            setBinaryData(s, 7, data);
224            if (this.batchStatments) {
225                s.addBatch();
226            } else if (s.executeUpdate() != 1) {
227                throw new SQLException("Failed add a message");
228            }
229        } finally {
230            cleanupExclusiveLock.readLock().unlock();
231            if (!this.batchStatments) {
232                if (s != null) {
233                    s.close();
234                }
235            }
236        }
237    }
238
239    public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
240            long expirationTime, String messageRef) throws SQLException, IOException {
241        PreparedStatement s = c.getAddMessageStatement();
242        cleanupExclusiveLock.readLock().lock();
243        try {
244            if (s == null) {
245                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
246                if (this.batchStatments) {
247                    c.setAddMessageStatement(s);
248                }
249            }
250            s.setLong(1, messageID.getBrokerSequenceId());
251            s.setString(2, messageID.getProducerId().toString());
252            s.setLong(3, messageID.getProducerSequenceId());
253            s.setString(4, destination.getQualifiedName());
254            s.setLong(5, expirationTime);
255            s.setString(6, messageRef);
256            if (this.batchStatments) {
257                s.addBatch();
258            } else if (s.executeUpdate() != 1) {
259                throw new SQLException("Failed add a message");
260            }
261        } finally {
262            cleanupExclusiveLock.readLock().unlock();
263            if (!this.batchStatments) {
264                s.close();
265            }
266        }
267    }
268
269    public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
270        PreparedStatement s = null;
271        ResultSet rs = null;
272        cleanupExclusiveLock.readLock().lock();
273        try {
274            s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
275            s.setString(1, messageID.getProducerId().toString());
276            s.setLong(2, messageID.getProducerSequenceId());
277            s.setString(3, destination.getQualifiedName());
278            rs = s.executeQuery();
279            if (!rs.next()) {
280                return new long[]{0,0};
281            }
282            return new long[]{rs.getLong(1), rs.getLong(2)};
283        } finally {
284            cleanupExclusiveLock.readLock().unlock();
285            close(rs);
286            close(s);
287        }
288    }
289
290    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
291        PreparedStatement s = null;
292        ResultSet rs = null;
293        cleanupExclusiveLock.readLock().lock();
294        try {
295            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
296            s.setString(1, id.getProducerId().toString());
297            s.setLong(2, id.getProducerSequenceId());
298            rs = s.executeQuery();
299            if (!rs.next()) {
300                return null;
301            }
302            return getBinaryData(rs, 1);
303        } finally {
304            cleanupExclusiveLock.readLock().unlock();
305            close(rs);
306            close(s);
307        }
308    }
309
310    public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
311        PreparedStatement s = null;
312        ResultSet rs = null;
313        cleanupExclusiveLock.readLock().lock();
314        try {
315            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
316            s.setLong(1, seq);
317            rs = s.executeQuery();
318            if (!rs.next()) {
319                return null;
320            }
321            return rs.getString(1);
322        } finally {
323            cleanupExclusiveLock.readLock().unlock();
324            close(rs);
325            close(s);
326        }
327    }
328
329    public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
330        PreparedStatement s = c.getRemovedMessageStatement();
331        cleanupExclusiveLock.readLock().lock();
332        try {
333            if (s == null) {
334                s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
335                if (this.batchStatments) {
336                    c.setRemovedMessageStatement(s);
337                }
338            }
339            s.setLong(1, seq);
340            if (this.batchStatments) {
341                s.addBatch();
342            } else if (s.executeUpdate() != 1) {
343                throw new SQLException("Failed to remove message");
344            }
345        } finally {
346            cleanupExclusiveLock.readLock().unlock();
347            if (!this.batchStatments && s != null) {
348                s.close();
349            }
350        }
351    }
352
353    public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
354            throws Exception {
355        PreparedStatement s = null;
356        ResultSet rs = null;
357        cleanupExclusiveLock.readLock().lock();
358        try {
359            s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
360            s.setString(1, destination.getQualifiedName());
361            rs = s.executeQuery();
362            if (this.statements.isUseExternalMessageReferences()) {
363                while (rs.next()) {
364                    if (!listener.recoverMessageReference(rs.getString(2))) {
365                        break;
366                    }
367                }
368            } else {
369                while (rs.next()) {
370                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
371                        break;
372                    }
373                }
374            }
375        } finally {
376            cleanupExclusiveLock.readLock().unlock();
377            close(rs);
378            close(s);
379        }
380    }
381
382    public void doMessageIdScan(TransactionContext c, int limit, 
383            JDBCMessageIdScanListener listener) throws SQLException, IOException {
384        PreparedStatement s = null;
385        ResultSet rs = null;
386        cleanupExclusiveLock.readLock().lock();
387        try {
388            s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
389            s.setMaxRows(limit);
390            rs = s.executeQuery();
391            // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
392            LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
393            while (rs.next()) {
394                reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
395            }
396            if (LOG.isDebugEnabled()) {
397                LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
398            }
399            for (MessageId id : reverseOrderIds) {
400                listener.messageId(id);
401            }
402        } finally {
403            cleanupExclusiveLock.readLock().unlock();
404            close(rs);
405            close(s);
406        }
407    }
408    
409    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
410            String subscriptionName, long seq, long prio) throws SQLException, IOException {
411        PreparedStatement s = c.getUpdateLastAckStatement();
412        cleanupExclusiveLock.readLock().lock();
413        try {
414            if (s == null) {
415                s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
416                if (this.batchStatments) {
417                    c.setUpdateLastAckStatement(s);
418                }
419            }
420            s.setLong(1, seq);
421            s.setString(2, destination.getQualifiedName());
422            s.setString(3, clientId);
423            s.setString(4, subscriptionName);
424            s.setLong(5, prio);
425            if (this.batchStatments) {
426                s.addBatch();
427            } else if (s.executeUpdate() != 1) {
428                throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
429            }
430        } finally {
431            cleanupExclusiveLock.readLock().unlock();
432            if (!this.batchStatments) {
433                close(s);
434            }
435        }
436    }
437
438
439    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
440                                        String subscriptionName, long seq, long priority) throws SQLException, IOException {
441        PreparedStatement s = c.getUpdateLastAckStatement();
442        cleanupExclusiveLock.readLock().lock();
443        try {
444            if (s == null) {
445                s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
446                if (this.batchStatments) {
447                    c.setUpdateLastAckStatement(s);
448                }
449            }
450            s.setLong(1, seq);
451            s.setString(2, destination.getQualifiedName());
452            s.setString(3, clientId);
453            s.setString(4, subscriptionName);
454
455            if (this.batchStatments) {
456                s.addBatch();
457            } else if (s.executeUpdate() != 1) {
458                throw new IOException("Could not update last ack seq : "
459                            + seq + ", for sub: " + subscriptionName);
460            }
461        } finally {
462            cleanupExclusiveLock.readLock().unlock();
463            if (!this.batchStatments) {
464                close(s);
465            }            
466        }
467    }
468
469    public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
470            String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
471        // dumpTables(c,
472        // destination.getQualifiedName(),clientId,subscriptionName);
473        PreparedStatement s = null;
474        ResultSet rs = null;
475        cleanupExclusiveLock.readLock().lock();
476        try {
477            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
478            s.setString(1, destination.getQualifiedName());
479            s.setString(2, clientId);
480            s.setString(3, subscriptionName);
481            rs = s.executeQuery();
482            if (this.statements.isUseExternalMessageReferences()) {
483                while (rs.next()) {
484                    if (!listener.recoverMessageReference(rs.getString(2))) {
485                        break;
486                    }
487                }
488            } else {
489                while (rs.next()) {
490                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
491                        break;
492                    }
493                }
494            }
495        } finally {
496            cleanupExclusiveLock.readLock().unlock();
497            close(rs);
498            close(s);
499        }
500    }
501
502    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
503            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
504        
505        PreparedStatement s = null;
506        ResultSet rs = null;
507        cleanupExclusiveLock.readLock().lock();
508        try {
509            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
510            s.setMaxRows(Math.max(maxReturned * 2, maxRows));
511            s.setString(1, destination.getQualifiedName());
512            s.setString(2, clientId);
513            s.setString(3, subscriptionName);
514            s.setLong(4, seq);
515            rs = s.executeQuery();
516            int count = 0;
517            if (this.statements.isUseExternalMessageReferences()) {
518                while (rs.next() && count < maxReturned) {
519                    if (listener.recoverMessageReference(rs.getString(1))) {
520                        count++;
521                    }
522                }
523            } else {
524                while (rs.next() && count < maxReturned) {
525                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
526                        count++;
527                    }
528                }
529            }
530        } finally {
531            cleanupExclusiveLock.readLock().unlock();
532            close(rs);
533            close(s);
534        }
535    }
536
537    public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
538            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
539
540        PreparedStatement s = null;
541        ResultSet rs = null;
542        cleanupExclusiveLock.readLock().lock();
543        try {
544            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
545            s.setMaxRows(Math.max(maxReturned * 2, maxRows));
546            s.setString(1, destination.getQualifiedName());
547            s.setString(2, clientId);
548            s.setString(3, subscriptionName);
549            s.setLong(4, seq);
550            s.setLong(5, priority);
551            rs = s.executeQuery();
552            int count = 0;
553            if (this.statements.isUseExternalMessageReferences()) {
554                while (rs.next() && count < maxReturned) {
555                    if (listener.recoverMessageReference(rs.getString(1))) {
556                        count++;
557                    }
558                }
559            } else {
560                while (rs.next() && count < maxReturned) {
561                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
562                        count++;
563                    }
564                }
565            }
566        } finally {
567            cleanupExclusiveLock.readLock().unlock();
568            close(rs);
569            close(s);
570        }
571    }
572
573    public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
574            String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
575        PreparedStatement s = null;
576        ResultSet rs = null;
577        int result = 0;
578        cleanupExclusiveLock.readLock().lock();
579        try {
580            if (isPrioritizedMessages) {
581                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
582            } else {
583                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
584            }
585            s.setString(1, destination.getQualifiedName());
586            s.setString(2, clientId);
587            s.setString(3, subscriptionName);
588            rs = s.executeQuery();
589            if (rs.next()) {
590                result = rs.getInt(1);
591            }
592        } finally {
593            cleanupExclusiveLock.readLock().unlock();
594            close(rs);
595            close(s);
596        }
597        return result;
598    }
599
600    /**
601     * @param c 
602     * @param info 
603     * @param retroactive 
604     * @throws SQLException 
605     * @throws IOException 
606     */
607    public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
608            throws SQLException, IOException {
609        // dumpTables(c, destination.getQualifiedName(), clientId,
610        // subscriptionName);
611        PreparedStatement s = null;
612        cleanupExclusiveLock.readLock().lock();
613        try {
614            long lastMessageId = -1;
615            if (!retroactive) {
616                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
617                ResultSet rs = null;
618                try {
619                    rs = s.executeQuery();
620                    if (rs.next()) {
621                        lastMessageId = rs.getLong(1);
622                    }
623                } finally {
624                    close(rs);
625                    close(s);
626                }
627            }
628            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
629            int maxPriority = 1;
630            if (isPrioritizedMessages) {
631                maxPriority = 10;
632            }
633
634            for (int priority = 0; priority < maxPriority; priority++) {
635                s.setString(1, info.getDestination().getQualifiedName());
636                s.setString(2, info.getClientId());
637                s.setString(3, info.getSubscriptionName());
638                s.setString(4, info.getSelector());
639                s.setLong(5, lastMessageId);
640                s.setString(6, info.getSubscribedDestination().getQualifiedName());
641                s.setLong(7, priority);
642
643                if (s.executeUpdate() != 1) {
644                    throw new IOException("Could not create durable subscription for: " + info.getClientId());
645                }
646            }
647
648        } finally {
649            cleanupExclusiveLock.readLock().unlock();
650            close(s);
651        }
652    }
653
654    public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
655            String clientId, String subscriptionName) throws SQLException, IOException {
656        PreparedStatement s = null;
657        ResultSet rs = null;
658        cleanupExclusiveLock.readLock().lock();
659        try {
660            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
661            s.setString(1, destination.getQualifiedName());
662            s.setString(2, clientId);
663            s.setString(3, subscriptionName);
664            rs = s.executeQuery();
665            if (!rs.next()) {
666                return null;
667            }
668            SubscriptionInfo subscription = new SubscriptionInfo();
669            subscription.setDestination(destination);
670            subscription.setClientId(clientId);
671            subscription.setSubscriptionName(subscriptionName);
672            subscription.setSelector(rs.getString(1));
673            subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
674                    ActiveMQDestination.QUEUE_TYPE));
675            return subscription;
676        } finally {
677            cleanupExclusiveLock.readLock().unlock();
678            close(rs);
679            close(s);
680        }
681    }
682
683    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
684            throws SQLException, IOException {
685        PreparedStatement s = null;
686        ResultSet rs = null;
687        cleanupExclusiveLock.readLock().lock();
688        try {
689            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
690            s.setString(1, destination.getQualifiedName());
691            rs = s.executeQuery();
692            ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
693            while (rs.next()) {
694                SubscriptionInfo subscription = new SubscriptionInfo();
695                subscription.setDestination(destination);
696                subscription.setSelector(rs.getString(1));
697                subscription.setSubscriptionName(rs.getString(2));
698                subscription.setClientId(rs.getString(3));
699                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
700                        ActiveMQDestination.QUEUE_TYPE));
701                rc.add(subscription);
702            }
703            return rc.toArray(new SubscriptionInfo[rc.size()]);
704        } finally {
705            cleanupExclusiveLock.readLock().unlock();
706            close(rs);
707            close(s);
708        }
709    }
710
711    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
712            IOException {
713        PreparedStatement s = null;
714        cleanupExclusiveLock.readLock().lock();
715        try {
716            s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
717            s.setString(1, destinationName.getQualifiedName());
718            s.executeUpdate();
719            s.close();
720            s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
721            s.setString(1, destinationName.getQualifiedName());
722            s.executeUpdate();
723        } finally {
724            cleanupExclusiveLock.readLock().unlock();
725            close(s);
726        }
727    }
728
729    public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
730            String subscriptionName) throws SQLException, IOException {
731        PreparedStatement s = null;
732        cleanupExclusiveLock.readLock().lock();
733        try {
734            s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
735            s.setString(1, destination.getQualifiedName());
736            s.setString(2, clientId);
737            s.setString(3, subscriptionName);
738            s.executeUpdate();
739        } finally {
740            cleanupExclusiveLock.readLock().unlock();
741            close(s);
742        }
743    }
744
745    int priorityIterator = 0;
746    public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
747        PreparedStatement s = null;
748        cleanupExclusiveLock.writeLock().lock();
749        try {
750            LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
751            s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
752            int priority = priorityIterator++%10;
753            s.setInt(1, priority);
754            s.setInt(2, priority);
755            int i = s.executeUpdate();
756            LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
757        } finally {
758            cleanupExclusiveLock.writeLock().unlock();
759            close(s);
760        }
761    }
762
763    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
764            String clientId, String subscriberName) throws SQLException, IOException {
765        PreparedStatement s = null;
766        ResultSet rs = null;
767        long result = -1;
768        cleanupExclusiveLock.readLock().lock();
769        try {
770            s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
771            s.setString(1, destination.getQualifiedName());
772            s.setString(2, clientId);
773            s.setString(3, subscriberName);
774            rs = s.executeQuery();
775            if (rs.next()) {
776                result = rs.getLong(1);
777                if (result == 0 && rs.wasNull()) {
778                    result = -1;
779                }
780            }
781        } finally {
782            cleanupExclusiveLock.readLock().unlock();
783            close(rs);
784            close(s);
785        }
786        return result;
787    }
788
789    protected static void close(PreparedStatement s) {
790        try {
791            s.close();
792        } catch (Throwable e) {
793        }
794    }
795
796    protected static void close(ResultSet rs) {
797        try {
798            rs.close();
799        } catch (Throwable e) {
800        }
801    }
802
803    public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
804        HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
805        PreparedStatement s = null;
806        ResultSet rs = null;
807        cleanupExclusiveLock.readLock().lock();
808        try {
809            s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
810            rs = s.executeQuery();
811            while (rs.next()) {
812                rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
813            }
814        } finally {
815            cleanupExclusiveLock.readLock().unlock();
816            close(rs);
817            close(s);
818        }
819        return rc;
820    }
821
822    /**
823     * @return true if batchStements
824     */
825    public boolean isBatchStatments() {
826        return this.batchStatments;
827    }
828
829    /**
830     * @param batchStatments
831     */
832    public void setBatchStatments(boolean batchStatments) {
833        this.batchStatments = batchStatments;
834    }
835
836    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
837        this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
838    }
839
840    /**
841     * @return the statements
842     */
843    public Statements getStatements() {
844        return this.statements;
845    }
846
847    public void setStatements(Statements statements) {
848        this.statements = statements;
849    }
850
851    public int getMaxRows() {
852        return maxRows;
853    }
854
855    public void setMaxRows(int maxRows) {
856        this.maxRows = maxRows;
857    }
858
859    @Override
860    public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
861        PreparedStatement s = null;
862        cleanupExclusiveLock.readLock().lock();
863        try {
864            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
865            s.setString(1, destination.getQualifiedName());
866            s.setString(2, destination.getQualifiedName());
867            s.setString(3, destination.getQualifiedName());
868            s.setString(4, null);
869            s.setLong(5, 0);
870            s.setString(6, destination.getQualifiedName());
871            s.setLong(7, 11);  // entry out of priority range
872
873            if (s.executeUpdate() != 1) {
874                throw new IOException("Could not create ack record for destination: " + destination);
875            }
876        } finally {
877            cleanupExclusiveLock.readLock().unlock();
878            close(s);
879        }
880    }
881
882    /**
883     * @param c
884     * @param destination
885     * @param clientId
886     * @param subscriberName
887     * @return
888     * @throws SQLException
889     * @throws IOException
890     */
891    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
892            String clientId, String subscriberName) throws SQLException, IOException {
893        PreparedStatement s = null;
894        ResultSet rs = null;
895        cleanupExclusiveLock.readLock().lock();
896        try {
897            s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
898            s.setString(1, destination.getQualifiedName());
899            s.setString(2, clientId);
900            s.setString(3, subscriberName);
901            rs = s.executeQuery();
902            if (!rs.next()) {
903                return null;
904            }
905            return getBinaryData(rs, 1);
906        } finally {
907            close(rs);
908            cleanupExclusiveLock.readLock().unlock();
909            close(s);
910        }
911    }
912
913    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
914            IOException {
915        PreparedStatement s = null;
916        ResultSet rs = null;
917        int result = 0;
918        cleanupExclusiveLock.readLock().lock();
919        try {
920            s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
921            s.setString(1, destination.getQualifiedName());
922            rs = s.executeQuery();
923            if (rs.next()) {
924                result = rs.getInt(1);
925            }
926        } finally {
927            cleanupExclusiveLock.readLock().unlock();
928            close(rs);
929            close(s);
930        }
931        return result;
932    }
933
934    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
935            long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
936        PreparedStatement s = null;
937        ResultSet rs = null;
938        cleanupExclusiveLock.readLock().lock();
939        try {
940            if (isPrioritizedMessages) {
941                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
942            } else {
943                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
944            }
945            s.setMaxRows(Math.max(maxReturned * 2, maxRows));
946            s.setString(1, destination.getQualifiedName());
947            s.setLong(2, nextSeq);
948            if (isPrioritizedMessages) {
949                s.setLong(3, priority);
950                s.setLong(4, priority);
951            }
952            rs = s.executeQuery();
953            int count = 0;
954            if (this.statements.isUseExternalMessageReferences()) {
955                while (rs.next() && count < maxReturned) {
956                    if (listener.recoverMessageReference(rs.getString(1))) {
957                        count++;
958                    } else {
959                        LOG.debug("Stopped recover next messages");
960                        break;
961                    }
962                }
963            } else {
964                while (rs.next() && count < maxReturned) {
965                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
966                        count++;
967                    } else {
968                        LOG.debug("Stopped recover next messages");
969                        break;
970                    }
971                }
972            }
973        } catch (Exception e) {
974            e.printStackTrace();
975        } finally {
976            cleanupExclusiveLock.readLock().unlock();
977            close(rs);
978            close(s);
979        }
980    }
981    
982/*    public void dumpTables(Connection c, String destinationName, String clientId, String
983      subscriptionName) throws SQLException { 
984        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
985        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
986        PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
987                + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
988                + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
989                + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
990                + " ORDER BY M.ID");
991      s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
992      printQuery(s,System.out); }
993
994    public void dumpTables(Connection c) throws SQLException {
995        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
996        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
997    }
998
999    private void printQuery(Connection c, String query, PrintStream out)
1000            throws SQLException {
1001        printQuery(c.prepareStatement(query), out);
1002    }
1003
1004    private void printQuery(PreparedStatement s, PrintStream out)
1005            throws SQLException {
1006
1007        ResultSet set = null;
1008        try {
1009            set = s.executeQuery();
1010            ResultSetMetaData metaData = set.getMetaData();
1011            for (int i = 1; i <= metaData.getColumnCount(); i++) {
1012                if (i == 1)
1013                    out.print("||");
1014                out.print(metaData.getColumnName(i) + "||");
1015            }
1016            out.println();
1017            while (set.next()) {
1018                for (int i = 1; i <= metaData.getColumnCount(); i++) {
1019                    if (i == 1)
1020                        out.print("|");
1021                    out.print(set.getString(i) + "|");
1022                }
1023                out.println();
1024            }
1025        } finally {
1026            try {
1027                set.close();
1028            } catch (Throwable ignore) {
1029            }
1030            try {
1031                s.close();
1032            } catch (Throwable ignore) {
1033            }
1034        }
1035    }  */
1036
1037    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1038            throws SQLException, IOException {
1039        PreparedStatement s = null;
1040        ResultSet rs = null;
1041        cleanupExclusiveLock.readLock().lock();
1042        try {
1043            s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1044            s.setString(1, id.toString());
1045            rs = s.executeQuery();
1046            long seq = -1;
1047            if (rs.next()) {
1048                seq = rs.getLong(1);
1049            }
1050            return seq;
1051        } finally {
1052            cleanupExclusiveLock.readLock().unlock();
1053            close(rs);
1054            close(s);
1055        }
1056    }
1057
1058}