add queue length operation

This commit is contained in:
Jörg Prante 2023-07-26 17:04:55 +02:00
parent 26bbb56b35
commit b84de6995f
11 changed files with 100 additions and 22 deletions

View file

@ -187,6 +187,12 @@ public class MariaDB implements Flavor {
return false; return false;
} }
@Override
public String queueLength(String table) {
return "select count(*) from " + table
+ " where channel = ? and state is NULL";
}
@Override @Override
public String writeNextIntoQueue(String table) { public String writeNextIntoQueue(String table) {
return "insert into " + table return "insert into " + table

View file

@ -198,6 +198,12 @@ public class Oracle implements Flavor {
return false; return false;
} }
@Override
public String queueLength(String table) {
return "select count(*) from " + table
+ " where channel = ? and state is NULL";
}
@Override @Override
public String writeNextIntoQueue(String table) { public String writeNextIntoQueue(String table) {
return "insert into " + table return "insert into " + table

View file

@ -184,6 +184,12 @@ public class Postgresql implements Flavor {
return false; return false;
} }
@Override
public String queueLength(String table) {
return "select count(*) from " + table
+ " where channel = ? and state is NULL";
}
@Override @Override
public String writeNextIntoQueue(String table) { public String writeNextIntoQueue(String table) {
return "insert into " + table return "insert into " + table
@ -195,7 +201,7 @@ public class Postgresql implements Flavor {
public String readNextFromQueue(String table) { public String readNextFromQueue(String table) {
return "select id, data from " + table + return "select id, data from " + table +
" where channel = ? and state is NULL and pushed_at <= (CAST (? as TIMESTAMP) - (INTERVAL '1 minute' * delay))" + " where channel = ? and state is NULL and pushed_at <= (CAST (? as TIMESTAMP) - (INTERVAL '1 minute' * delay))" +
" order by id asc limit 1 for update skip lock"; " order by id asc limit ? for update skip lock";
} }
@Override @Override

View file

@ -232,7 +232,7 @@ public interface Database extends Supplier<Database> {
void delete(String statement, Map<String, Object> params); void delete(String statement, Map<String, Object> params);
void delete(String statement, List<Map<String, Object>> params); void delete(String statement, List<Map<String, Object>> params);
/** /**
* Write data into a queue table, select a channel, and consume the returned primary keys. * Write data into a queue table, select a channel, and consume the returned primary keys.
* *
@ -245,7 +245,16 @@ public interface Database extends Supplier<Database> {
void writeQueue(String table, String channel, String data, Consumer<Long> consumer) throws SQLException; void writeQueue(String table, String channel, String data, Consumer<Long> consumer) throws SQLException;
/** /**
* Read from a queue table and lock rows in a transaction until processing by a consumer * Proble queue for length.
* @param table the queue table
* @param channel the queue table channel. A queue table can have many channels
* @return the queue length
* @throws SQLException if statement fails
*/
long probeQueue(String table, String channel) throws SQLException;
/**
* Read from a queue table until limit is reached and lock rows in a transaction until processing by a consumer
* has succeeded or failed. * has succeeded or failed.
* Commit only if processing was successful, otherwise roll back the transaction. * Commit only if processing was successful, otherwise roll back the transaction.
* The connection is switched into manual commit and afterwards, auto-commit is enabled again. * The connection is switched into manual commit and afterwards, auto-commit is enabled again.
@ -254,5 +263,5 @@ public interface Database extends Supplier<Database> {
* @param consumer a consumer for the queue table data column or null * @param consumer a consumer for the queue table data column or null
* @throws SQLException if rollback fails * @throws SQLException if rollback fails
*/ */
public void readQueue(String table, String channel, Consumer<String> consumer) throws SQLException; public void consumeQueue(String table, String channel, int limit, Consumer<String> consumer) throws SQLException;
} }

View file

@ -533,29 +533,44 @@ public class DatabaseImpl implements Database {
} }
@Override @Override
public void readQueue(String table, String channel, Consumer<String> consumer) throws SQLException { public long probeQueue(String table, String channel) throws SQLException {
try (ResultSet resultSet = queueLength(connection, table, channel)) {
if (resultSet.next()) {
return resultSet.getLong(0);
} else {
return -1L;
}
}
}
@Override
public void consumeQueue(String table, String channel, int limit, Consumer<String> consumer) throws SQLException {
List<Long> consumedKeys = new ArrayList<>();
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
while (true) { try (ResultSet resultSet = readNextFromQueue(connection, table, channel, limit)) {
try (ResultSet resultSet = readNextFromQueue(connection, table, channel)) { while (resultSet.next()) {
if (resultSet.next()) { Long key = resultSet.getLong("key");
Long key = resultSet.getLong("key"); consumedKeys.add(key);
try { try {
if (consumer != null) { if (consumer != null) {
consumer.accept(resultSet.getString("data")); consumer.accept(resultSet.getString("data"));
}
succeedInQueue(connection, table, key);
} catch (Exception e) {
failInQueue(connection, table, key);
throw e;
} }
succeedInQueue(connection, table, key);
} catch (QueueException e) {
connection.rollback();
failInQueue(connection, table, key);
throw e;
} }
} finally {
connection.commit();
} }
} finally {
connection.commit();
} }
} catch (Exception e) { } catch (Exception e) {
connection.rollback(); connection.rollback();
for (Long key : consumedKeys) {
failInQueue(connection, table, key);
}
throw e; throw e;
} finally { } finally {
connection.setAutoCommit(true); connection.setAutoCommit(true);
@ -585,10 +600,17 @@ public class DatabaseImpl implements Database {
} }
} }
private ResultSet readNextFromQueue(Connection connection, String table, String channel) throws SQLException { private ResultSet queueLength(Connection connection, String table, String channel) throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(flavor().queueLength(table));
preparedStatement.setString(1, channel);
return preparedStatement.executeQuery();
}
private ResultSet readNextFromQueue(Connection connection, String table, String channel, int limit) throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(flavor().readNextFromQueue(table)); PreparedStatement preparedStatement = connection.prepareStatement(flavor().readNextFromQueue(table));
preparedStatement.setString(1, channel); preparedStatement.setString(1, channel);
preparedStatement.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now())); preparedStatement.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
preparedStatement.setInt(3, limit);
return preparedStatement.executeQuery(); return preparedStatement.executeQuery();
} }

View file

@ -77,11 +77,12 @@ public interface Flavor {
/** /**
* In JDBC, all drivers are set to autocommit by default. * In JDBC, all drivers are set to autocommit by default.
* Returns true if we let the JDBC driver exclusively decide about commit(). If false, it allows explicit commit() calls. * @return Returns true if we let the JDBC driver exclusively decide about commit(). If false, it allows explicit commit() calls.
* @return
*/ */
boolean isAutoCommitOnly(); boolean isAutoCommitOnly();
String queueLength(String table);
String writeNextIntoQueue(String table); String writeNextIntoQueue(String table);
String readNextFromQueue(String table); String readNextFromQueue(String table);

View file

@ -0,0 +1,8 @@
package org.xbib.jdbc.query;
public class QueueException extends DatabaseException {
public QueueException(String message, Throwable cause) {
super(message, cause);
}
}

View file

@ -180,6 +180,11 @@ public class Derby implements Flavor {
return false; return false;
} }
@Override
public String queueLength(String table) {
throw new UnsupportedOperationException();
}
@Override @Override
public String writeNextIntoQueue(String table) { public String writeNextIntoQueue(String table) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View file

@ -180,6 +180,11 @@ public class H2 implements Flavor {
return false; return false;
} }
@Override
public String queueLength(String table) {
throw new UnsupportedOperationException();
}
@Override @Override
public String writeNextIntoQueue(String table) { public String writeNextIntoQueue(String table) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View file

@ -180,6 +180,11 @@ public class Hsql implements Flavor {
return false; return false;
} }
@Override
public String queueLength(String table) {
throw new UnsupportedOperationException();
}
@Override @Override
public String writeNextIntoQueue(String table) { public String writeNextIntoQueue(String table) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View file

@ -187,6 +187,11 @@ public class SqlServer implements Flavor {
return false; return false;
} }
@Override
public String queueLength(String table) {
throw new UnsupportedOperationException();
}
@Override @Override
public String writeNextIntoQueue(String table) { public String writeNextIntoQueue(String table) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();