Skip to content
This repository has been archived by the owner on Feb 12, 2023. It is now read-only.

Commit

Permalink
Merge with uiexperiment
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchellsundt committed Jun 4, 2012
2 parents f1df102 + 96fa189 commit cdf6057
Show file tree
Hide file tree
Showing 6 changed files with 388 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ private void initHelper(MultiPartFormData uploadedFormItems, MultiPartFormItem x
boolean locked = false;
while (!locked) {
if ((++i) % 10 == 0) {
log.warn("excessive wait count for startup serialization lock. Count: " + i);
log.warn("excessive wait count for form creation lock. Count: " + i);
try {
Thread.sleep(PersistConsts.MIN_SETTLE_MILLISECONDS);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ public boolean releaseLock(String lockId, String formId, ITaskLockType taskType)
}
}
}
if ( !result ) {
// if there was contention and the other party hasn't removed its lock
// yet, then our queryForLock() will fail. Call delete, which has
// less restrictive logic than queryForLock().
deleteLock(lockId, formId, taskType);
System.out.println("releaseLock -- FALLBACK: deleteLock : " + lockId + " " + formId + " " + taskType.getName());
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.opendatakit.common.persistence.engine.mysql;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand Down Expand Up @@ -48,7 +49,7 @@
public class TaskLockImpl implements TaskLock {

private static final String PERSISTENCE_LAYER_PROBLEM = "Persistence layer failure";

final DatastoreAccessMetrics dam;
final DatastoreImpl datastore;
final User user;
Expand All @@ -67,57 +68,45 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l)

final List<String> stmts = new ArrayList<String>();

final String uri = entity.getUri();
StringBuilder b = new StringBuilder();
String tableName = K_BQ + datastore.getDefaultSchemaName() + K_BQ + "." + K_BQ
+ TaskLockTable.TABLE_NAME + K_BQ;
// String tableName= TaskLockTable.TABLE_NAME;

b.append("SET @present = NOW()");
b.append("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE");
stmts.add(b.toString());
b.setLength(0);
b.append("SET @uriUser = '");
b.append(StringEscapeUtils.escapeSql(user.getUriUser()));
b.append("'");
b.append("START TRANSACTION WITH CONSISTENT SNAPSHOT");
stmts.add(b.toString());
b.setLength(0);
b.append("SET @uriLock = '");
b.append(StringEscapeUtils.escapeSql(entity.getUri()));
b.append("'");
stmts.add(b.toString());
b.setLength(0);
b.append("SET @formId = '");
b.append("SET @present := NOW(),");
b.append(" @uriUser := '");
b.append(StringEscapeUtils.escapeSql(user.getUriUser()));
b.append("',");
b.append(" @uriLock := '");
b.append(StringEscapeUtils.escapeSql(uri));
b.append("',");
b.append(" @formId := '");
b.append(StringEscapeUtils.escapeSql(entity.getFormId()));
b.append("'");
stmts.add(b.toString());
b.setLength(0);
b.append("SET @taskType = '");
b.append("',");
b.append(" @taskType := '");
b.append(StringEscapeUtils.escapeSql(entity.getTaskType()));
b.append("'");
stmts.add(b.toString());
b.setLength(0);
b.append("SET @lifetimeMicroseconds = ");
b.append("',");
b.append(" @lifetimeMicroseconds := ");
b.append(1000L * l);
stmts.add(b.toString());
b.setLength(0);
b.append("LOCK TABLES ");
b.append(tableName);
b.append(" WRITE ");
b.append(", ");
b.append(tableName);
b.append(" AS t1 WRITE ");
b.append(", ");
b.append(tableName);
b.append(" AS t3 READ ");
b.append(", ");
b.append(tableName);
b.append(" AS t4 WRITE ");
stmts.add(b.toString());
b.setLength(0);

dam.recordPutUsage(TaskLockTable.TABLE_NAME);
if (!entity.isFromDatabase()) {
// insert a new record
b.append("INSERT INTO ");
b.append("REPLACE INTO ");
b.append(tableName);
b.append(" (");
first = true;
Expand Down Expand Up @@ -148,7 +137,7 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l)
} else if (f.equals(entity.primaryKey)) {
b.append("@uriLock");
} else if (f.equals(entity.expirationDateTime)) {
b.append(" TIMESTAMPADD(MICROSECOND, @lifetimeMicroseconds, @present)");
b.append(" DATE_ADD(CAST(@present AS DATETIME), INTERVAL @lifetimeMicroseconds MICROSECOND)");
} else {
throw new IllegalStateException("unexpected case " + f.getName());
}
Expand Down Expand Up @@ -184,7 +173,7 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l)
} else if (f.equals(entity.primaryKey)) {
b.append("@uriLock");
} else if (f.equals(entity.expirationDateTime)) {
b.append(" TIMESTAMPADD(MICROSECOND, @lifetimeMicroseconds, @present)");
b.append(" DATE_ADD(CAST(@present AS DATETIME), INTERVAL @lifetimeMicroseconds MICROSECOND)");
} else {
throw new IllegalStateException("unexpected case " + f.getName());
}
Expand All @@ -200,100 +189,197 @@ private TaskLockTable doTransaction(TaskLockTable entity, long l)
}
// delete stale locks (don't care who's)
dam.recordDeleteUsage(TaskLockTable.TABLE_NAME);
b.append("DELETE t1 FROM ");
b.append("DELETE FROM ");
b.append(tableName);
b.append(" AS t1 WHERE t1.");
b.append(" WHERE ");
b.append(K_BQ);
b.append(entity.expirationDateTime.getName());
b.append(K_BQ);
b.append(" <= @present");
b.append(" <= CAST(@present AS DATETIME)");
stmts.add(b.toString());
b.setLength(0);
// determine the time of the oldest lock for this resource and task type...
// do this by querying for the minimum expiration time
// BUT, first, set the minimum time to the present time in case there are no locks.
b.append("SET @minExpiration = @present");
stmts.add(b.toString());
b.setLength(0);

dam.recordQueryUsage(TaskLockTable.TABLE_NAME, 1);
b.append("SELECT @minExpiration:=MIN(t3.");
b.append("SELECT @minExpiration := MIN(");
b.append(K_BQ);
b.append(entity.expirationDateTime.getName());
b.append(K_BQ);
b.append(") FROM ");
b.append(tableName);
b.append(" AS t3 WHERE t3.");
b.append(" WHERE ");
b.append(K_BQ);
b.append(entity.formId.getName());
b.append(K_BQ);
b.append(" = @formId AND t3.");
b.append(" = @formId AND ");
b.append(K_BQ);
b.append(entity.taskType.getName());
b.append(K_BQ);
b.append(" = @taskType");
stmts.add(b.toString());
b.setLength(0);
// determine if there are two or more records matching
// the minimum expiration time. If there are, we must
// release the locks and retry, as we cannot determine
// which one is first.
b.append("SET @uriCount = 0");
stmts.add(b.toString());
b.setLength(0);
b.append("SELECT @uriCount := COUNT(");
b.append(K_BQ);
b.append(entity.primaryKey.getName());
b.append(K_BQ);
b.append(") FROM ");
b.append(tableName);
b.append(" WHERE ");

b.append(K_BQ);
b.append(entity.formId.getName());
b.append(K_BQ);
b.append(" = @formId AND ");

b.append(K_BQ);
b.append(entity.taskType.getName());
b.append(K_BQ);
b.append(" = @taskType AND ");

b.append(K_BQ);
b.append(entity.expirationDateTime.getName());
b.append(K_BQ);
b.append(" = CAST(@minExpiration AS DATETIME)");
stmts.add(b.toString());
b.setLength(0);
// delete all locks except the oldest one for this resource and task type...
// or, if we have two or more old locks, release ours
// whatever lock exists identifies the owner of the resource.
dam.recordDeleteUsage(TaskLockTable.TABLE_NAME);
b.append("DELETE t4 FROM ");
b.append("DELETE FROM ");
b.append(tableName);
b.append(" AS t4 WHERE t4.");
b.append(" WHERE ");

b.append(K_BQ);
b.append(entity.formId.getName());
b.append(K_BQ);
b.append(" = @formId AND t4.");
b.append(" = @formId AND ");

b.append(K_BQ);
b.append(entity.taskType.getName());
b.append(K_BQ);
b.append(" = @taskType AND TIMESTAMPDIFF(MICROSECOND, t4.");
b.append(" = @taskType AND ( ");

b.append(K_BQ);
b.append(entity.expirationDateTime.getName());
b.append(K_BQ);
b.append(",@minExpiration) > 0");
b.append(" > CAST(@minExpiration AS DATETIME)");
b.append(" OR ( @uriCount > 1 AND ");
b.append(K_BQ);
b.append(entity.expirationDateTime.getName());
b.append(K_BQ);
b.append(" = CAST(@minExpiration AS DATETIME)");
b.append(" AND ");
b.append(K_BQ);
b.append(entity.primaryKey.getName());
b.append(K_BQ);
b.append(" = @uriLock ) )");
stmts.add(b.toString());
b.setLength(0);
// and within the transaction, see if the primary key of the winning lock is ours...
b.append("SELECT ");
b.append(K_BQ);
b.append(entity.primaryKey.getName());
b.append(K_BQ);
b.append(" FROM ");
b.append(tableName);
b.append(" WHERE ");
b.append(K_BQ);
b.append(entity.formId.getName());
b.append(K_BQ);
b.append(" = @formId AND ");

b.append(K_BQ);
b.append(entity.taskType.getName());
b.append(K_BQ);
b.append(" = @taskType AND ");
b.append(K_BQ);
b.append(entity.expirationDateTime.getName());
b.append(K_BQ);
b.append(" = CAST(@minExpiration AS DATETIME)");
stmts.add(b.toString());
b.setLength(0);
b.append("UNLOCK TABLES");
stmts.add(b.toString());
b.setLength(0);
b.append("COMMIT");
stmts.add(b.toString());
b.setLength(0);

TaskLockTable relation;
try {
relation = TaskLockTable.assertRelation(datastore, user);
} catch (ODKDatastoreException e) {
throw new ODKTaskLockException(PERSISTENCE_LAYER_PROBLEM, e);
}

boolean success = false;
try {

JdbcTemplate jdbc = datastore.getJdbcConnection();
jdbc.execute(new ConnectionCallback<Object>() {
Object o = jdbc.execute(new ConnectionCallback<Object>() {

@Override
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
boolean success = false;
boolean oldValue = conn.getAutoCommit();
try {
conn.setAutoCommit(false);
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
Statement stmt = conn.createStatement();
String lastResult = null;
for (String s : stmts) {
// for debugging: LogFactory.getLog(TaskLockImpl.class).info(s);
stmt.execute(s);
Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.CLOSE_CURSORS_AT_COMMIT);
if ( s.startsWith("SELECT") ) {
ResultSet rs = stmt.executeQuery(s);
if ( rs.first() ) {
lastResult = rs.getString(1);
}
rs.close();
} else {
stmt.executeUpdate(s);
}
stmt.close();
}
conn.commit();
success = uri.equals(lastResult);
} catch (Exception e) {
e.printStackTrace();
conn.rollback();
} finally {
Statement stmt = conn.createStatement();
LogFactory.getLog(TaskLockImpl.class).info("UNLOCK TABLES");
stmt.execute("UNLOCK TABLES");
conn.commit();
if ( !success ) {
Statement stmt = conn.createStatement();
LogFactory.getLog(TaskLockImpl.class).info("UNLOCK TABLES");
stmt.execute("UNLOCK TABLES");
conn.commit();
}
}
conn.setAutoCommit(oldValue);
return null;
return success ? uri : null;
}

});

success = o != null && uri.equals((String) o);

relation = TaskLockTable.assertRelation(datastore, user);
} catch (Exception e) {
throw new ODKTaskLockException(PERSISTENCE_LAYER_PROBLEM, e);
}
return (TaskLockTable) datastore.getEntity(relation, entity.getUri(), user);
if ( success ) {
return (TaskLockTable) datastore.getEntity(relation, uri, user);
} else {
throw new ODKEntityNotFoundException();
}
}

@Override
Expand Down
Loading

0 comments on commit cdf6057

Please sign in to comment.