diff --git a/src/main/java/org/opendatakit/aggregate/parser/FormParserForJavaRosa.java b/src/main/java/org/opendatakit/aggregate/parser/FormParserForJavaRosa.java index 3c689afef2..3b5a275a1a 100644 --- a/src/main/java/org/opendatakit/aggregate/parser/FormParserForJavaRosa.java +++ b/src/main/java/org/opendatakit/aggregate/parser/FormParserForJavaRosa.java @@ -552,7 +552,7 @@ private void initHelper(MultiPartFormData uploadedFormItems, MultiPartFormItem x boolean locked = false; while (!locked) { if ((++i) % 10 == 0) { - log.warn("excessive wait count for startup serialization lock. Count: " + i); + log.warn("excessive wait count for form creation lock. Count: " + i); try { Thread.sleep(PersistConsts.MIN_SETTLE_MILLISECONDS); } catch (InterruptedException e) { diff --git a/src/main/java/org/opendatakit/common/persistence/engine/gae/TaskLockImpl.java b/src/main/java/org/opendatakit/common/persistence/engine/gae/TaskLockImpl.java index 2258f09058..4deaee0b4d 100644 --- a/src/main/java/org/opendatakit/common/persistence/engine/gae/TaskLockImpl.java +++ b/src/main/java/org/opendatakit/common/persistence/engine/gae/TaskLockImpl.java @@ -308,6 +308,13 @@ public boolean releaseLock(String lockId, String formId, ITaskLockType taskType) } } } + if ( !result ) { + // if there was contention and the other party hasn't removed its lock + // yet, then our queryForLock() will fail. Call delete, which has + // less restrictive logic than queryForLock(). + deleteLock(lockId, formId, taskType); + System.out.println("releaseLock -- FALLBACK: deleteLock : " + lockId + " " + formId + " " + taskType.getName()); + } return result; } diff --git a/src/main/java/org/opendatakit/common/persistence/engine/mysql/TaskLockImpl.java b/src/main/java/org/opendatakit/common/persistence/engine/mysql/TaskLockImpl.java index 458a860b01..353a84d7e3 100644 --- a/src/main/java/org/opendatakit/common/persistence/engine/mysql/TaskLockImpl.java +++ b/src/main/java/org/opendatakit/common/persistence/engine/mysql/TaskLockImpl.java @@ -16,6 +16,7 @@ package org.opendatakit.common.persistence.engine.mysql; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -48,7 +49,7 @@ public class TaskLockImpl implements TaskLock { private static final String PERSISTENCE_LAYER_PROBLEM = "Persistence layer failure"; - + final DatastoreAccessMetrics dam; final DatastoreImpl datastore; final User user; @@ -67,57 +68,45 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l) final List stmts = new ArrayList(); + final String uri = entity.getUri(); StringBuilder b = new StringBuilder(); String tableName = K_BQ + datastore.getDefaultSchemaName() + K_BQ + "." + K_BQ + TaskLockTable.TABLE_NAME + K_BQ; // String tableName= TaskLockTable.TABLE_NAME; - b.append("SET @present = NOW()"); + b.append("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE"); stmts.add(b.toString()); b.setLength(0); - b.append("SET @uriUser = '"); - b.append(StringEscapeUtils.escapeSql(user.getUriUser())); - b.append("'"); + b.append("START TRANSACTION WITH CONSISTENT SNAPSHOT"); stmts.add(b.toString()); b.setLength(0); - b.append("SET @uriLock = '"); - b.append(StringEscapeUtils.escapeSql(entity.getUri())); - b.append("'"); - stmts.add(b.toString()); - b.setLength(0); - b.append("SET @formId = '"); + b.append("SET @present := NOW(),"); + b.append(" @uriUser := '"); + b.append(StringEscapeUtils.escapeSql(user.getUriUser())); + b.append("',"); + b.append(" @uriLock := '"); + b.append(StringEscapeUtils.escapeSql(uri)); + b.append("',"); + b.append(" @formId := '"); b.append(StringEscapeUtils.escapeSql(entity.getFormId())); - b.append("'"); - stmts.add(b.toString()); - b.setLength(0); - b.append("SET @taskType = '"); + b.append("',"); + b.append(" @taskType := '"); b.append(StringEscapeUtils.escapeSql(entity.getTaskType())); - b.append("'"); - stmts.add(b.toString()); - b.setLength(0); - b.append("SET @lifetimeMicroseconds = "); + b.append("',"); + b.append(" @lifetimeMicroseconds := "); b.append(1000L * l); stmts.add(b.toString()); b.setLength(0); b.append("LOCK TABLES "); b.append(tableName); b.append(" WRITE "); - b.append(", "); - b.append(tableName); - b.append(" AS t1 WRITE "); - b.append(", "); - b.append(tableName); - b.append(" AS t3 READ "); - b.append(", "); - b.append(tableName); - b.append(" AS t4 WRITE "); stmts.add(b.toString()); b.setLength(0); dam.recordPutUsage(TaskLockTable.TABLE_NAME); if (!entity.isFromDatabase()) { // insert a new record - b.append("INSERT INTO "); + b.append("REPLACE INTO "); b.append(tableName); b.append(" ("); first = true; @@ -148,7 +137,7 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l) } else if (f.equals(entity.primaryKey)) { b.append("@uriLock"); } else if (f.equals(entity.expirationDateTime)) { - b.append(" TIMESTAMPADD(MICROSECOND, @lifetimeMicroseconds, @present)"); + b.append(" DATE_ADD(CAST(@present AS DATETIME), INTERVAL @lifetimeMicroseconds MICROSECOND)"); } else { throw new IllegalStateException("unexpected case " + f.getName()); } @@ -184,7 +173,7 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l) } else if (f.equals(entity.primaryKey)) { b.append("@uriLock"); } else if (f.equals(entity.expirationDateTime)) { - b.append(" TIMESTAMPADD(MICROSECOND, @lifetimeMicroseconds, @present)"); + b.append(" DATE_ADD(CAST(@present AS DATETIME), INTERVAL @lifetimeMicroseconds MICROSECOND)"); } else { throw new IllegalStateException("unexpected case " + f.getName()); } @@ -200,100 +189,197 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l) } // delete stale locks (don't care who's) dam.recordDeleteUsage(TaskLockTable.TABLE_NAME); - b.append("DELETE t1 FROM "); + b.append("DELETE FROM "); b.append(tableName); - b.append(" AS t1 WHERE t1."); + b.append(" WHERE "); b.append(K_BQ); b.append(entity.expirationDateTime.getName()); b.append(K_BQ); - b.append(" <= @present"); + b.append(" <= CAST(@present AS DATETIME)"); stmts.add(b.toString()); b.setLength(0); // determine the time of the oldest lock for this resource and task type... + // do this by querying for the minimum expiration time + // BUT, first, set the minimum time to the present time in case there are no locks. b.append("SET @minExpiration = @present"); stmts.add(b.toString()); b.setLength(0); - dam.recordQueryUsage(TaskLockTable.TABLE_NAME, 1); - b.append("SELECT @minExpiration:=MIN(t3."); + b.append("SELECT @minExpiration := MIN("); b.append(K_BQ); b.append(entity.expirationDateTime.getName()); b.append(K_BQ); b.append(") FROM "); b.append(tableName); - b.append(" AS t3 WHERE t3."); + b.append(" WHERE "); b.append(K_BQ); b.append(entity.formId.getName()); b.append(K_BQ); - b.append(" = @formId AND t3."); + b.append(" = @formId AND "); b.append(K_BQ); b.append(entity.taskType.getName()); b.append(K_BQ); b.append(" = @taskType"); stmts.add(b.toString()); b.setLength(0); + // determine if there are two or more records matching + // the minimum expiration time. If there are, we must + // release the locks and retry, as we cannot determine + // which one is first. + b.append("SET @uriCount = 0"); + stmts.add(b.toString()); + b.setLength(0); + b.append("SELECT @uriCount := COUNT("); + b.append(K_BQ); + b.append(entity.primaryKey.getName()); + b.append(K_BQ); + b.append(") FROM "); + b.append(tableName); + b.append(" WHERE "); + + b.append(K_BQ); + b.append(entity.formId.getName()); + b.append(K_BQ); + b.append(" = @formId AND "); + + b.append(K_BQ); + b.append(entity.taskType.getName()); + b.append(K_BQ); + b.append(" = @taskType AND "); + + b.append(K_BQ); + b.append(entity.expirationDateTime.getName()); + b.append(K_BQ); + b.append(" = CAST(@minExpiration AS DATETIME)"); + stmts.add(b.toString()); + b.setLength(0); // delete all locks except the oldest one for this resource and task type... + // or, if we have two or more old locks, release ours // whatever lock exists identifies the owner of the resource. dam.recordDeleteUsage(TaskLockTable.TABLE_NAME); - b.append("DELETE t4 FROM "); + b.append("DELETE FROM "); b.append(tableName); - b.append(" AS t4 WHERE t4."); + b.append(" WHERE "); b.append(K_BQ); b.append(entity.formId.getName()); b.append(K_BQ); - b.append(" = @formId AND t4."); + b.append(" = @formId AND "); b.append(K_BQ); b.append(entity.taskType.getName()); b.append(K_BQ); - b.append(" = @taskType AND TIMESTAMPDIFF(MICROSECOND, t4."); + b.append(" = @taskType AND ( "); b.append(K_BQ); b.append(entity.expirationDateTime.getName()); b.append(K_BQ); - b.append(",@minExpiration) > 0"); + b.append(" > CAST(@minExpiration AS DATETIME)"); + b.append(" OR ( @uriCount > 1 AND "); + b.append(K_BQ); + b.append(entity.expirationDateTime.getName()); + b.append(K_BQ); + b.append(" = CAST(@minExpiration AS DATETIME)"); + b.append(" AND "); + b.append(K_BQ); + b.append(entity.primaryKey.getName()); + b.append(K_BQ); + b.append(" = @uriLock ) )"); stmts.add(b.toString()); b.setLength(0); + // and within the transaction, see if the primary key of the winning lock is ours... + b.append("SELECT "); + b.append(K_BQ); + b.append(entity.primaryKey.getName()); + b.append(K_BQ); + b.append(" FROM "); + b.append(tableName); + b.append(" WHERE "); + b.append(K_BQ); + b.append(entity.formId.getName()); + b.append(K_BQ); + b.append(" = @formId AND "); + b.append(K_BQ); + b.append(entity.taskType.getName()); + b.append(K_BQ); + b.append(" = @taskType AND "); + b.append(K_BQ); + b.append(entity.expirationDateTime.getName()); + b.append(K_BQ); + b.append(" = CAST(@minExpiration AS DATETIME)"); + stmts.add(b.toString()); + b.setLength(0); + b.append("UNLOCK TABLES"); + stmts.add(b.toString()); + b.setLength(0); + b.append("COMMIT"); + stmts.add(b.toString()); + b.setLength(0); + TaskLockTable relation; + try { + relation = TaskLockTable.assertRelation(datastore, user); + } catch (ODKDatastoreException e) { + throw new ODKTaskLockException(PERSISTENCE_LAYER_PROBLEM, e); + } + + boolean success = false; try { JdbcTemplate jdbc = datastore.getJdbcConnection(); - jdbc.execute(new ConnectionCallback() { + Object o = jdbc.execute(new ConnectionCallback() { @Override public Object doInConnection(Connection conn) throws SQLException, DataAccessException { + boolean success = false; boolean oldValue = conn.getAutoCommit(); try { conn.setAutoCommit(false); conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); - Statement stmt = conn.createStatement(); + String lastResult = null; for (String s : stmts) { - // for debugging: LogFactory.getLog(TaskLockImpl.class).info(s); - stmt.execute(s); + Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.CLOSE_CURSORS_AT_COMMIT); + if ( s.startsWith("SELECT") ) { + ResultSet rs = stmt.executeQuery(s); + if ( rs.first() ) { + lastResult = rs.getString(1); + } + rs.close(); + } else { + stmt.executeUpdate(s); + } + stmt.close(); } conn.commit(); + success = uri.equals(lastResult); } catch (Exception e) { e.printStackTrace(); conn.rollback(); } finally { - Statement stmt = conn.createStatement(); - LogFactory.getLog(TaskLockImpl.class).info("UNLOCK TABLES"); - stmt.execute("UNLOCK TABLES"); - conn.commit(); + if ( !success ) { + Statement stmt = conn.createStatement(); + LogFactory.getLog(TaskLockImpl.class).info("UNLOCK TABLES"); + stmt.execute("UNLOCK TABLES"); + conn.commit(); + } } conn.setAutoCommit(oldValue); - return null; + return success ? uri : null; } }); + + success = o != null && uri.equals((String) o); - relation = TaskLockTable.assertRelation(datastore, user); } catch (Exception e) { throw new ODKTaskLockException(PERSISTENCE_LAYER_PROBLEM, e); } - return (TaskLockTable) datastore.getEntity(relation, entity.getUri(), user); + if ( success ) { + return (TaskLockTable) datastore.getEntity(relation, uri, user); + } else { + throw new ODKEntityNotFoundException(); + } } @Override diff --git a/src/main/java/org/opendatakit/common/persistence/engine/pgres/TaskLockImpl.java b/src/main/java/org/opendatakit/common/persistence/engine/pgres/TaskLockImpl.java index 402c490cd1..480aad270c 100644 --- a/src/main/java/org/opendatakit/common/persistence/engine/pgres/TaskLockImpl.java +++ b/src/main/java/org/opendatakit/common/persistence/engine/pgres/TaskLockImpl.java @@ -66,7 +66,9 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l) boolean first; final List stmts = new ArrayList(); - + + String uri = entity.getUri(); + StringBuilder b = new StringBuilder(); String tableName = K_BQ + datastore.getDefaultSchemaName() + K_BQ + "." + K_BQ + TaskLockTable.TABLE_NAME + K_BQ; @@ -74,7 +76,7 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l) b.append("'").append(StringEscapeUtils.escapeSql(user.getUriUser())).append("'"); String uriUserInline = b.toString(); b.setLength(0); - b.append("'").append(StringEscapeUtils.escapeSql(entity.getUri())).append("'"); + b.append("'").append(StringEscapeUtils.escapeSql(uri)).append("'"); String uriLockInline = b.toString(); b.setLength(0); b.append("'").append(StringEscapeUtils.escapeSql(entity.getFormId())).append("'"); @@ -187,8 +189,8 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l) // task type dam.recordDeleteUsage(TaskLockTable.TABLE_NAME); b.append("DELETE FROM ").append(tableName).append(" WHERE "); - b.append(K_BQ).append(entity.formId.getName()).append(K_BQ).append(" = ").append(formIdInline) - .append(" AND "); + b.append(K_BQ).append(entity.formId.getName()).append(K_BQ).append(" = ") + .append(formIdInline).append(" AND "); b.append(K_BQ).append(entity.taskType.getName()).append(K_BQ).append(" = ") .append(taskTypeInline).append(" AND "); b.append(K_BQ).append(entity.expirationDateTime.getName()).append(K_BQ); @@ -201,6 +203,24 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l) .append(taskTypeInline).append(")"); stmts.add(b.toString()); b.setLength(0); + // delete our entry if it collides with another entry with exactly + // this time. + b.append("DELETE FROM ").append(tableName).append(" WHERE "); + b.append(K_BQ).append(entity.formId.getName()).append(K_BQ).append(" = ") + .append(formIdInline).append(" AND "); + b.append(K_BQ).append(entity.taskType.getName()).append(K_BQ).append(" = ") + .append(taskTypeInline).append(" AND "); + b.append(K_BQ).append(entity.primaryKey.getName()).append(K_BQ).append(" = ") + .append(uriLockInline).append(" AND "); + b.append("1 < (SELECT COUNT(t3.").append(K_BQ).append(entity.expirationDateTime.getName()) + .append(K_BQ); + b.append(") FROM ").append(tableName).append(" AS t3 WHERE t3."); + b.append(K_BQ).append(entity.formId.getName()).append(K_BQ).append(" = ").append(formIdInline) + .append(" AND t3."); + b.append(K_BQ).append(entity.taskType.getName()).append(K_BQ).append(" = ") + .append(taskTypeInline).append(")"); + stmts.add(b.toString()); + b.setLength(0); // assert: only the lock that holds the resource for that task type appears // in the task lock table TaskLockTable relation; diff --git a/src/test/java/org/opendatakit/aggregate/task/testing/gae/WatchdogImpl.java b/src/test/java/org/opendatakit/aggregate/task/testing/gae/WatchdogImpl.java index 4b1a1244de..c27d23e428 100644 --- a/src/test/java/org/opendatakit/aggregate/task/testing/gae/WatchdogImpl.java +++ b/src/test/java/org/opendatakit/aggregate/task/testing/gae/WatchdogImpl.java @@ -254,7 +254,7 @@ public void setJsonFileGenerator(JsonFileGenerator jsonFileGenerator) { this.jsonFileGenerator = jsonFileGenerator; } - public JsonFileGenerator setJsonFileGenerator() { + public JsonFileGenerator getJsonFileGenerator() { return jsonFileGenerator; } diff --git a/src/test/java/org/opendatakit/common/persistence/TaskLockTest.java b/src/test/java/org/opendatakit/common/persistence/TaskLockTest.java new file mode 100644 index 0000000000..4c6e146e6e --- /dev/null +++ b/src/test/java/org/opendatakit/common/persistence/TaskLockTest.java @@ -0,0 +1,215 @@ +/** + * Copyright (C) 2012 University of Washington + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.opendatakit.common.persistence; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.opendatakit.aggregate.constants.TaskLockType; +import org.opendatakit.common.persistence.exception.ODKDatastoreException; +import org.opendatakit.common.persistence.exception.ODKTaskLockException; +import org.opendatakit.common.security.User; +import org.opendatakit.common.testing.ICommonTestSetup; +import org.opendatakit.common.web.CallingContext; +import org.opendatakit.common.web.TestContextFactory; + +/** + * Tests the task lock mechanism and reports its performance statistics. + * + * @author mitchellsundt@gmail.com + * + */ +@RunWith(org.junit.runners.JUnit4.class) +public class TaskLockTest { + + static AtomicBoolean inside = new AtomicBoolean(false); + + @Ignore + static class TaskLockThread extends Thread { + CyclicBarrier launchBarrier; + CallingContext cc; + + static int ENTRY_ATTEMPTS = 30; + + boolean failed = false; + int entryCount = 0; + int declinedEntryCount = 0; + int lockFailedCount = 0; + + TaskLockThread(CyclicBarrier launchBarrier, CallingContext cc) { + this.launchBarrier = launchBarrier; + this.cc = cc; + } + + @Override + public void run() { + try { + Object o = cc.getBean("testing"); + if ( o != null ) { + ICommonTestSetup i = (ICommonTestSetup) o; + i.setup(); + } + } catch ( Exception ex ) { + System.out.println("GAE setup exception " + ex.toString()); + } + + try { + launchBarrier.await(); + } catch (Exception e) { + failed = true; + System.out.println("Premature exception from barrier " + e.toString()); + } + try { + if (!failed) { + + for (int j = 0; j < ENTRY_ATTEMPTS; ++j) { + Thread.sleep(0); + System.out.println("Entry Attempt " + j + " Thread " + getId()); + // gain single-access lock record in database... + String lockedResourceName = "TASK_LOCK_TESTING"; + String creationLockId = UUID.randomUUID().toString(); + Datastore ds = cc.getDatastore(); + User user = cc.getCurrentUser(); + + int i = 0; + boolean locked = false; + while (!locked) { + if ((++i) % 10 == 0) { + System.out.println("excessive wait count for startup serialization lock. Count: " + + i); + try { + Thread.sleep(PersistConsts.MIN_SETTLE_MILLISECONDS); + } catch (InterruptedException e) { + // we remain in the loop even if we get kicked out. + } + } else if (i != 1) { + try { + Thread.sleep(PersistConsts.MIN_SETTLE_MILLISECONDS); + } catch (InterruptedException e) { + // we remain in the loop even if we get kicked out. + } + } + try { + TaskLock formCreationTaskLock = ds.createTaskLock(user); + if (formCreationTaskLock.obtainLock(creationLockId, lockedResourceName, + TaskLockType.CREATE_FORM)) { + locked = true; + } + formCreationTaskLock = null; + } catch (ODKTaskLockException e) { + e.printStackTrace(); + } + } + declinedEntryCount += i; + System.out.println("Entered " + j + " Thread " + getId()); + + // we hold the lock while we toggle inside value here... + try { + if (!locked) { + lockFailedCount++; + } else { + if (inside.get()) { + System.out.println("Thread " + this.getId() + " finds inside true!"); + failed = true; + } + inside.set(true); + Thread.sleep(0); // give some other thread a spin... + inside.set(false); + ++entryCount; + System.out.println("Thread " + this.getId() + " inside " + entryCount); + } + } finally { + // release the form creation serialization lock + try { + for (i = 0; i < 10; i++) { + TaskLock formCreationTaskLock = ds.createTaskLock(user); + if (formCreationTaskLock.releaseLock(creationLockId, lockedResourceName, + TaskLockType.CREATE_FORM)) { + break; + } + formCreationTaskLock = null; + try { + Thread.sleep(PersistConsts.MIN_SETTLE_MILLISECONDS); + } catch (InterruptedException e) { + // just move on, this retry mechanism + // is to make things nice + } + } + } catch (ODKTaskLockException e) { + e.printStackTrace(); + } + } + + } + } + } catch (Exception e) { + failed = true; + System.out.println("FAILED Entry Attempt " + e.toString() + " Thread " + getId()); + } + } + } + + @Test + public void verifyLock() throws ODKDatastoreException { + + int MAX_THREADS = 8; + CyclicBarrier launchBarrier = new CyclicBarrier(MAX_THREADS); + CallingContext cc = TestContextFactory.getCallingContext(); + + List lockTesters = new ArrayList(); + for (int i = 0; i < MAX_THREADS; ++i) { + TaskLockThread t = new TaskLockThread(launchBarrier, cc); + t.start(); + lockTesters.add(t); + } + + boolean failure = false; + for (int i = 0; i < MAX_THREADS; ++i) { + TaskLockThread t = lockTesters.get(i); + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + failure = true; + } + } + + int entryTally = 0; + int declinedEntryTally = 0; + + for (int i = 0; i < MAX_THREADS; ++i) { + TaskLockThread t = lockTesters.get(i); + if (t.failed) { + System.out.println("FAILED Thread " + t.getId()); + failure = true; + } + entryTally += t.entryCount; + declinedEntryTally += t.declinedEntryCount; + } + + System.out.println("entryCount " + entryTally + " of " + TaskLockThread.ENTRY_ATTEMPTS + * MAX_THREADS); + System.out.println("declinedEntryCount " + declinedEntryTally); + + assertEquals(failure, false); + } +}