diff --git a/jdbc-mariadb/src/main/java/org/xbib/jdbc/mariadb/MariaDB.java b/jdbc-mariadb/src/main/java/org/xbib/jdbc/mariadb/MariaDB.java index 5b6e279..01b60c1 100644 --- a/jdbc-mariadb/src/main/java/org/xbib/jdbc/mariadb/MariaDB.java +++ b/jdbc-mariadb/src/main/java/org/xbib/jdbc/mariadb/MariaDB.java @@ -187,6 +187,12 @@ public class MariaDB implements Flavor { return false; } + @Override + public String queueLength(String table) { + return "select count(*) from " + table + + " where channel = ? and state is NULL"; + } + @Override public String writeNextIntoQueue(String table) { return "insert into " + table diff --git a/jdbc-oracle/src/main/java/org/xbib/jdbc/oracle/Oracle.java b/jdbc-oracle/src/main/java/org/xbib/jdbc/oracle/Oracle.java index c714ba5..8a88d2d 100644 --- a/jdbc-oracle/src/main/java/org/xbib/jdbc/oracle/Oracle.java +++ b/jdbc-oracle/src/main/java/org/xbib/jdbc/oracle/Oracle.java @@ -198,6 +198,12 @@ public class Oracle implements Flavor { return false; } + @Override + public String queueLength(String table) { + return "select count(*) from " + table + + " where channel = ? and state is NULL"; + } + @Override public String writeNextIntoQueue(String table) { return "insert into " + table diff --git a/jdbc-postgresql/src/main/java/org/xbib/jdbc/postgresql/Postgresql.java b/jdbc-postgresql/src/main/java/org/xbib/jdbc/postgresql/Postgresql.java index 33ab4b7..2b0bb08 100644 --- a/jdbc-postgresql/src/main/java/org/xbib/jdbc/postgresql/Postgresql.java +++ b/jdbc-postgresql/src/main/java/org/xbib/jdbc/postgresql/Postgresql.java @@ -184,6 +184,12 @@ public class Postgresql implements Flavor { return false; } + @Override + public String queueLength(String table) { + return "select count(*) from " + table + + " where channel = ? and state is NULL"; + } + @Override public String writeNextIntoQueue(String table) { return "insert into " + table @@ -195,7 +201,7 @@ public class Postgresql implements Flavor { public String readNextFromQueue(String table) { return "select id, data from " + table + " where channel = ? and state is NULL and pushed_at <= (CAST (? as TIMESTAMP) - (INTERVAL '1 minute' * delay))" + - " order by id asc limit 1 for update skip lock"; + " order by id asc limit ? for update skip lock"; } @Override diff --git a/jdbc-query/src/main/java/org/xbib/jdbc/query/Database.java b/jdbc-query/src/main/java/org/xbib/jdbc/query/Database.java index c3de0f0..728fc90 100644 --- a/jdbc-query/src/main/java/org/xbib/jdbc/query/Database.java +++ b/jdbc-query/src/main/java/org/xbib/jdbc/query/Database.java @@ -232,7 +232,7 @@ public interface Database extends Supplier { void delete(String statement, Map params); void delete(String statement, List> params); - + /** * Write data into a queue table, select a channel, and consume the returned primary keys. * @@ -245,7 +245,16 @@ public interface Database extends Supplier { void writeQueue(String table, String channel, String data, Consumer consumer) throws SQLException; /** - * Read from a queue table and lock rows in a transaction until processing by a consumer + * Proble queue for length. + * @param table the queue table + * @param channel the queue table channel. A queue table can have many channels + * @return the queue length + * @throws SQLException if statement fails + */ + long probeQueue(String table, String channel) throws SQLException; + + /** + * Read from a queue table until limit is reached and lock rows in a transaction until processing by a consumer * has succeeded or failed. * Commit only if processing was successful, otherwise roll back the transaction. * The connection is switched into manual commit and afterwards, auto-commit is enabled again. @@ -254,5 +263,5 @@ public interface Database extends Supplier { * @param consumer a consumer for the queue table data column or null * @throws SQLException if rollback fails */ - public void readQueue(String table, String channel, Consumer consumer) throws SQLException; + public void consumeQueue(String table, String channel, int limit, Consumer consumer) throws SQLException; } diff --git a/jdbc-query/src/main/java/org/xbib/jdbc/query/DatabaseImpl.java b/jdbc-query/src/main/java/org/xbib/jdbc/query/DatabaseImpl.java index 550cc50..767edd3 100644 --- a/jdbc-query/src/main/java/org/xbib/jdbc/query/DatabaseImpl.java +++ b/jdbc-query/src/main/java/org/xbib/jdbc/query/DatabaseImpl.java @@ -533,29 +533,44 @@ public class DatabaseImpl implements Database { } @Override - public void readQueue(String table, String channel, Consumer consumer) throws SQLException { + public long probeQueue(String table, String channel) throws SQLException { + try (ResultSet resultSet = queueLength(connection, table, channel)) { + if (resultSet.next()) { + return resultSet.getLong(0); + } else { + return -1L; + } + } + } + + @Override + public void consumeQueue(String table, String channel, int limit, Consumer consumer) throws SQLException { + List consumedKeys = new ArrayList<>(); try { connection.setAutoCommit(false); - while (true) { - try (ResultSet resultSet = readNextFromQueue(connection, table, channel)) { - if (resultSet.next()) { - Long key = resultSet.getLong("key"); - try { - if (consumer != null) { - consumer.accept(resultSet.getString("data")); - } - succeedInQueue(connection, table, key); - } catch (Exception e) { - failInQueue(connection, table, key); - throw e; + try (ResultSet resultSet = readNextFromQueue(connection, table, channel, limit)) { + while (resultSet.next()) { + Long key = resultSet.getLong("key"); + consumedKeys.add(key); + try { + if (consumer != null) { + consumer.accept(resultSet.getString("data")); } + succeedInQueue(connection, table, key); + } catch (QueueException e) { + connection.rollback(); + failInQueue(connection, table, key); + throw e; } - } finally { - connection.commit(); } + } finally { + connection.commit(); } } catch (Exception e) { connection.rollback(); + for (Long key : consumedKeys) { + failInQueue(connection, table, key); + } throw e; } finally { connection.setAutoCommit(true); @@ -585,10 +600,17 @@ public class DatabaseImpl implements Database { } } - private ResultSet readNextFromQueue(Connection connection, String table, String channel) throws SQLException { + private ResultSet queueLength(Connection connection, String table, String channel) throws SQLException { + PreparedStatement preparedStatement = connection.prepareStatement(flavor().queueLength(table)); + preparedStatement.setString(1, channel); + return preparedStatement.executeQuery(); + } + + private ResultSet readNextFromQueue(Connection connection, String table, String channel, int limit) throws SQLException { PreparedStatement preparedStatement = connection.prepareStatement(flavor().readNextFromQueue(table)); preparedStatement.setString(1, channel); preparedStatement.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now())); + preparedStatement.setInt(3, limit); return preparedStatement.executeQuery(); } diff --git a/jdbc-query/src/main/java/org/xbib/jdbc/query/Flavor.java b/jdbc-query/src/main/java/org/xbib/jdbc/query/Flavor.java index 244cb7b..df2b073 100644 --- a/jdbc-query/src/main/java/org/xbib/jdbc/query/Flavor.java +++ b/jdbc-query/src/main/java/org/xbib/jdbc/query/Flavor.java @@ -77,11 +77,12 @@ public interface Flavor { /** * In JDBC, all drivers are set to autocommit by default. - * Returns true if we let the JDBC driver exclusively decide about commit(). If false, it allows explicit commit() calls. - * @return + * @return Returns true if we let the JDBC driver exclusively decide about commit(). If false, it allows explicit commit() calls. */ boolean isAutoCommitOnly(); + String queueLength(String table); + String writeNextIntoQueue(String table); String readNextFromQueue(String table); diff --git a/jdbc-query/src/main/java/org/xbib/jdbc/query/QueueException.java b/jdbc-query/src/main/java/org/xbib/jdbc/query/QueueException.java new file mode 100644 index 0000000..708b879 --- /dev/null +++ b/jdbc-query/src/main/java/org/xbib/jdbc/query/QueueException.java @@ -0,0 +1,8 @@ +package org.xbib.jdbc.query; + +public class QueueException extends DatabaseException { + + public QueueException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/Derby.java b/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/Derby.java index 2eb7d4a..50f05ff 100644 --- a/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/Derby.java +++ b/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/Derby.java @@ -180,6 +180,11 @@ public class Derby implements Flavor { return false; } + @Override + public String queueLength(String table) { + throw new UnsupportedOperationException(); + } + @Override public String writeNextIntoQueue(String table) { throw new UnsupportedOperationException(); diff --git a/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/H2.java b/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/H2.java index a615e4d..352e74f 100644 --- a/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/H2.java +++ b/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/H2.java @@ -180,6 +180,11 @@ public class H2 implements Flavor { return false; } + @Override + public String queueLength(String table) { + throw new UnsupportedOperationException(); + } + @Override public String writeNextIntoQueue(String table) { throw new UnsupportedOperationException(); diff --git a/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/Hsql.java b/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/Hsql.java index 6c1b452..e05de29 100644 --- a/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/Hsql.java +++ b/jdbc-query/src/main/java/org/xbib/jdbc/query/flavor/Hsql.java @@ -180,6 +180,11 @@ public class Hsql implements Flavor { return false; } + @Override + public String queueLength(String table) { + throw new UnsupportedOperationException(); + } + @Override public String writeNextIntoQueue(String table) { throw new UnsupportedOperationException(); diff --git a/jdbc-sqlserver/src/main/java/org/xbib/jdbc/sqlserver/SqlServer.java b/jdbc-sqlserver/src/main/java/org/xbib/jdbc/sqlserver/SqlServer.java index 3d555a8..2d47540 100644 --- a/jdbc-sqlserver/src/main/java/org/xbib/jdbc/sqlserver/SqlServer.java +++ b/jdbc-sqlserver/src/main/java/org/xbib/jdbc/sqlserver/SqlServer.java @@ -187,6 +187,11 @@ public class SqlServer implements Flavor { return false; } + @Override + public String queueLength(String table) { + throw new UnsupportedOperationException(); + } + @Override public String writeNextIntoQueue(String table) { throw new UnsupportedOperationException();