
JDBC-level pagination/streaming of large queries #142

Open · andrus opened this issue Feb 8, 2018 · 5 comments

andrus commented Feb 8, 2018

When reading large datasets (e.g. inside CreateOrUpdateTask.getRowReader() / JdbcExtractor.getReader()), we use a Cayenne iterator, but the underlying JDBC driver may still read the entire ResultSet into memory. This causes two problems:

  • Excessive memory use (and potential OutOfMemory exceptions)
  • Read connection inactivity timeouts (the read connection sits idle for the duration of the LinkMove job).

On the Cayenne side, this should use SQLSelect.statementFetchSize(batchSize). But some DBs may require extra settings at the JDBC level for this flag to take effect. Specifically, MySQL requires this:

stmt = conn.createStatement(
              ResultSet.TYPE_FORWARD_ONLY,
              ResultSet.CONCUR_READ_ONLY);
// per the MySQL Connector/J docs, streaming additionally requires:
stmt.setFetchSize(Integer.MIN_VALUE);

So the goal of this task is either to figure out a transparent solution for JDBC-level result streaming, or to develop a set of simple recipes for the most common DB engines (MySQL, PostgreSQL, SQL Server) on how to solve it (e.g. via URL parameters), or to provide some combination of the two. (See a MySQL recipe below in the comments.)
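
For reference, here is a minimal sketch of what the Cayenne side could look like, assuming the Cayenne 4.x SQLSelect / ResultIterator API (the class name, table name and batch size are made up for illustration):

import org.apache.cayenne.DataRow;
import org.apache.cayenne.ObjectContext;
import org.apache.cayenne.ResultIterator;
import org.apache.cayenne.query.SQLSelect;

public class StreamingReadSketch {

    // Reads a large table row by row instead of materializing the whole
    // ResultSet. statementFetchSize() only takes effect if the JDBC driver
    // honors it (see the MySQL settings above).
    static void readInBatches(ObjectContext context) {

        SQLSelect<DataRow> query = SQLSelect
                .dataRowQuery("SELECT * FROM big_table")
                .statementFetchSize(500);

        // the iterator holds the DB connection open until close(),
        // hence the try-with-resources
        try (ResultIterator<DataRow> it = context.iterator(query)) {
            while (it.hasNextRow()) {
                DataRow row = it.nextRow();
                // process one row at a time
            }
        }
    }
}

Note that the recipes will likely differ per driver: for PostgreSQL, for instance, the commonly cited requirement is that autocommit be off and a positive fetch size be set (pgjdbc also has a defaultRowFetchSize URL parameter), rather than MySQL's useCursorFetch approach.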


andrus commented Feb 8, 2018

On MySQL, the magic sauce seems to be the useCursorFetch=true&defaultFetchSize=500 parameters added to the URL, which switch the ResultSet to batch mode. Here are the before and after memory profiles:

[screenshots: memory profiles before and after the change, 2018-02-08]
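
For reference, the complete connection URL would look something like this (host, port and database name are placeholders):

jdbc:mysql://localhost:3306/mydb?useCursorFetch=true&defaultFetchSize=500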


vitalz commented Jan 8, 2020

I experienced connectivity timeouts with the Delete task (a rather slow stage listener meant the data took about 2-3 hours to process), even though the useCursorFetch=true&defaultFetchSize=500 params were on the JDBC URL. Per the log, it failed complaining about the first query, the one that selects all targets: com.nhl.link.move.runtime.task.delete.DeleteTask.createTargetSelect()


vitalz commented Jan 9, 2020

Changing the behaviour in a decorated DeleteTask resolves the connection timeout issue: when com.nhl.link.move.runtime.task.delete.DeleteTask.createTargetSelect() returns a com.nhl.link.move.runtime.task.delete.CollectionResultIterator as the org.apache.cayenne.ResultIterator, the job completes successfully.

Otherwise it may fail like this with MySQL & HikariCP:

[04/Jan/2020:06:44:09,864] bootique-job-1 KOaLIhs-80000000 ? WARN  c.z.h.p.ProxyConnection: <datasourceName> - Connection com.mysql.jdbc.JDBC4Connection@447be6c4 marked as broken because of SQLSTATE(08007), ErrorCode(0)
com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
	at com.mysql.jdbc.Util.getInstance(Util.java:387)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:919)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:862)
	at com.mysql.jdbc.ConnectionImpl.commit(ConnectionImpl.java:1621)
	at com.zaxxer.hikari.pool.ProxyConnection.commit(ProxyConnection.java:368)
	at com.zaxxer.hikari.pool.HikariProxyConnection.commit(HikariProxyConnection.java)
	at org.apache.cayenne.tx.TransactionConnectionDecorator.commit(TransactionConnectionDecorator.java:76)
	at org.apache.cayenne.tx.CayenneTransaction.processCommit(CayenneTransaction.java:84)
	at org.apache.cayenne.tx.BaseTransaction.commit(BaseTransaction.java:149)
	at org.apache.cayenne.access.TransactionResultIteratorDecorator.close(TransactionResultIteratorDecorator.java:68)
	at org.apache.cayenne.access.DataContext$DataRowResultIterator.close(DataContext.java:1247)
	at com.nhl.link.move.runtime.task.delete.DeleteTask.run(DeleteTask.java:60)


andrus commented Jan 9, 2020

@vitalz: could you post the code changes that helped address the problem?


vitalz commented Jan 10, 2020

package com.nhl.link.move.runtime.task.delete;

import java.lang.reflect.Field;
import java.util.Map;

import org.apache.cayenne.DataObject;
import org.apache.cayenne.ResultIterator;
import org.apache.cayenne.query.ObjectSelect;
import org.apache.commons.lang.NotImplementedException;

import com.nhl.link.move.batch.BatchProcessor;
import com.nhl.link.move.batch.BatchRunner;
import com.nhl.link.move.runtime.cayenne.ITargetCayenneService;
import com.nhl.link.move.Execution;
import com.nhl.link.move.SyncToken;


public final class DeleteTaskDecorator<T extends DataObject> extends DeleteTask<T> {

	private final DeleteTask<T> task;

	public DeleteTaskDecorator(DeleteTask<T> task) {
		// the decorator delegates everything to the wrapped task,
		// so the superclass dependencies are left unset
		super(null, 0, null, null, null, null, null);
		this.task = task;
	}

	@Override
	public Execution run() {
		throw new NotImplementedException(); // fail fast
	}

	@Override
	public Execution run(@SuppressWarnings("rawtypes") Map params) {

		try (@SuppressWarnings("unchecked")
				Execution execution = new Execution("DeleteTask:" + extractorName, params)) {

			BatchProcessor<T> batchProcessor = this.createBatchProcessor(execution);
			ResultIterator<T> data = this.createTargetSelect();

			// close the iterator even if a batch fails
			try {
				BatchRunner.create(batchProcessor).withBatchSize(this.task.batchSize).run(data);
			} finally {
				data.close();
			}

			return execution;
		}
	}
	
	@Override
	public Execution run(SyncToken token) {
		throw new NotImplementedException(); // fail fast
	}
	
	@Override
	public Execution run(SyncToken token, Map<String, ?> params) {
		throw new NotImplementedException(); // fail fast
	}
	
	private ITargetCayenneService getTargetCayenneService() {

		// targetCayenneService is private in DeleteTask, so read it via reflection
		try {
			Field f = this.task.getClass().getDeclaredField("targetCayenneService");
			f.setAccessible(true);
			return (ITargetCayenneService) f.get(this.task);
		} catch (Throwable t) {
			throw new RuntimeException(t);
		}
	}
	
	@Override
	protected ResultIterator<T> createTargetSelect() {
		ObjectSelect<T> query = ObjectSelect.query(this.task.type).where(this.task.targetFilter);

		// select() materializes the full target list in memory up front, so the
		// read connection is released before the (potentially slow) batch
		// processing, instead of a cursor being held open for hours
		return new CollectionResultIterator<>(query.select(getTargetCayenneService().newContext()));
	}

	@Override
	protected BatchProcessor<T> createBatchProcessor(Execution execution) {
		return this.task.createBatchProcessor(execution);
	}
}

And the usage:

result = new DeleteTaskDecorator<MyEntity>(
		(DeleteTask<MyEntity>) lmRuntime.service(ITaskService.class)
				.delete(MyEntity.class)
				.sourceMatchExtractor(EXTRACTOR_PATH)
				.stageListener(processorService.processor(MyEntity.class, EXTRACTOR_PATH).failOnErrors().build())
				.matchBy(new CustomMapper())
				.batchSize(10000) // what limit should be there?
				.task())
		.run(parameters)
		.createReport();
