Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryuk committed Jan 2, 2021
0 parents commit c81a368
Show file tree
Hide file tree
Showing 12 changed files with 601 additions and 0 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: CI

on:
push:
branches:
- '*'
- '!stable/**'

jobs:
build:
timeout-minutes: 7
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: '15'
- name: Install npm dependencies
run: npm ci
- name: Run docker-compose
run: docker-compose up -d
- name: Run tests
run: docker-compose exec -T node npm run test
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package-lock.json
.idea
node_modules
7 changes: 7 additions & 0 deletions .mocharc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"extension": ["ts"],
"exit": true,
"recursive": true,
"require": ["ts-node/register"],
"inspect": "0.0.0.0:9231"
}
8 changes: 8 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
test
tsconfig.json
.mocharc.json
LICENCE
node_modules
.gitignore
docker-compose.yml
package-lock.json
21 changes: 21 additions & 0 deletions LICENCE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2021 Alexander Dmitryuk

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Redis functions balancer
[![NPM](https://nodei.co/coden/redis-functions-balancer.png?downloads=true&downloadRank=true&stars=true)](https://nodei.co/npm/coden/redis-functions-balancer)

Balance executable NodeJs function with redis.

For example, if you have several functions (A, B, C) doing the same things (http requests, long-running code), and you want to execute it evenly.

Working in clusters (PM2, NodeJs Cluster).

Uses [Redis][0] list with rank and [Javascript iterators][1].

Ready to use with TypeScript and JavaScript.

## Installation
```
npm install @coden/redis-functions-balancer --save-prod
```

## Usage
```typescript
import CallableBalancer from "@coden/redis-functions-balancer";
const redis = require("redis");
const redisClient = redis.createClient(6379, 'redis');

// Your functions here
// ... //
const A = () => {};
const B = () => {};
const C = () => {};
// ... //
let balancer = new CallableBalancer([A, B, C], redisClient);
// or reuse balancer variable with another functions
balancer.setMethods([A, B]);
// ... //
// Get async iterator {done, value}
while ( (foo = await balancer.getAsyncIterator().next()) && !foo.done) {
// Your function A|B|C will be here evenly
let method = foo.value;

try {
// Executing on your way (
foo.value();
} catch (e) {
// something happen badly and you want to postpone executes of the function next 10 runs
balancer.increaseMethodRank(method, 10);
}
}

```

[0]: https://www.npmjs.com/package/redis
[1]: https://www.typescriptlang.org/docs/handbook/iterators-and-generators.html
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: "3.5"
services:
redis:
image: redis:alpine
depends_on:
- node
node:
image: node:latest
volumes:
- .:/app
working_dir: /app
entrypoint: "sleep 100"
ports:
- 9231:9231
195 changes: 195 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
var __generator = (this && this.__generator) || function (thisArg, body) {
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
function verb(n) { return function (v) { return step([n, v]); }; }
function step(op) {
if (f) throw new TypeError("Generator is already executing.");
while (_) try {
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
if (y = 0, t) op = [op[0] & 2, t.value];
switch (op[0]) {
case 0: case 1: t = op; break;
case 4: _.label++; return { value: op[1], done: false };
case 5: _.label++; y = op[1]; op = [0]; continue;
case 7: op = _.ops.pop(); _.trys.pop(); continue;
default:
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
if (t[2]) _.ops.pop();
_.trys.pop(); continue;
}
op = body.call(thisArg, _);
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
}
};
var __await = (this && this.__await) || function (v) { return this instanceof __await ? (this.v = v, this) : new __await(v); }
var __asyncGenerator = (this && this.__asyncGenerator) || function (thisArg, _arguments, generator) {
if (!Symbol.asyncIterator) throw new TypeError("Symbol.asyncIterator is not defined.");
var g = generator.apply(thisArg, _arguments || []), i, q = [];
return i = {}, verb("next"), verb("throw"), verb("return"), i[Symbol.asyncIterator] = function () { return this; }, i;
function verb(n) { if (g[n]) i[n] = function (v) { return new Promise(function (a, b) { q.push([n, v, a, b]) > 1 || resume(n, v); }); }; }
function resume(n, v) { try { step(g[n](v)); } catch (e) { settle(q[0][3], e); } }
function step(r) { r.value instanceof __await ? Promise.resolve(r.value.v).then(fulfill, reject) : settle(q[0][2], r); }
function fulfill(value) { resume("next", value); }
function reject(value) { resume("throw", value); }
function settle(f, v) { if (f(v), q.shift(), q.length) resume(q[0][0], q[0][1]); }
};
var __spreadArrays = (this && this.__spreadArrays) || function () {
for (var s = 0, i = 0, il = arguments.length; i < il; i++) s += arguments[i].length;
for (var r = Array(s), k = 0, i = 0; i < il; i++)
for (var a = arguments[i], j = 0, jl = a.length; j < jl; j++, k++)
r[k] = a[j];
return r;
};
Object.defineProperty(exports, "__esModule", { value: true });
var util_1 = require("util");
var CallableBalancer = /** @class */ (function () {
/**
*
* @param methods not empty array of functions
* @param redisClient
*/
function CallableBalancer(methods, redisClient) {
this._STORE_PREFIX = 'balancer';
this.INC_VALUE = 1;
this._redisClient = redisClient;
this._methods = methods;
this._storeKey = this.makeStoreKey(methods);
// Initialize Redis functions as async await
this._functions = {
delAsync: util_1.promisify(redisClient.DEL).bind(this._redisClient),
zAddAsync: util_1.promisify(redisClient.ZADD).bind(this._redisClient),
zRangeAsync: util_1.promisify(redisClient.zrange).bind(this._redisClient),
zIncRbyAsync: util_1.promisify(redisClient.zincrby).bind(this._redisClient),
};
}
CallableBalancer.prototype.setMethods = function (methods) {
this._methods = methods;
this._storeKey = this.makeStoreKey(methods);
};
CallableBalancer.prototype.increaseMethodRank = function (method, incValue) {
if (incValue === void 0) { incValue = this.INC_VALUE; }
return __awaiter(this, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0: return [4 /*yield*/, this._functions.zIncRbyAsync(this._storeKey, incValue, method.name)];
case 1:
_a.sent();
return [2 /*return*/];
}
});
});
};
CallableBalancer.prototype.getAsyncIterator = function () {
return __asyncGenerator(this, arguments, function getAsyncIterator_1() {
var storedMethodNames, _i, storedMethodNames_1, methodName, _a, _b, method;
return __generator(this, function (_c) {
switch (_c.label) {
case 0: return [4 /*yield*/, __await(this.getRange())];
case 1:
storedMethodNames = _c.sent();
_i = 0, storedMethodNames_1 = storedMethodNames;
_c.label = 2;
case 2:
if (!(_i < storedMethodNames_1.length)) return [3 /*break*/, 9];
methodName = storedMethodNames_1[_i];
_a = 0, _b = this._methods;
_c.label = 3;
case 3:
if (!(_a < _b.length)) return [3 /*break*/, 8];
method = _b[_a];
if (!(method.name === methodName)) return [3 /*break*/, 7];
return [4 /*yield*/, __await(this.increaseMethodRank(method, this.INC_VALUE))];
case 4:
_c.sent();
return [4 /*yield*/, __await(method)];
case 5: return [4 /*yield*/, _c.sent()];
case 6:
_c.sent();
_c.label = 7;
case 7:
_a++;
return [3 /*break*/, 3];
case 8:
_i++;
return [3 /*break*/, 2];
case 9: return [2 /*return*/];
}
});
});
};
/**
* Clear store
*/
CallableBalancer.prototype.resetStore = function () {
return __awaiter(this, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0: return [4 /*yield*/, this._functions.delAsync(this._storeKey)];
case 1:
_a.sent();
return [2 /*return*/];
}
});
});
};
CallableBalancer.prototype.getStoreKey = function () {
return this._storeKey;
};
/**
* Return redis key to store list of methods with ranks
* @param methods
* @protected
*/
CallableBalancer.prototype.makeStoreKey = function (methods) {
var storeKeyArray = [this._STORE_PREFIX];
methods.forEach(function (method) {
storeKeyArray.push(method.name);
});
return storeKeyArray.join('.');
};
/**
* Returns an Array stored in Redis in Rank order
* @private
*/
CallableBalancer.prototype.getRange = function () {
return __awaiter(this, void 0, void 0, function () {
var storedMethodNames, args_1, result_1;
var _a;
return __generator(this, function (_b) {
switch (_b.label) {
case 0: return [4 /*yield*/, this._functions.zRangeAsync(this._storeKey, 0, -1)];
case 1:
storedMethodNames = _b.sent();
if (!(storedMethodNames.length !== this._methods.length)) return [3 /*break*/, 3];
args_1 = [], result_1 = [];
this._methods.forEach(function (method) {
// Default rank is 1
args_1.push("1", method.name);
result_1.push(method.name);
});
return [4 /*yield*/, (_a = this._functions).zAddAsync.apply(_a, __spreadArrays([this._storeKey, 'NX'], args_1))];
case 2:
_b.sent();
return [2 /*return*/, result_1];
case 3: return [2 /*return*/, storedMethodNames];
}
});
});
};
return CallableBalancer;
}());
exports.default = CallableBalancer;
Loading

0 comments on commit c81a368

Please sign in to comment.