Skip to content

Commit

Permalink
force poll after queued operations finish
Browse files Browse the repository at this point in the history
Summary:
We had a bug with forcing polling after operations run. This behavior is important to make the UI update as things are run.

We had code to force a refresh after every operation. But, runOrQueueOperation would exit immediately when an operation is queued, causing the force refresh to happen right away too (fairly uselessly).

Here, I change the semantics so that runOrQueueOperation resolves only when an operation exits... or if it's determined it will never run because of an earlier error.

I think this was always broken, but made much more apparent by the `configHoldOffRefreshMs` that reduces overfetching during the first 10s of a command running.

Note: it seems there's still some stuff to polish here, I've seen cases where progress of the queued submit is not making it to the client.

Reviewed By: quark-zju

Differential Revision: D54338494

fbshipit-source-id: f0cfbbbf51d61295c92f82d72e0e4efe5dccda4d
  • Loading branch information
evangrayk authored and facebook-github-bot committed Feb 29, 2024
1 parent 71513eb commit 3473e19
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
35 changes: 32 additions & 3 deletions addons/isl-server/src/OperationQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import type {
OperationProgress,
RunnableOperation,
} from 'isl/src/types';
import type {Deferred} from 'shared/utils';

import {clearTrackedCache} from 'shared/LRU';
import {newAbortController} from 'shared/compat';
import {defer} from 'shared/utils';

/**
* Handle running & queueing all Operations so that only one Operation runs at once.
Expand All @@ -35,17 +37,26 @@ export class OperationQueue {
private runningOperation: RunnableOperation | undefined = undefined;
private runningOperationStartTime: Date | undefined = undefined;
private abortController: AbortController | undefined = undefined;
private deferredOperations = new Map<string, Deferred<'ran' | 'skipped'>>();

/**
* Run an operation, or if one is already running, add it to the queue.
* Promise resolves with:
* - 'ran', when the operation exits (no matter success/failure), even if it was enqueued.
* - 'skipped', when the operation is never going to be run, since an earlier queued command errored.
*/
async runOrQueueOperation(
operation: RunnableOperation,
onProgress: (progress: OperationProgress) => void,
tracker: ServerSideTracker,
cwd: string,
): Promise<void> {
): Promise<'ran' | 'skipped'> {
if (this.runningOperation != null) {
this.queuedOperations.push({...operation, tracker});
const deferred = defer<'ran' | 'skipped'>();
this.deferredOperations.set(operation.id, deferred);
onProgress({id: operation.id, kind: 'queue', queue: this.queuedOperations.map(o => o.id)});
return;
return deferred.promise;
}
this.runningOperation = operation;
this.runningOperationStartTime = new Date();
Expand Down Expand Up @@ -90,11 +101,19 @@ export class OperationQueue {
const errString = (err as Error).toString();
this.logger.log('error running operation: ', operation.args[0], errString);
onProgress({id: operation.id, kind: 'error', error: errString});
// clear queue to run when we hit an error

// clear queue to run when we hit an error,
// which also requires resolving all their promises
for (const queued of this.queuedOperations) {
this.resolveDeferredPromise(queued.id, 'skipped');
}
this.queuedOperations = [];
} finally {
this.runningOperationStartTime = undefined;
this.runningOperation = undefined;

// resolve original enqueuer's promise
this.resolveDeferredPromise(operation.id, 'ran');
}

// now that we successfully ran this operation, dequeue the next
Expand All @@ -114,6 +133,16 @@ export class OperationQueue {
// Attempt to free some memory.
clearTrackedCache();
}

return 'ran';
}

private resolveDeferredPromise(id: string, kind: 'ran' | 'skipped') {
const found = this.deferredOperations.get(id);
if (found != null) {
found.resolve(kind);
this.deferredOperations.delete(id);
}
}

/**
Expand Down
17 changes: 13 additions & 4 deletions addons/isl-server/src/Repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ export class Repository {
if (pollKind !== 'force' && shouldWait()) {
// Do nothing. This is fine because after the operation
// there will be a refresh.
this.logger.info('polling prevented from shouldWait');
return;
}
if (kind === 'uncommitted changes') {
Expand Down Expand Up @@ -465,18 +466,26 @@ export class Repository {
* Run long-lived command which mutates the repository state.
* Progress is streamed back as it comes in.
* Operations are run immediately. For queueing, see OperationQueue.
* This promise resolves when the operation exits.
*/
async runOrQueueOperation(
operation: RunnableOperation,
onProgress: (progress: OperationProgress) => void,
tracker: ServerSideTracker,
cwd: string,
): Promise<void> {
await this.operationQueue.runOrQueueOperation(operation, onProgress, tracker, cwd);
const result = await this.operationQueue.runOrQueueOperation(
operation,
onProgress,
tracker,
cwd,
);

// After any operation finishes, make sure we poll right away,
// so the UI is guarnateed to get the latest data.
this.watchForChanges.poll('force');
if (result !== 'skipped') {
// After any operation finishes, make sure we poll right away,
// so the UI is guarnateed to get the latest data.
this.watchForChanges.poll('force');
}
}

/**
Expand Down
22 changes: 15 additions & 7 deletions addons/isl-server/src/__tests__/OperationQueue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ describe('OperationQueue', () => {
expect(runCallback).toHaveBeenCalledTimes(1);

p.resolve(undefined);
await runPromise;
const result = await runPromise;
expect(result).toEqual('ran');

expect(runCallback).toHaveBeenCalledTimes(1);

Expand Down Expand Up @@ -76,7 +77,8 @@ describe('OperationQueue', () => {
'cwd',
);

await runPromise;
const result = await runPromise;
expect(result).toEqual('ran');

expect(onProgress).toHaveBeenCalledWith(
expect.objectContaining({id: '1', kind: 'spawn', queue: []}),
Expand Down Expand Up @@ -113,7 +115,9 @@ describe('OperationQueue', () => {
queue.abortRunningOperation('wrong-id');
expect(onProgress).not.toHaveBeenCalled();
queue.abortRunningOperation(id);
await op;
const result = await op;
expect(result).toEqual('ran');

expect(onProgress).toHaveBeenCalledWith(
expect.objectContaining({id, kind: 'exit', exitCode: 130}),
);
Expand Down Expand Up @@ -162,13 +166,15 @@ describe('OperationQueue', () => {
expect(onProgress).toHaveBeenCalledWith(expect.objectContaining({kind: 'queue', queue: ['2']}));

p1.resolve(undefined);
await runPromise1;
const result1 = await runPromise1;
expect(result1).toEqual('ran');

// now it's dequeued
expect(runP2).toHaveBeenCalled();

p2.resolve(undefined);
await runPromise2;
const result2 = await runPromise2;
expect(result2).toEqual('ran');

expect(runCallback).toHaveBeenCalledTimes(2);
});
Expand Down Expand Up @@ -214,14 +220,16 @@ describe('OperationQueue', () => {

p1.reject(new Error('fake error'));
// run promise still resolves, but error message was sent
await runPromise1;
const result1 = await runPromise1;
expect(result1).toEqual('ran');
expect(onProgress).toHaveBeenCalledWith(
expect.objectContaining({id: '1', kind: 'error', error: 'Error: fake error'}),
);

// p2 was cancelled by p1 failing
expect(runP2).not.toHaveBeenCalled();
await runPromise2;
const result2 = await runPromise2;
expect(result2).toEqual('skipped');
expect(runCallback).toHaveBeenCalledTimes(1);
});

Expand Down

0 comments on commit 3473e19

Please sign in to comment.