/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.mq.pm.jdbc2;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import javax.jms.JMSException;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.Xid;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpyTopic;
import org.jboss.mq.pm.CacheStore;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.pm.TxManager;
import org.jboss.mq.pm.jdbc2.PersistenceManagerMBean;
import org.jboss.mq.server.JMSDestination;
import org.jboss.mq.server.MessageCache;
import org.jboss.mq.server.MessageReference;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.tm.TransactionManagerService;
import org.jboss.tm.TransactionTimeoutConfiguration;

public class PersistenceManager
extends ServiceMBeanSupport
implements PersistenceManagerMBean,
org.jboss.mq.pm.PersistenceManager,
CacheStore {
    protected SynchronizedLong nextTransactionId = new SynchronizedLong(0L);
    protected TxManager txManager;
    protected DataSource datasource;
    protected TransactionManager tm;
    private int recoveryTimeout = 0;
    private int recoveryRetries = 0;
    private int recoverMessagesChunk = 0;
    protected String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?";
    protected String UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)";
    protected String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
    protected String DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?";
    protected String DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?";
    protected String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?";
    protected String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?";
    protected String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'";
    protected String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)";
    protected String INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)";
    protected String DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS";
    protected String DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL";
    protected String SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)";
    protected String SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS";
    protected String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?";
    protected String SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?";
    protected String SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?";
    protected String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
    protected String SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
    protected String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)";
    protected String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
    protected String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
    protected String UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
    protected String CREATE_MESSAGE_TABLE = "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1),MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
    protected String CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)";
    protected String CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)";
    protected String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )";
    protected String CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )";
    protected static final int OBJECT_BLOB = 0;
    protected static final int BYTES_BLOB = 1;
    protected static final int BINARYSTREAM_BLOB = 2;
    protected static final int BLOB_BLOB = 3;
    protected int blobType = 0;
    protected boolean createTables;
    protected int connectionRetryAttempts = 5;
    protected boolean xaRecovery = false;
    protected ObjectName connectionManagerName;
    protected Properties sqlProperties = new Properties();

    public PersistenceManager() throws JMSException {
        this.txManager = new TxManager(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected synchronized void createSchema() throws JMSException {
        block66: {
            TransactionManagerStrategy tms = new TransactionManagerStrategy();
            tms.startTX();
            Connection c = null;
            Statement stmt = null;
            boolean threadWasInterrupted = Thread.interrupted();
            try {
                if (!this.createTables) break block66;
                c = this.getConnection();
                boolean createdMessageTable = false;
                try {
                    stmt = c.prepareStatement(this.CREATE_MESSAGE_TABLE);
                    stmt.executeUpdate();
                    createdMessageTable = true;
                }
                catch (SQLException e) {
                    this.log.debug("Could not create table with SQL: " + this.CREATE_MESSAGE_TABLE, e);
                }
                finally {
                    try {
                        if (stmt != null) {
                            stmt.close();
                        }
                    }
                    catch (Throwable ignored) {
                        this.log.trace("Ignored: " + ignored);
                    }
                    stmt = null;
                }
                if (createdMessageTable) {
                    try {
                        stmt = c.prepareStatement(this.CREATE_IDX_MESSAGE_TXOP_TXID);
                        stmt.executeUpdate();
                    }
                    catch (SQLException e) {
                        this.log.debug("Could not create index with SQL: " + this.CREATE_IDX_MESSAGE_TXOP_TXID, e);
                    }
                    finally {
                        try {
                            if (stmt != null) {
                                stmt.close();
                            }
                        }
                        catch (Throwable ignored) {
                            this.log.trace("Ignored: " + ignored);
                        }
                        stmt = null;
                    }
                    try {
                        stmt = c.prepareStatement(this.CREATE_IDX_MESSAGE_DESTINATION);
                        stmt.executeUpdate();
                    }
                    catch (SQLException e) {
                        this.log.debug("Could not create index with SQL: " + this.CREATE_IDX_MESSAGE_DESTINATION, e);
                    }
                    finally {
                        try {
                            if (stmt != null) {
                                stmt.close();
                            }
                        }
                        catch (Throwable ignored) {
                            this.log.trace("Ignored: " + ignored);
                        }
                        stmt = null;
                    }
                }
                String createTxTable = this.CREATE_TX_TABLE;
                if (this.xaRecovery) {
                    createTxTable = this.CREATE_TX_TABLE_XARECOVERY;
                }
                try {
                    stmt = c.prepareStatement(createTxTable);
                    stmt.executeUpdate();
                }
                catch (SQLException e) {
                    this.log.debug("Could not create table with SQL: " + createTxTable, e);
                }
                finally {
                    try {
                        if (stmt != null) {
                            stmt.close();
                        }
                    }
                    catch (Throwable ignored) {
                        this.log.trace("Ignored: " + ignored);
                    }
                    stmt = null;
                }
            }
            catch (SQLException e) {
                tms.setRollbackOnly();
                throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
            }
            finally {
                try {
                    if (stmt != null) {
                        stmt.close();
                    }
                }
                catch (Throwable ignore) {}
                stmt = null;
                try {
                    if (c != null) {
                        c.close();
                    }
                }
                catch (Throwable ignore) {}
                c = null;
                tms.endTX();
                if (threadWasInterrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    protected synchronized void resolveAllUncommitedTXs() throws JMSException {
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            stmt = c.prepareStatement(this.SELECT_MAX_TX);
            rs = stmt.executeQuery();
            if (rs.next()) {
                this.nextTransactionId.set(rs.getLong(1) + 1L);
            }
            rs.close();
            rs = null;
            stmt.close();
            stmt = null;
            stmt = c.prepareStatement(this.DELETE_TEMPORARY_MESSAGES);
            stmt.executeUpdate();
            stmt.close();
            stmt = null;
            String deleteMarkedMessagesWithTx = this.DELETE_MARKED_MESSAGES_WITH_TX;
            if (this.xaRecovery) {
                deleteMarkedMessagesWithTx = this.DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY;
            }
            stmt = c.prepareStatement(deleteMarkedMessagesWithTx);
            stmt.setString(1, "A");
            stmt.executeUpdate();
            stmt.close();
            stmt = null;
            String updateMarkedMessages = this.UPDATE_MARKED_MESSAGES;
            if (this.xaRecovery) {
                updateMarkedMessages = this.UPDATE_MARKED_MESSAGES_XARECOVERY;
            }
            stmt = c.prepareStatement(updateMarkedMessages);
            stmt.setNull(1, -5);
            stmt.setString(2, "A");
            stmt.setString(3, "D");
            stmt.executeUpdate();
            stmt.close();
            stmt = null;
            String deleteAllTx = this.DELETE_ALL_TX;
            if (this.xaRecovery) {
                deleteAllTx = this.DELETE_ALL_TX_XARECOVERY;
            }
            stmt = c.prepareStatement(deleteAllTx);
            stmt.execute();
            stmt.close();
            stmt = null;
            if (this.xaRecovery) {
                stmt = c.prepareStatement(this.SELECT_ALL_TX_XARECOVERY);
                rs = stmt.executeQuery();
                while (rs.next()) {
                    long txid = rs.getLong(1);
                    Xid xid = this.extractXid(rs, 2);
                    Tx tx = new Tx(txid);
                    tx.setXid(xid);
                    tx.checkPersisted();
                    this.txManager.restoreTx(tx);
                }
                rs.close();
                rs = null;
                stmt.close();
                stmt = null;
            }
        }
        catch (Exception e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not resolve uncommited transactions.  Message recovery may not be accurate", e);
        }
        finally {
            try {
                if (rs != null) {
                    rs.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException {
        if (jmsDest == null) {
            throw new IllegalArgumentException("Must supply non null JMSDestination to restoreQueue");
        }
        if (dest == null) {
            throw new IllegalArgumentException("Must supply non null SpyDestination to restoreQueue");
        }
        boolean canOverrideTimeout = this.tm instanceof TransactionTimeoutConfiguration;
        int previousTimeout = 0;
        try {
            if (this.recoveryTimeout != 0) {
                if (canOverrideTimeout) {
                    previousTimeout = ((TransactionTimeoutConfiguration)((Object)this.tm)).getTransactionTimeout();
                    this.tm.setTransactionTimeout(this.recoveryTimeout);
                } else {
                    this.log.debug("Cannot override recovery timeout, TransactionManager does implement " + TransactionTimeoutConfiguration.class.getName());
                }
            }
            try {
                this.internalRestoreQueue(jmsDest, dest);
            }
            finally {
                if (this.recoveryTimeout != 0 && canOverrideTimeout) {
                    this.tm.setTransactionTimeout(previousTimeout);
                }
            }
        }
        catch (Exception e) {
            SpyJMSException.rethrowAsJMSException("Unexpected error in recovery", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected synchronized void internalRestoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException {
        HashMap<Long, Tx> prepared = null;
        if (this.xaRecovery) {
            prepared = new HashMap<Long, Tx>();
            Map map = this.txManager.getPreparedTransactions();
            Iterator i = map.values().iterator();
            while (i.hasNext()) {
                TxManager.PreparedInfo info = (TxManager.PreparedInfo)i.next();
                Iterator j = info.getTxids().iterator();
                while (j.hasNext()) {
                    Tx tx = (Tx)j.next();
                    prepared.put(new Long(tx.longValue()), tx);
                }
            }
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        Statement stmt = null;
        PreparedStatement stmt2 = null;
        ResultSet rs = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            String selectMessagesInDest = this.SELECT_MESSAGES_IN_DEST;
            String selectMessage = this.SELECT_MESSAGE;
            if (this.xaRecovery) {
                selectMessagesInDest = this.SELECT_MESSAGES_IN_DEST_XARECOVERY;
                selectMessage = this.SELECT_MESSAGE_XARECOVERY;
            }
            c = this.getConnection();
            if (this.recoverMessagesChunk == 0) {
                stmt = c.prepareStatement(selectMessagesInDest);
            } else {
                stmt = c.prepareStatement(this.SELECT_MESSAGE_KEYS_IN_DEST);
                stmt2 = c.prepareStatement(selectMessage);
            }
            stmt.setString(1, dest.toString());
            long txid = 0L;
            String txop = null;
            rs = stmt.executeQuery();
            int counter = 0;
            int recovery = 0;
            while (rs.next()) {
                long msgid = rs.getLong(1);
                SpyMessage message = null;
                if (this.recoverMessagesChunk == 0) {
                    message = this.extractMessage(rs);
                    if (this.xaRecovery) {
                        txid = rs.getLong(3);
                        txop = rs.getString(4);
                    }
                } else {
                    ResultSet rs2 = null;
                    try {
                        stmt2.setLong(1, msgid);
                        stmt2.setString(2, dest.toString());
                        rs2 = stmt2.executeQuery();
                        if (rs2.next()) {
                            message = this.extractMessage(rs2);
                            if (this.xaRecovery) {
                                txid = rs.getLong(3);
                                txop = rs.getString(4);
                            }
                        } else {
                            this.log.warn("Failed to find message msgid=" + msgid + " dest=" + dest);
                        }
                    }
                    finally {
                        if (rs2 != null) {
                            try {
                                rs2.close();
                            }
                            catch (Exception ignored) {}
                        }
                    }
                }
                if (dest instanceof SpyTopic) {
                    message.header.durableSubscriberID = ((SpyTopic)dest).getDurableSubscriptionID();
                }
                if (!this.xaRecovery || txid == 0L || txop == null) {
                    jmsDest.restoreMessage(message);
                } else {
                    Tx tx = (Tx)prepared.get(new Long(txid));
                    if (tx == null) {
                        jmsDest.restoreMessage(message);
                    } else if ("A".equals(txop)) {
                        jmsDest.restoreMessage(message, tx, 1);
                        ++recovery;
                    } else if ("D".equals(txop)) {
                        jmsDest.restoreMessage(message, tx, -1);
                        ++recovery;
                    } else {
                        throw new IllegalStateException("Unknown txop=" + txop + " for msg=" + msgid + " dest=" + dest);
                    }
                }
                ++counter;
            }
            this.log.debug("Restored " + counter + " message(s) to: " + dest + " " + recovery + " need recovery.");
        }
        catch (IOException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
        }
        finally {
            try {
                if (rs != null) {
                    rs.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    SpyMessage extractMessage(ResultSet rs) throws SQLException, IOException {
        try {
            long messageid = rs.getLong(1);
            SpyMessage message = null;
            if (this.blobType == 0) {
                message = (SpyMessage)rs.getObject(2);
            } else if (this.blobType == 1) {
                byte[] st = rs.getBytes(2);
                ByteArrayInputStream baip = new ByteArrayInputStream(st);
                ObjectInputStream ois = new ObjectInputStream(baip);
                message = SpyMessage.readMessage(ois);
            } else if (this.blobType == 2) {
                ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(2));
                message = SpyMessage.readMessage(ois);
            } else if (this.blobType == 3) {
                ObjectInputStream ois = new ObjectInputStream(rs.getBlob(2).getBinaryStream());
                message = SpyMessage.readMessage(ois);
            }
            message.header.messageId = messageid;
            return message;
        }
        catch (StreamCorruptedException e) {
            throw new IOException("Could not load the message: " + e);
        }
    }

    Xid extractXid(ResultSet rs, int column) throws SQLException, IOException, ClassNotFoundException {
        try {
            Xid xid = null;
            if (this.blobType == 0) {
                xid = (Xid)rs.getObject(column);
            } else if (this.blobType == 1) {
                byte[] st = rs.getBytes(column);
                ByteArrayInputStream baip = new ByteArrayInputStream(st);
                ObjectInputStream ois = new ObjectInputStream(baip);
                xid = (Xid)ois.readObject();
            } else if (this.blobType == 2) {
                ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(column));
                xid = (Xid)ois.readObject();
            } else if (this.blobType == 3) {
                ObjectInputStream ois = new ObjectInputStream(rs.getBlob(column).getBinaryStream());
                xid = (Xid)ois.readObject();
            }
            return xid;
        }
        catch (StreamCorruptedException e) {
            throw new IOException("Could not load the message: " + e);
        }
    }

    public void commitPersistentTx(Tx txId) throws JMSException {
        if (!txId.wasPersisted()) {
            return;
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            this.removeMarkedMessages(c, txId, "D");
            this.removeTXRecord(c, txId.longValue());
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not commit tx: " + txId, e);
        }
        finally {
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException {
        PreparedStatement stmt = null;
        try {
            stmt = c.prepareStatement(this.DELETE_MARKED_MESSAGES);
            stmt.setLong(1, txid.longValue());
            stmt.setString(2, mark);
            stmt.executeUpdate();
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable e) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addTXRecord(Connection c, Tx txid) throws SQLException, IOException {
        Statement stmt = null;
        try {
            String insertTx = this.INSERT_TX;
            if (this.xaRecovery) {
                insertTx = this.INSERT_TX_XARECOVERY;
            }
            stmt = c.prepareStatement(insertTx);
            stmt.setLong(1, txid.longValue());
            if (this.xaRecovery) {
                Xid xid = txid.getXid();
                if (xid != null) {
                    this.setBlob((PreparedStatement)stmt, 2, xid);
                } else {
                    stmt.setNull(2, 2004);
                }
            }
            stmt.executeUpdate();
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable e) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeTXRecord(Connection c, long txid) throws SQLException {
        PreparedStatement stmt = null;
        try {
            stmt = c.prepareStatement(this.DELETE_TX);
            stmt.setLong(1, txid);
            stmt.executeUpdate();
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable e) {}
        }
    }

    public void rollbackPersistentTx(Tx txId) throws JMSException {
        if (!txId.wasPersisted()) {
            return;
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        Statement stmt = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            this.removeMarkedMessages(c, txId, "A");
            this.removeTXRecord(c, txId.longValue());
            stmt = c.prepareStatement(this.UPDATE_MARKED_MESSAGES_WITH_TX);
            stmt.setNull(1, -5);
            stmt.setString(2, "A");
            stmt.setString(3, "D");
            stmt.setLong(4, txId.longValue());
            stmt.executeUpdate();
            stmt.close();
            stmt = null;
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not rollback tx: " + txId, e);
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public Tx createPersistentTx() throws JMSException {
        Tx id = new Tx(this.nextTransactionId.increment());
        return id;
    }

    public void insertPersistentTx(TransactionManagerStrategy tms, Connection c, Tx tx) throws JMSException {
        try {
            if (tx != null && !tx.checkPersisted()) {
                this.addTXRecord(c, tx);
            }
        }
        catch (Exception e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not create tx: " + tx.longValue(), e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void add(MessageReference messageRef, Tx txId) throws JMSException {
        boolean trace = this.log.isTraceEnabled();
        if (trace) {
            this.log.trace("About to add message " + messageRef + " transaction=" + txId);
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            this.insertPersistentTx(tms, c, txId);
            MessageReference messageReference = messageRef;
            synchronized (messageReference) {
                SpyMessage message = messageRef.getMessage();
                if (messageRef.stored == 2) {
                    if (trace) {
                        this.log.trace("Updating message " + messageRef + " transaction=" + txId);
                    }
                    this.markMessage(c, messageRef.messageId, messageRef.getPersistentKey(), txId, "A");
                } else {
                    if (trace) {
                        this.log.trace("Inserting message " + messageRef + " transaction=" + txId);
                    }
                    this.add(c, messageRef.getPersistentKey(), message, txId, "A");
                    messageRef.setStored(2);
                }
                if (trace) {
                    this.log.trace("Added message " + messageRef + " transaction=" + txId);
                }
            }
        }
        catch (IOException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not store message: " + messageRef, e);
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not store message: " + messageRef, e);
        }
        finally {
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void add(Connection c, String queue, SpyMessage message, Tx txId, String mark) throws SQLException, IOException {
        PreparedStatement stmt = null;
        try {
            stmt = c.prepareStatement(this.INSERT_MESSAGE);
            stmt.setLong(1, message.header.messageId);
            stmt.setString(2, queue);
            this.setBlob(stmt, 3, message);
            if (txId != null) {
                stmt.setLong(4, txId.longValue());
            } else {
                stmt.setNull(4, -5);
            }
            stmt.setString(5, mark);
            stmt.executeUpdate();
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void markMessage(Connection c, long messageid, String destination, Tx txId, String mark) throws SQLException {
        PreparedStatement stmt = null;
        try {
            stmt = c.prepareStatement(this.MARK_MESSAGE);
            if (txId == null) {
                stmt.setNull(1, -5);
            } else {
                stmt.setLong(1, txId.longValue());
            }
            stmt.setString(2, mark);
            stmt.setLong(3, messageid);
            stmt.setString(4, destination);
            stmt.executeUpdate();
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
        }
    }

    public void setBlob(PreparedStatement stmt, int column, SpyMessage message) throws IOException, SQLException {
        if (this.blobType == 0) {
            stmt.setObject(column, message);
        } else if (this.blobType == 1) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            SpyMessage.writeMessage(message, oos);
            oos.flush();
            byte[] messageAsBytes = baos.toByteArray();
            stmt.setBytes(column, messageAsBytes);
        } else if (this.blobType == 2) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            SpyMessage.writeMessage(message, oos);
            oos.flush();
            byte[] messageAsBytes = baos.toByteArray();
            ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes);
            stmt.setBinaryStream(column, (InputStream)bais, messageAsBytes.length);
        } else if (this.blobType == 3) {
            throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
        }
    }

    public void setBlob(PreparedStatement stmt, int column, Xid xid) throws IOException, SQLException {
        if (this.blobType == 0) {
            stmt.setObject(column, xid);
        } else if (this.blobType == 1) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(xid);
            oos.flush();
            byte[] messageAsBytes = baos.toByteArray();
            stmt.setBytes(column, messageAsBytes);
        } else if (this.blobType == 2) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(xid);
            oos.flush();
            byte[] messageAsBytes = baos.toByteArray();
            ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes);
            stmt.setBinaryStream(column, (InputStream)bais, messageAsBytes.length);
        } else if (this.blobType == 3) {
            throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
        }
    }

    public void update(MessageReference messageRef, Tx txId) throws JMSException {
        boolean trace = this.log.isTraceEnabled();
        if (trace) {
            this.log.trace("Updating message " + messageRef + " transaction=" + txId);
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        Statement stmt = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            if (txId == null) {
                stmt = c.prepareStatement(this.UPDATE_MESSAGE);
                this.setBlob((PreparedStatement)stmt, 1, messageRef.getMessage());
                stmt.setLong(2, messageRef.messageId);
                stmt.setString(3, messageRef.getPersistentKey());
                int rc = stmt.executeUpdate();
                if (rc != 1) {
                    throw new SpyJMSException("Could not update the message in the database: update affected " + rc + " rows");
                }
            } else {
                throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
            }
            if (trace) {
                this.log.trace("Updated message " + messageRef + " transaction=" + txId);
            }
        }
        catch (IOException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not update message: " + messageRef, e);
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not update message: " + messageRef, e);
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void remove(MessageReference messageRef, Tx txId) throws JMSException {
        boolean trace = this.log.isTraceEnabled();
        if (trace) {
            this.log.trace("Removing message " + messageRef + " transaction=" + txId);
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        Statement stmt = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            this.insertPersistentTx(tms, c, txId);
            MessageReference messageReference = messageRef;
            synchronized (messageReference) {
                if (txId == null) {
                    stmt = c.prepareStatement(this.DELETE_MESSAGE);
                    stmt.setLong(1, messageRef.messageId);
                    stmt.setString(2, messageRef.getPersistentKey());
                    int rc = stmt.executeUpdate();
                    if (rc != 1) {
                        throw new SpyJMSException("Could not delete the message from the database: delete affected " + rc + " rows");
                    }
                    messageRef.setStored(1);
                    messageRef.removeDelayed();
                } else {
                    stmt = c.prepareStatement(this.MARK_MESSAGE);
                    stmt.setLong(1, txId.longValue());
                    stmt.setString(2, "D");
                    stmt.setLong(3, messageRef.messageId);
                    stmt.setString(4, messageRef.getPersistentKey());
                    int rc = stmt.executeUpdate();
                    if (rc != 1) {
                        throw new SpyJMSException("Could not mark the message as deleted in the database: update affected " + rc + " rows");
                    }
                }
                if (trace) {
                    this.log.trace("Removed message " + messageRef + " transaction=" + txId);
                }
            }
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not remove message: " + messageRef, e);
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public TxManager getTxManager() {
        return this.txManager;
    }

    public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException {
    }

    public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException {
        if (this.log.isTraceEnabled()) {
            this.log.trace("Loading message from storage " + messageRef);
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            stmt = c.prepareStatement(this.SELECT_MESSAGE);
            stmt.setLong(1, messageRef.messageId);
            stmt.setString(2, messageRef.getPersistentKey());
            rs = stmt.executeQuery();
            if (rs.next()) {
                SpyMessage spyMessage = this.extractMessage(rs);
                return spyMessage;
            }
            SpyMessage spyMessage = null;
            return spyMessage;
        }
        catch (IOException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not load message : " + messageRef, e);
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not load message : " + messageRef, e);
        }
        finally {
            try {
                if (rs != null) {
                    rs.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void removeFromStorage(MessageReference messageRef) throws JMSException {
        if (messageRef.isPersistent()) {
            return;
        }
        boolean trace = this.log.isTraceEnabled();
        if (trace) {
            this.log.trace("Removing message from storage " + messageRef);
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        Statement stmt = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            stmt = c.prepareStatement(this.DELETE_MESSAGE);
            stmt.setLong(1, messageRef.messageId);
            stmt.setString(2, messageRef.getPersistentKey());
            stmt.executeUpdate();
            messageRef.setStored(1);
            if (trace) {
                this.log.trace("Removed message from storage " + messageRef);
            }
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not remove message: " + messageRef, e);
        }
        finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (Throwable ignore) {}
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException {
        if (messageRef.isPersistent()) {
            return;
        }
        boolean trace = this.log.isTraceEnabled();
        if (trace) {
            this.log.trace("Saving message to storage " + messageRef);
        }
        TransactionManagerStrategy tms = new TransactionManagerStrategy();
        tms.startTX();
        Connection c = null;
        boolean threadWasInterrupted = Thread.interrupted();
        try {
            c = this.getConnection();
            this.add(c, messageRef.getPersistentKey(), message, null, "T");
            messageRef.setStored(2);
            if (trace) {
                this.log.trace("Saved message to storage " + messageRef);
            }
        }
        catch (IOException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not store message: " + messageRef, e);
        }
        catch (SQLException e) {
            tms.setRollbackOnly();
            throw new SpyJMSException("Could not store message: " + messageRef, e);
        }
        finally {
            try {
                if (c != null) {
                    c.close();
                }
            }
            catch (Throwable ignore) {}
            tms.endTX();
            if (threadWasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Connection getConnection() throws SQLException {
        int attempts = this.connectionRetryAttempts;
        int attemptCount = 0;
        SQLException sqlException = null;
        while (attempts-- > 0) {
            if (++attemptCount > 1) {
                this.log.debug("Retrying connection: attempt # " + attemptCount);
            }
            try {
                sqlException = null;
                Connection connection = this.datasource.getConnection();
                return connection;
            }
            catch (SQLException exception) {
                this.log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception);
                sqlException = exception;
            }
            finally {
                if (sqlException == null && attemptCount > 1) {
                    this.log.debug("Connection succeeded on attempt # " + attemptCount);
                }
            }
            if (attempts <= 0) continue;
            try {
                Thread.sleep(1500L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
                break;
            }
        }
        if (sqlException != null) {
            throw sqlException;
        }
        throw new SQLException("connection attempt interrupted");
    }

    public void startService() throws Exception {
        this.UPDATE_MARKED_MESSAGES = this.sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", this.UPDATE_MARKED_MESSAGES);
        this.UPDATE_MARKED_MESSAGES_XARECOVERY = this.sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_XARECOVERY", this.UPDATE_MARKED_MESSAGES_XARECOVERY);
        this.UPDATE_MARKED_MESSAGES_WITH_TX = this.sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", this.UPDATE_MARKED_MESSAGES_WITH_TX);
        this.DELETE_MARKED_MESSAGES_WITH_TX = this.sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", this.DELETE_MARKED_MESSAGES_WITH_TX);
        this.DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = this.sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY", this.DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY);
        this.DELETE_TX = this.sqlProperties.getProperty("DELETE_TX", this.DELETE_TX);
        this.DELETE_MARKED_MESSAGES = this.sqlProperties.getProperty("DELETE_MARKED_MESSAGES", this.DELETE_MARKED_MESSAGES);
        this.DELETE_TEMPORARY_MESSAGES = this.sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", this.DELETE_TEMPORARY_MESSAGES);
        this.INSERT_TX = this.sqlProperties.getProperty("INSERT_TX", this.INSERT_TX);
        this.INSERT_TX_XARECOVERY = this.sqlProperties.getProperty("INSERT_TX_XARECOVERY", this.INSERT_TX_XARECOVERY);
        this.DELETE_ALL_TX = this.sqlProperties.getProperty("DELETE_ALL_TX", this.DELETE_ALL_TX);
        this.DELETE_ALL_TX_XARECOVERY = this.sqlProperties.getProperty("DELETE_ALL_TX_XARECOVERY", this.DELETE_ALL_TX_XARECOVERY);
        this.SELECT_ALL_TX_XARECOVERY = this.sqlProperties.getProperty("SELECT_ALL_TX_XARECOVERY", this.SELECT_ALL_TX_XARECOVERY);
        this.SELECT_MAX_TX = this.sqlProperties.getProperty("SELECT_MAX_TX", this.SELECT_MAX_TX);
        this.SELECT_MESSAGES_IN_DEST = this.sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", this.SELECT_MESSAGES_IN_DEST);
        this.SELECT_MESSAGES_IN_DEST_XARECOVERY = this.sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST_XARECOVERY", this.SELECT_MESSAGES_IN_DEST_XARECOVERY);
        this.SELECT_MESSAGE_KEYS_IN_DEST = this.sqlProperties.getProperty("SELECT_MESSAGE_KEYS_IN_DEST", this.SELECT_MESSAGE_KEYS_IN_DEST);
        this.SELECT_MESSAGE = this.sqlProperties.getProperty("SELECT_MESSAGE", this.SELECT_MESSAGE);
        this.SELECT_MESSAGE_XARECOVERY = this.sqlProperties.getProperty("SELECT_MESSAGE_XARECOVERY", this.SELECT_MESSAGE_XARECOVERY);
        this.INSERT_MESSAGE = this.sqlProperties.getProperty("INSERT_MESSAGE", this.INSERT_MESSAGE);
        this.MARK_MESSAGE = this.sqlProperties.getProperty("MARK_MESSAGE", this.MARK_MESSAGE);
        this.DELETE_MESSAGE = this.sqlProperties.getProperty("DELETE_MESSAGE", this.DELETE_MESSAGE);
        this.UPDATE_MESSAGE = this.sqlProperties.getProperty("UPDATE_MESSAGE", this.UPDATE_MESSAGE);
        this.CREATE_MESSAGE_TABLE = this.sqlProperties.getProperty("CREATE_MESSAGE_TABLE", this.CREATE_MESSAGE_TABLE);
        this.CREATE_IDX_MESSAGE_TXOP_TXID = this.sqlProperties.getProperty("CREATE_IDX_MESSAGE_TXOP_TXID", this.CREATE_IDX_MESSAGE_TXOP_TXID);
        this.CREATE_IDX_MESSAGE_DESTINATION = this.sqlProperties.getProperty("CREATE_IDX_MESSAGE_DESTINATION", this.CREATE_IDX_MESSAGE_DESTINATION);
        this.CREATE_TX_TABLE = this.sqlProperties.getProperty("CREATE_TX_TABLE", this.CREATE_TX_TABLE);
        this.CREATE_TX_TABLE_XARECOVERY = this.sqlProperties.getProperty("CREATE_TX_TABLE_XARECOVERY", this.CREATE_TX_TABLE_XARECOVERY);
        this.createTables = this.sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true");
        String s = this.sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB");
        if (s.equals("OBJECT_BLOB")) {
            this.blobType = 0;
        } else if (s.equals("BYTES_BLOB")) {
            this.blobType = 1;
        } else if (s.equals("BINARYSTREAM_BLOB")) {
            this.blobType = 2;
        } else if (s.equals("BLOB_BLOB")) {
            this.blobType = 3;
        }
        this.initializeFields();
        this.log.debug("Creating Schema");
        this.createSchema();
        this.log.debug("Resolving uncommited TXS");
        Throwable error = null;
        for (int i = 0; i <= this.recoveryRetries; ++i) {
            try {
                this.resolveAllUncommitedTXs();
                break;
            }
            catch (Throwable t) {
                if (i < this.recoveryRetries) {
                    this.log.warn("Error resolving transactions retries=" + i + " of " + this.recoveryRetries, t);
                    continue;
                }
                error = t;
                continue;
            }
        }
        if (error != null) {
            SpyJMSException.rethrowAsJMSException("Unable to resolve transactions retries=" + this.recoveryRetries, error);
        }
    }

    protected void initializeFields() throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, ReflectionException, NamingException {
        String dsName = (String)this.getServer().getAttribute(this.connectionManagerName, "BindName");
        InitialContext ctx = new InitialContext();
        this.datasource = (DataSource)ctx.lookup(dsName);
        this.tm = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
    }

    public Object getInstance() {
        return this;
    }

    public ObjectName getMessageCache() {
        throw new UnsupportedOperationException("This is now set on the destination manager");
    }

    public void setMessageCache(ObjectName messageCache) {
        throw new UnsupportedOperationException("This is now set on the destination manager");
    }

    public ObjectName getConnectionManager() {
        return this.connectionManagerName;
    }

    public void setConnectionManager(ObjectName connectionManagerName) {
        this.connectionManagerName = connectionManagerName;
    }

    public MessageCache getMessageCacheInstance() {
        throw new UnsupportedOperationException("This is now set on the destination manager");
    }

    public String getSqlProperties() {
        try {
            ByteArrayOutputStream boa = new ByteArrayOutputStream();
            this.sqlProperties.store(boa, "");
            return new String(boa.toByteArray());
        }
        catch (IOException shouldnothappen) {
            return "";
        }
    }

    public void setSqlProperties(String value) {
        try {
            ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
            this.sqlProperties = new Properties();
            this.sqlProperties.load(is);
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    public void setConnectionRetryAttempts(int value) {
        this.connectionRetryAttempts = value;
    }

    public int getConnectionRetryAttempts() {
        return this.connectionRetryAttempts;
    }

    public int getRecoveryTimeout() {
        return this.recoveryTimeout;
    }

    public void setRecoveryTimeout(int timeout) {
        this.recoveryTimeout = timeout;
    }

    public int getRecoveryRetries() {
        return this.recoveryRetries;
    }

    public void setRecoveryRetries(int retries) {
        this.recoveryRetries = retries;
    }

    public int getRecoverMessagesChunk() {
        return this.recoverMessagesChunk;
    }

    public void setRecoverMessagesChunk(int recoverMessagesChunk) {
        if (recoverMessagesChunk != 0 && recoverMessagesChunk != 1) {
            this.log.warn("Only the values 0 and 1 are currently support for chunk size, using chunk size=1");
            recoverMessagesChunk = 1;
        }
        this.recoverMessagesChunk = recoverMessagesChunk;
    }

    public boolean isXARecovery() {
        return this.xaRecovery;
    }

    public void setXARecovery(boolean xaRecovery) {
        this.xaRecovery = xaRecovery;
    }

    protected class TransactionManagerStrategy {
        Transaction threadTx;

        protected TransactionManagerStrategy() {
        }

        void startTX() throws JMSException {
            try {
                this.threadTx = PersistenceManager.this.tm.suspend();
                PersistenceManager.this.tm.begin();
            }
            catch (Exception e) {
                try {
                    if (this.threadTx != null) {
                        PersistenceManager.this.tm.resume(this.threadTx);
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
                throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
            }
        }

        void setRollbackOnly() throws JMSException {
            try {
                PersistenceManager.this.tm.setRollbackOnly();
            }
            catch (Exception e) {
                throw new SpyJMSException("Could not start a mark the transaction for rollback .", e);
            }
        }

        void endTX() throws JMSException {
            try {
                if (PersistenceManager.this.tm.getStatus() == 1) {
                    PersistenceManager.this.tm.rollback();
                } else {
                    PersistenceManager.this.tm.commit();
                }
            }
            catch (Exception e) {
                throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
            }
            finally {
                try {
                    if (this.threadTx != null) {
                        PersistenceManager.this.tm.resume(this.threadTx);
                    }
                }
                catch (Exception exception) {}
            }
        }
    }
}

