Skip to content

Commit

Permalink
Prolong connection timeout, make option description clearer.
Browse files Browse the repository at this point in the history
Release 1.6.0.
The prolonged timeout is an attempt to better handle short database downtimes.
  • Loading branch information
khituras committed Sep 28, 2021
1 parent b944b21 commit de46089
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 30 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>costosys</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.0</version>
<name>Corpus Storage System</name>
<description>A utility for managing documents stored in a PostgreSQL database. The documents are imported into a
PostgreSQL DB as full texts with the goal to be able to retrieve the documents by their PubMedID efficiently.
Expand Down Expand Up @@ -225,7 +225,7 @@
<parent>
<groupId>de.julielab</groupId>
<artifactId>julielab-parent</artifactId>
<version>2.4.1</version>
<version>2.5.0-SNAPSHOT</version>
</parent>
<organization>
<name>JULIE Lab, Germany</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ private static void addModes(Options options) {
modes.addOption(buildOption("mp", "mark-processed", "Sets the is_processed state of a subset table to true. The -f argument can be used to deliver a file that lists document primary keys, one per line. If such a file is given, only the entries in the file are marked as processed.", "the subset table name"));

modes.addOption(buildOption("q", "query", "Query a table (default: " + Constants.DEFAULT_DATA_TABLE_NAME
+ ") for XMLs. You can enter the primary keys directly or use -f to specify a file. If you define none of both, the whole table will be returned.\n"
+ ") for XMLs. You can enter the primary keys directly or use -f to specify a file. In this case, some dummy query must be specified - just any string - to satisfy the option parser. If you define none of both, the whole table will be returned.\n"
+ "Use -f to provide a file with document IDs to return.\n"
+ "Use -d to display delimiters between the results.\n"
+ "Use -z to specify the target table. If the table is a subset, only documents in this subset will be returned.\n"
+ "Use -l to set a limit of returned documents.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class CoStoSysConnection implements AutoCloseable {
private final static Logger log = LoggerFactory.getLogger(CoStoSysConnection.class);
Expand Down Expand Up @@ -47,11 +49,17 @@ public synchronized void decreaseUsageCounter() throws SQLException {
}
}


/**
 * Returns the {@link Connection} held in this object's {@code connection} field.
 *
 * @return the value of the {@code connection} field, handed out as-is (no copy or wrapper is created here)
 */
public Connection getConnection() {
return connection;
}

/**
 * Renders the top of the current thread's call stack as a single line of text.
 * <p>
 * At most three frames are included, each formatted as {@code className#methodName} and
 * separated by {@code ", "}. The walk starts at the method that invokes
 * {@link StackWalker#walk}, i.e. this method itself, so the first entry describes
 * {@code getCaller} and the subsequent entries describe its callers.
 *
 * @return the comma-separated description of up to three stack frames
 */
private String getCaller() {
    return StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE)
            .walk(stack -> stack
                    .limit(3)
                    .map(frame -> frame.getClassName() + "#" + frame.getMethodName())
                    .collect(Collectors.joining(", ")));
}

@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ Connection getConn() {
hikariConfig.setPassword(password);
hikariConfig.setConnectionTestQuery("SELECT TRUE");
hikariConfig.setMaximumPoolSize(dbConfig.getMaxConnections());
hikariConfig.setConnectionTimeout(60000);
hikariConfig.setConnectionTimeout(120000);
// required to be able to get the number of idle connections, see below
hikariConfig.setRegisterMbeans(true);
dataSource = pools.compute(dbURL, (url, source) -> source == null ? new HikariDataSource(hikariConfig) : source);
Expand All @@ -359,7 +359,7 @@ Connection getConn() {
}
}

try {
// try {
int retries = 0;
do {
try {
Expand Down Expand Up @@ -391,7 +391,7 @@ Connection getConn() {
stm.execute(String.format("SET search_path TO %s", dbConfig.getActivePGSchema()));
stm.close();
} catch (SQLException e) {
LOG.warn("SQLException occurred:", e);
// LOG.warn("SQLException occurred:", e);
LOG.warn("Could not obtain a database connection within the timeout for thread {}. Trying again. Number of try: {}", Thread.currentThread().getName(), ++retries);
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
Expand All @@ -415,25 +415,26 @@ Connection getConn() {
LOG.warn("Could not retrieve connection pool statistics: {}. More information can be found on DEBUG level.", t.getMessage());
LOG.debug("Could not retrieve connection pool statistics:", t);
}
if (retries == 3)
throw e;
LOG.warn("Currently active CoSoSysConnections are total: {}, shared: {}", connectionCache.asMap().values().stream().flatMap(Collection::stream).count(), connectionCache.asMap().values().stream().flatMap(Collection::stream).filter(CoStoSysConnection::isShared).count());
// if (retries == 3)
// throw e;
}
} while (conn == null);
if (retries > 0)
LOG.warn("It took {} retries to obtain a connection", retries);
} catch (SQLException e) {
LOG.error("Could not connect with " + dbURL);
throw new UnobtainableConnectionException("No database connection could be obtained from the connection " +
"pool. This can have one of two causes: Firstly, the application might just use all connections " +
"concurrently. Then, a higher number of maximum active database connections in the CoStoSys " +
"configuration might help. This " +
"number is currently set to " + config.getDatabaseConfig().getMaxConnections() + ". The other " +
"possibility are programming errors where connections are retrieved but not closed. Closing " +
"connections means to return them to the pool. It must always be made sure that connections are " +
"closed when they are no longer required. If database iterators are used. i.e. subclasses of " +
"DBCIterator, make sure to fully read the iterators. Otherwise, they might keep a permanent " +
"connection to the database while waiting to be consumed.", e);
}
// } catch (SQLException e) {
// LOG.error("Could not connect with " + dbURL);
// throw new UnobtainableConnectionException("No database connection could be obtained from the connection " +
// "pool. This can have one of two causes: Firstly, the application might just use all connections " +
// "concurrently. Then, a higher number of maximum active database connections in the CoStoSys " +
// "configuration might help. This " +
// "number is currently set to " + config.getDatabaseConfig().getMaxConnections() + ". The other " +
// "possibility are programming errors where connections are retrieved but not closed. Closing " +
// "connections means to return them to the pool. It must always be made sure that connections are " +
// "closed when they are no longer required. If database iterators are used. i.e. subclasses of " +
// "DBCIterator, make sure to fully read the iterators. Otherwise, they might keep a permanent " +
// "connection to the database while waiting to be consumed.", e);
// }
return conn;
}

Expand Down Expand Up @@ -587,8 +588,8 @@ public List<Object[]> retrieveAndMark(String subsetTableName, String schemaName,
// following
// http://dba.stackexchange.com/questions/69471/postgres-update-limit-1
sql = "UPDATE " + subsetTableName + " AS t SET " + Constants.IN_PROCESS + " = TRUE, "
+ Constants.LAST_COMPONENT + " = '" + readerComponent + "', " + Constants.HOST_NAME + " = \'"
+ hostName + "\', " + Constants.PID + " = \'" + pid + "\'," + Constants.PROCESSING_TIMESTAMP
+ Constants.LAST_COMPONENT + " = '" + readerComponent + "', " + Constants.HOST_NAME + " = '"
+ hostName + "', " + Constants.PID + " = '" + pid + "'," + Constants.PROCESSING_TIMESTAMP
+ " = 'now' FROM (SELECT " + fieldConfig.getPrimaryKeyString() + " FROM " + subsetTableName
+ " WHERE " + Constants.IN_PROCESS + " = FALSE AND "
// eigentlich wollen wir anstelle von FOR UPDATE sogar:
Expand Down Expand Up @@ -618,7 +619,7 @@ public List<Object[]> retrieveAndMark(String subsetTableName, String schemaName,
sql, e);
SQLException nextException = e.getNextException();
if (null != nextException)
LOG.error("Next exception: {}", nextException);
LOG.error("Next exception: ", nextException);
// this is not the deadlock error; break the loop
break;
} else {
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/de/julielab/costosys/medline/Updater.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public Updater(HierarchicalConfiguration<ImmutableNode> configuration) {
log.debug("Loaded the following document deleters: {}", documentDeleterLoader.stream().map(d -> d.get().getName()).collect(Collectors.joining(", ")));
}

private static List<File> getUnprocessedMedlineUpdates(File[] updateFiles, DataBaseConnector dbc) {
private static List<File> getUnprocessedMedlineUpdates(File[] updateFiles, DataBaseConnector dbc) throws MedlineUpdateException {
List<File> unprocessedFiles = new ArrayList<>();
try (CoStoSysConnection coStoSysConnection = dbc.obtainOrReserveConnection(true)) {

Expand Down Expand Up @@ -107,14 +107,14 @@ private static List<File> getUnprocessedMedlineUpdates(File[] updateFiles, DataB
unprocessedFiles.add(updateFile);
}
} catch (SQLException e) {
e.printStackTrace();
throw new MedlineUpdateException(e);
}
}
return unprocessedFiles;
}

private static List<String> getPmidsToDelete(File file) {
List<String> pmidsToDelete = new ArrayList<String>();
List<String> pmidsToDelete = new ArrayList<>();
String forEachXpath = "/PubmedArticleSet/DeleteCitation/PMID";
List<Map<String, String>> fields = new ArrayList<Map<String, String>>();
Map<String, String> field = new HashMap<String, String>();
Expand Down Expand Up @@ -232,18 +232,20 @@ private void configureDocumentDeleters() throws DocumentDeletionException {
* @param file
* @param dbc
*/
private void markFileAsImported(File file, DataBaseConnector dbc) {
private void markFileAsImported(File file, DataBaseConnector dbc) throws MedlineUpdateException {
try (CoStoSysConnection coStoSysConnection = dbc.obtainOrReserveConnection(true)) {
Connection conn = coStoSysConnection.getConnection();
String sql = null;
try {
log.debug("Marking update file {} as imported.", file);
sql = String.format(
"UPDATE %s SET %s = TRUE, %s = '" + new Timestamp(System.currentTimeMillis()) + "' WHERE %s = '%s'",
UPDATE_TABLE, COLUMN_IS_IMPORTED, COLUMN_TIMESTAMP, COLUMN_FILENAME, file.getName());
conn.createStatement().execute(sql);
coStoSysConnection.commit();
} catch (SQLException e) {
e.printStackTrace();
log.error("SQL command was: {}", sql);
throw new MedlineUpdateException(e);
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/test/java/de/julielab/costosys/Test.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.julielab.costosys;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Sanity check for the way {@link StackWalker} is used to describe the current call stack:
 * the topmost frames are rendered as {@code className#methodName} strings.
 */
public class Test {
    /**
     * Walks the current call stack, keeps at most three frames and verifies that the walk
     * starts at this test method.
     * <p>
     * Failures are signalled with a plain {@link AssertionError} so that the check does not
     * depend on the JVM being started with {@code -ea} and needs no additional imports.
     */
    @org.testng.annotations.Test
    public void test() {
        StackWalker walker = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE);
        List<String> walk = walker.walk(frames -> frames
                .map(f -> f.getClassName() + "#" + f.getMethodName()).limit(3)
                .collect(Collectors.toList()));
        System.out.println(walk);

        // limit(3) must cap the number of reported frames.
        if (walk.isEmpty() || walk.size() > 3) {
            throw new AssertionError("Expected between 1 and 3 stack frames but got " + walk);
        }
        // The walk begins at the method that called walk(), which is this test method.
        String expectedTopFrame = Test.class.getName() + "#test";
        if (!expectedTopFrame.equals(walk.get(0))) {
            throw new AssertionError("Expected top stack frame " + expectedTopFrame + " but got " + walk);
        }
    }
}

0 comments on commit de46089

Please sign in to comment.