Skip to content

Commit

Permalink
feat(ref-imp): #319 - Added ability to turn on or off Observer and Ba…
Browse files Browse the repository at this point in the history
…tch Writer

* feat(ref-imp): #319 - Added ability to turn on or off Observer and Batch Writer
* chore(ref-imp): many ESlint error fixes
  • Loading branch information
thehenrytsai authored Oct 16, 2020
1 parent 8c59559 commit 13e0563
Show file tree
Hide file tree
Showing 35 changed files with 96 additions and 75 deletions.
4 changes: 2 additions & 2 deletions lib/common/MongoDbTransactionStore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Collection, Cursor, Db, Long, MongoClient } from 'mongodb';
import ITransactionStore from '../core/interfaces/ITransactionStore';
import TransactionModel from './models/TransactionModel';
import { Collection, Cursor, Db, Long, MongoClient } from 'mongodb';

/**
* Implementation of ITransactionStore that stores the transaction data in a MongoDB database.
Expand Down Expand Up @@ -153,7 +153,7 @@ export default class MongoDbTransactionStore implements ITransactionStore {
* @param transactionTimeHash the transaction time hash which the transactions should be removed for
*/
public async removeTransactionByTransactionTimeHash (transactionTimeHash: string) {
await this.transactionCollection!.deleteMany({ transactionTimeHash: { $eq: transactionTimeHash } });
await this.transactionCollection!.deleteMany({ transactionTimeHash: { $eq: transactionTimeHash } });
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/common/ReadableStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import SidetreeError from './SidetreeError';
import ErrorCode from '../common/SharedErrorCode';
import SidetreeError from './SidetreeError';

/**
* ReadableStream utilities
Expand Down
2 changes: 1 addition & 1 deletion lib/core/BatchScheduler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as timeSpan from 'time-span';
import IBlockchain from './interfaces/IBlockchain';
import IVersionManager from './interfaces/IVersionManager';
import timeSpan = require('time-span');

/**
* Class that performs periodic writing of batches of Sidetree operations to CAS and blockchain.
Expand Down
2 changes: 1 addition & 1 deletion lib/core/Blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import BlockchainTimeModel from './models/BlockchainTimeModel';
import CoreErrorCode from './ErrorCode';
import IBlockchain from './interfaces/IBlockchain';
import JsonAsync from './versions/latest/util/JsonAsync';
import nodeFetch from 'node-fetch';
import ReadableStream from '../common/ReadableStream';
import ServiceVersionFetcher from './ServiceVersionFetcher';
import ServiceVersionModel from '../common/models/ServiceVersionModel';
import SharedErrorCode from '../common/SharedErrorCode';
import SidetreeError from '../common/SidetreeError';
import TransactionModel from '../common/models/TransactionModel';
import ValueTimeLockModel from '../common/models/ValueTimeLockModel';
import nodeFetch from 'node-fetch';

/**
* Class that communicates with the underlying blockchain using REST API defined by the protocol document.
Expand Down
30 changes: 20 additions & 10 deletions lib/core/Core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Blockchain from './Blockchain';
import Config from './models/Config';
import DownloadManager from './DownloadManager';
import ICas from './interfaces/ICas';
import LogColor from '../common/LogColor';
import MongoDbOperationStore from './MongoDbOperationStore';
import MongoDbTransactionStore from '../common/MongoDbTransactionStore';
import MongoDbUnresolvableTransactionStore from './MongoDbUnresolvableTransactionStore';
Expand Down Expand Up @@ -32,16 +33,18 @@ export default class Core {
/**
* Core constructor.
*/
public constructor (config: Config, versionModels: VersionModel[], private cas: ICas) {
public constructor (private config: Config, versionModels: VersionModel[], private cas: ICas) {
// Component dependency construction & injection.
this.versionManager = new VersionManager(config, versionModels); // `VersionManager` is first constructed component.
this.versionManager = new VersionManager(config, versionModels); // `VersionManager` is first constructed component as multiple components depend on it.
this.serviceInfo = new ServiceInfo('core');
this.operationStore = new MongoDbOperationStore(config.mongoDbConnectionString, config.databaseName);
this.blockchain = new Blockchain(config.blockchainServiceUri);
this.downloadManager = new DownloadManager(config.maxConcurrentDownloads, this.cas);
this.resolver = new Resolver(this.versionManager, this.operationStore);
this.batchScheduler = new BatchScheduler(this.versionManager, this.blockchain, config.batchingIntervalInSeconds);
this.transactionStore = new MongoDbTransactionStore(config.mongoDbConnectionString, config.databaseName);
this.unresolvableTransactionStore = new MongoDbUnresolvableTransactionStore(config.mongoDbConnectionString, config.databaseName);

this.batchScheduler = new BatchScheduler(this.versionManager, this.blockchain, config.batchingIntervalInSeconds);
this.observer = new Observer(
this.versionManager,
this.blockchain,
Expand All @@ -51,8 +54,6 @@ export default class Core {
this.unresolvableTransactionStore,
config.observingIntervalInSeconds
);

this.serviceInfo = new ServiceInfo('core');
}

/**
Expand All @@ -71,11 +72,20 @@ export default class Core {
this.operationStore,
this.resolver,
this.transactionStore
); // `VersionManager` is last initialized component.
); // `VersionManager` is last initialized component as it needs many shared/common components to be ready first.

if (this.config.observingIntervalInSeconds > 0) {
await this.observer.startPeriodicProcessing();
} else {
console.warn(LogColor.yellow(`Transaction observer is disabled.`));
}

await this.observer.startPeriodicProcessing();
if (this.config.batchingIntervalInSeconds > 0) {
this.batchScheduler.startPeriodicBatchWriting();
} else {
console.warn(LogColor.yellow(`Batch writing is disabled.`));
}

this.batchScheduler.startPeriodicBatchWriting();
this.blockchain.startPeriodicCachedBlockchainTimeRefresh();
this.downloadManager.start();
}
Expand Down Expand Up @@ -114,8 +124,8 @@ export default class Core {
];

return {
status : ResponseStatus.Succeeded,
body : JSON.stringify(responses)
status: ResponseStatus.Succeeded,
body: JSON.stringify(responses)
};
}
}
2 changes: 1 addition & 1 deletion lib/core/MongoDbOperationStore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Binary, Collection, Long, MongoClient } from 'mongodb';
import AnchoredOperationModel from './models/AnchoredOperationModel';
import IOperationStore from './interfaces/IOperationStore';
import OperationType from './enums/OperationType';
import { Binary, Collection, Long, MongoClient } from 'mongodb';

/**
* Sidetree operation stored in MongoDb.
Expand Down
2 changes: 1 addition & 1 deletion lib/core/MongoDbUnresolvableTransactionStore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Collection, Db, Long, MongoClient } from 'mongodb';
import IUnresolvableTransactionStore from './interfaces/IUnresolvableTransactionStore';
import TransactionModel from '../common/models/TransactionModel';
import { Collection, Db, Long, MongoClient } from 'mongodb';

interface IUnresolvableTransaction extends TransactionModel {
firstFetchTime: number;
Expand Down
4 changes: 2 additions & 2 deletions lib/core/Observer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import * as timeSpan from 'time-span';
import TransactionUnderProcessingModel, { TransactionProcessingStatus } from './models/TransactionUnderProcessingModel';
import IBlockchain from './interfaces/IBlockchain';
import IOperationStore from './interfaces/IOperationStore';
import ITransactionProcessor from './interfaces/ITransactionProcessor';
Expand All @@ -6,10 +8,8 @@ import IUnresolvableTransactionStore from './interfaces/IUnresolvableTransaction
import IVersionManager from './interfaces/IVersionManager';
import SharedErrorCode from '../common/SharedErrorCode';
import SidetreeError from '../common/SidetreeError';
import timeSpan = require('time-span');
import ThroughputLimiter from './ThroughputLimiter';
import TransactionModel from '../common/models/TransactionModel';
import TransactionUnderProcessingModel, { TransactionProcessingStatus } from './models/TransactionUnderProcessingModel';

/**
* Class that performs periodic processing of batches of Sidetree operations anchored to the blockchain.
Expand Down
2 changes: 1 addition & 1 deletion lib/core/ServiceVersionFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import nodeFetch from 'node-fetch';
import ReadableStream from '../common/ReadableStream';
import ServiceVersionModel from '../common/models/ServiceVersionModel';
import nodeFetch from 'node-fetch';

/**
* Encapsulates the functionality of getting the version information from the dependent services.
Expand Down
2 changes: 1 addition & 1 deletion lib/core/ThroughputLimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export default class ThroughputLimiter {
* @param transactions array of transactions to filter for
*/
public async getQualifiedTransactions (transactions: TransactionModel[]) {
let currentTransactionTime: number | undefined = undefined;
let currentTransactionTime: number | undefined;
const transactionsGroupedByTransactionTime: TransactionModel[][] = [];

for (const transaction of transactions) {
Expand Down
4 changes: 2 additions & 2 deletions lib/core/models/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
export default interface Config {
batchingIntervalInSeconds: number;
blockchainServiceUri: string;
databaseName: string | undefined;
didMethodName: string;
maxConcurrentDownloads: number;
observingIntervalInSeconds: number;
mongoDbConnectionString: string;
databaseName: string | undefined;
observingIntervalInSeconds: number;
}
4 changes: 2 additions & 2 deletions lib/core/versions/latest/BatchWriter.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import AnchorFile from './AnchorFile';
import AnchoredData from './models/AnchoredData';
import AnchoredDataSerializer from './AnchoredDataSerializer';
import AnchorFile from './AnchorFile';
import ChunkFile from './ChunkFile';
import CreateOperation from './CreateOperation';
import DeactivateOperation from './DeactivateOperation';
import FeeManager from './FeeManager';
import ICas from '../../interfaces/ICas';
import IBatchWriter from '../../interfaces/IBatchWriter';
import IBlockchain from '../../interfaces/IBlockchain';
import ICas from '../../interfaces/ICas';
import IOperationQueue from './interfaces/IOperationQueue';
import IVersionMetadataFetcher from '../../interfaces/IVersionMetadataFetcher';
import LogColor from '../../../common/LogColor';
Expand Down
2 changes: 1 addition & 1 deletion lib/core/versions/latest/ChunkFile.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as timeSpan from 'time-span';
import ChunkFileModel from './models/ChunkFileModel';
import Compressor from './util/Compressor';
import CreateOperation from './CreateOperation';
Expand All @@ -7,7 +8,6 @@ import JsonAsync from './util/JsonAsync';
import ProtocolParameters from './ProtocolParameters';
import RecoverOperation from './RecoverOperation';
import SidetreeError from '../../../common/SidetreeError';
import timeSpan = require('time-span');
import UpdateOperation from './UpdateOperation';

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/core/versions/latest/Encoder.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import base64url from 'base64url';
import ErrorCode from './ErrorCode';
import SidetreeError from '../../../common/SidetreeError';
import base64url from 'base64url';

/**
* Class that encodes binary blobs into strings.
Expand Down
8 changes: 4 additions & 4 deletions lib/core/versions/latest/MongoDbOperationQueue.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Binary, Collection, Db, MongoClient } from 'mongodb';
import ErrorCode from './ErrorCode';
import IOperationQueue from './interfaces/IOperationQueue';
import SidetreeError from '../../../common/SidetreeError';
import { Binary, Collection, Db, MongoClient } from 'mongodb';
import QueuedOperationModel from './models/QueuedOperationModel';
import SidetreeError from '../../../common/SidetreeError';

/**
* Sidetree operation stored in MongoDb.
Expand Down Expand Up @@ -70,7 +70,7 @@ export default class MongoDbOperationQueue implements IOperationQueue {
return [];
}

const queuedOperations = await this.collection!.find().sort({ _id : 1 }).limit(count).toArray();
const queuedOperations = await this.collection!.find().sort({ _id: 1 }).limit(count).toArray();
const lastOperation = queuedOperations[queuedOperations.length - 1];
await this.collection!.deleteMany({ _id: { $lte: lastOperation._id } });

Expand All @@ -83,7 +83,7 @@ export default class MongoDbOperationQueue implements IOperationQueue {
}

// NOTE: `_id` is the default index that is sorted based by create time.
const queuedOperations = await this.collection!.find().sort({ _id : 1 }).limit(count).toArray();
const queuedOperations = await this.collection!.find().sort({ _id: 1 }).limit(count).toArray();

return queuedOperations.map((operation) => MongoDbOperationQueue.convertToQueuedOperationModel(operation));
}
Expand Down
2 changes: 1 addition & 1 deletion lib/core/versions/latest/OperationProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import AnchoredOperationModel from '../../models/AnchoredOperationModel';
import CreateOperation from './CreateOperation';
import DeactivateOperation from './DeactivateOperation';
import DocumentComposer from './DocumentComposer';
import DidState from '../../models/DidState';
import DocumentComposer from './DocumentComposer';
import ErrorCode from './ErrorCode';
import IOperationProcessor from '../../interfaces/IOperationProcessor';
import JsonCanonicalizer from './util/JsonCanonicalizer';
Expand Down
2 changes: 1 addition & 1 deletion lib/core/versions/latest/TransactionProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import AnchorFile from './AnchorFile';
import AnchoredDataSerializer from './AnchoredDataSerializer';
import AnchoredOperationModel from '../../models/AnchoredOperationModel';
import AnchorFile from './AnchorFile';
import ArrayMethods from './util/ArrayMethods';
import ChunkFile from './ChunkFile';
import ChunkFileModel from './models/ChunkFileModel';
Expand Down
2 changes: 1 addition & 1 deletion lib/core/versions/latest/util/Jwk.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ErrorCode from '../ErrorCode';
import { JWK } from 'jose';
import JwkEs256k from '../../../models/JwkEs256k';
import SidetreeError from '../../../../common/SidetreeError';
import { JWK } from 'jose';

/**
* Class containing reusable JWK operations.
Expand Down
2 changes: 1 addition & 1 deletion lib/core/versions/latest/util/Jws.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Encoder from '../Encoder';
import ErrorCode from '../ErrorCode';
import { JWS } from 'jose';
import JwkEs256k from '../../../models/JwkEs256k';
import JwsModel from '../models/JwsModel';
import SidetreeError from '../../../../common/SidetreeError';
import { JWS } from 'jose';

/**
* Class containing reusable JWS operations.
Expand Down
11 changes: 5 additions & 6 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
// NOTE: Aliases to classes and interfaces are used for external consumption.

// Core service exports.
import ISidetreeBitcoinConfig from './bitcoin/IBitcoinConfig';
import ISidetreeBitcoinWallet from './bitcoin/interfaces/IBitcoinWallet';
import ISidetreeCas from './core/interfaces/ICas';
import SidetreeCore from './core/Core';
import SidetreeBitcoinProcessor from './bitcoin/BitcoinProcessor';
import SidetreeConfig from './core/models/Config';
import SidetreeCore from './core/Core';
import SidetreeResponse from './common/Response';
import SidetreeResponseModel from './common/models/ResponseModel';
import SidetreeVersionModel from './common/models/VersionModel';

// Core service exports.
export {
ISidetreeCas,
SidetreeConfig,
Expand All @@ -18,10 +21,6 @@ export {
};

// Blockchain service exports.
import SidetreeBitcoinProcessor from './bitcoin/BitcoinProcessor';
import ISidetreeBitcoinConfig from './bitcoin/IBitcoinConfig';
import ISidetreeBitcoinWallet from './bitcoin/interfaces/IBitcoinWallet';

export {
ISidetreeBitcoinConfig,
ISidetreeBitcoinWallet,
Expand Down
6 changes: 3 additions & 3 deletions lib/ipfs/Ipfs.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import * as crypto from 'crypto';
import * as HttpStatus from 'http-status';
import * as crypto from 'crypto';
import * as url from 'url';
import base64url from 'base64url';
import FetchResult from '../common/models/FetchResult';
import FetchResultCode from '../common/enums/FetchResultCode';
import ICas from '../core/interfaces/ICas';
import IpfsErrorCode from '../ipfs/IpfsErrorCode';
import nodeFetch from 'node-fetch';
import ReadableStream from '../common/ReadableStream';
import SharedErrorCode from '../common/SharedErrorCode';
import SidetreeError from '../common/SidetreeError';
import Timeout from './Util/Timeout';
import base64url from 'base64url';
import nodeFetch from 'node-fetch';

const multihashes = require('multihashes');

Expand Down
2 changes: 1 addition & 1 deletion tests/bitcoin/MongoDbBlockMetadataStore.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async function createBlockMetadataStore (storeUri: string, databaseName: string)
}

describe('MongoDbBlockMetadataStore', async () => {
const config: Config = require('../json/bitcoin-config-test.json');
const config: Config = require('../json/config-test.json');
const databaseName = 'sidetree-test';

let mongoServiceAvailable = false;
Expand Down
2 changes: 1 addition & 1 deletion tests/bitcoin/MongoDbServiceStateStore.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async function createStore (storeUri: string, databaseName: string): Promise<Mon

describe('MongoDbSeviceStateStore', async () => {

const config: Config = require('../json/bitcoin-config-test.json');
const config: Config = require('../json/config-test.json');
const databaseName = 'sidetree-test';

let mongoServiceAvailable = false;
Expand Down
2 changes: 1 addition & 1 deletion tests/core/Blockchain.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import ErrorCode from '../../lib/bitcoin/ErrorCode';
import JasmineSidetreeErrorValidator from '../JasmineSidetreeErrorValidator';
import ReadableStream from '../../lib/common/ReadableStream';
import ServiceVersionModel from '../../lib/common/models/ServiceVersionModel';
import SidetreeError from '../../lib/common/SidetreeError';
import SharedErrorCode from '../../lib/common/SharedErrorCode';
import SidetreeError from '../../lib/common/SidetreeError';
import TransactionModel from '../../lib/common/models/TransactionModel';
import ValueTimeLockModel from '../../lib/common/models/ValueTimeLockModel';

Expand Down
2 changes: 1 addition & 1 deletion tests/core/ChunkFile.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as crypto from 'crypto';
import ChunkFile from '../../lib/core/versions/latest/ChunkFile';
import Compressor from '../../lib/core/versions/latest/util/Compressor';
import Encoder from '../../lib/core/versions/latest/Encoder';
import ErrorCode from '../../lib/core/versions/latest/ErrorCode';
import JasmineSidetreeErrorValidator from '../JasmineSidetreeErrorValidator';
import Jwk from '../../lib/core/versions/latest/util/Jwk';
import Compressor from '../../lib/core/versions/latest/util/Compressor';
import OperationGenerator from '../generators/OperationGenerator';

describe('ChunkFile', async () => {
Expand Down
Loading

0 comments on commit 13e0563

Please sign in to comment.