Skip to content

Commit

Permalink
feat: 实现 Storage 挂载
Browse files Browse the repository at this point in the history
  • Loading branch information
LokiSharp committed Oct 24, 2023
1 parent 32f4f35 commit 3554e2b
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 23 deletions.
1 change: 1 addition & 0 deletions common/src/Rpc/JSONFrameStream.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Frame } from "@/types";
import { Writable, WritableOptions } from "stream";

export class JSONFrameStream extends Writable {
Expand Down
1 change: 1 addition & 0 deletions common/src/Rpc/RpcClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Socket } from "net";
import { EventEmitter } from "events";
import { JSONFrameStream } from "@/Rpc/JSONFrameStream";
import { Defer, Reject, Resolve, RpcClientFrameObj } from "@/types";

export class RpcClient {
public socket: Socket;
Expand Down
1 change: 1 addition & 0 deletions common/src/Rpc/RpcServer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Socket } from "net";
import { JSONFrameStream } from "@/Rpc/JSONFrameStream";
import { RpcMethods, RpcResponse, RpcServerFrameObj } from "@/types";

export class RpcServer {
public socket: Socket;
Expand Down
151 changes: 151 additions & 0 deletions common/src/Storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import net from "net";
import { RpcClient } from "@/Rpc/RpcClient";
import { config } from "@/ConfigManager";
import { DataBaseStorage, Defer } from "@/types";
export let connected = false;
export let resetAllData: () => Defer;
export const db: DataBaseStorage = {} as DataBaseStorage;
export const queue = {};
export const env = {};
export let socket: net.Socket;
export const pubSub = {
keys: {
QUEUE_DONE: "queueDone:",
RUNTIME_RESTART: "runtimeRestart",
TICK_STARTED: "tickStarted",
ROOMS_DONE: "roomsDone",
},
};
export function storageConnect(): Promise<unknown> {
if (connected) {
return new Promise<void>(() => {});
}

if (!process.env.STORAGE_PORT) {
throw new Error("STORAGE_PORT environment variable is not set!");
}

console.log("Connecting to storage");

socket = net.connect(
parseInt(process.env.STORAGE_PORT!),
process.env.STORAGE_HOST,
);
const rpcClient = new RpcClient(socket);

const defer = RpcClient.defer();
const resetDefer = RpcClient.defer();

function resetInterceptor(
fn: (...args: unknown[]) => Defer,

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 18.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 20.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 16.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, latest)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 16.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 18.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 20.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, latest)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 20.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 16.x)

'args' is defined but never used

Check warning on line 40 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, latest)

'args' is defined but never used
): (...args: unknown[]) => Defer {

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 18.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 20.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 16.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, latest)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 16.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 18.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 20.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, latest)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 20.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 16.x)

'args' is defined but never used

Check warning on line 41 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, latest)

'args' is defined but never used
return fn;
}

function wrapCollection(collectionName: string): {
[name: string]: (...args: unknown[]) => Defer;

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 18.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 20.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 16.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, latest)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 16.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 18.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 20.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, latest)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 20.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 16.x)

'args' is defined but never used

Check warning on line 46 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, latest)

'args' is defined but never used
} {
const wrap: { [name: string]: (...args: unknown[]) => Defer } = {};

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 18.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 20.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, 16.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (macos-latest, latest)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 16.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 18.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, 20.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (windows-latest, latest)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 20.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 16.x)

'args' is defined but never used

Check warning on line 48 in common/src/Storage.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, latest)

'args' is defined but never used
[
"find",
"findOne",
"by",
"clear",
"count",
"ensureIndex",
"removeWhere",
"insert",
].forEach((method) => {
wrap[method] = function (): Defer {
return rpcClient.request(
"dbRequest",
collectionName,
method,
// eslint-disable-next-line prefer-rest-params
Array.prototype.slice.call(arguments),
);
};
});
wrap.update = resetInterceptor(function (query, update, params) {
return rpcClient.request(
"dbUpdate",
collectionName,
query,
update,
params,
);
});
wrap.bulk = resetInterceptor(function (bulk) {
return rpcClient.request("dbBulk", collectionName, bulk);
});
wrap.findEx = resetInterceptor(function (query, opts) {
return rpcClient.request("dbFindEx", collectionName, query, opts);
});
return wrap;
}

config.common.dbCollections.forEach(
(i) => (exports.db[i] = wrapCollection(i)),
);

resetAllData = (): Defer => rpcClient.request("dbResetAllData");

Object.assign(queue, {
fetch: resetInterceptor(rpcClient.request.bind(rpcClient, "queueFetch")),
add: resetInterceptor(rpcClient.request.bind(rpcClient, "queueAdd")),
addMulti: resetInterceptor(
rpcClient.request.bind(rpcClient, "queueAddMulti"),
),
markDone: resetInterceptor(
rpcClient.request.bind(rpcClient, "queueMarkDone"),
),
whenAllDone: resetInterceptor(
rpcClient.request.bind(rpcClient, "queueWhenAllDone"),
),
reset: resetInterceptor(rpcClient.request.bind(rpcClient, "queueReset")),
});

Object.assign(env, {
get: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvGet")),
mget: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvMget")),
set: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvSet")),
setex: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvSetex")),
expire: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvExpire")),
ttl: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvTtl")),
del: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvDel")),
hmget: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvHmget")),
hmset: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvHmset")),
hget: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvHget")),
hset: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvHset")),
sadd: resetInterceptor(rpcClient.request.bind(rpcClient, "dbEnvSadd")),
smembers: resetInterceptor(
rpcClient.request.bind(rpcClient, "dbEnvSmembers"),
),
});

Object.assign(pubSub, {
publish: resetInterceptor(rpcClient.request.bind(rpcClient, "publish")),
subscribe(channel: string, cb: (...args: unknown[]) => void) {
rpcClient.subscribe(channel, cb);
},
});

connected = true;

defer.resolve();

socket.on("error", (err) => {
console.error("Storage connection lost", err);
resetDefer.resolve("reset");
exports._connected = false;
setTimeout(exports._connect, 1000);
});
socket.on("end", () => {
console.error("Storage connection lost");
resetDefer.resolve("reset");
exports._connected = false;
setTimeout(exports._connect, 1000);
});

return defer.defer;
}
2 changes: 1 addition & 1 deletion common/src/configs/dbCollections.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const dbCollections: string[] = ["users", "rooms"];
export const dbCollections = ["users", "rooms"] as const;
39 changes: 31 additions & 8 deletions common/src/types.d.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
type Frame = { data: Buffer; pointer: number; size?: number };
import { dbCollections } from "@/configs/dbCollections";

type RpcServerFrameObj = {
export type Frame = { data: Buffer; pointer: number; size?: number };

export type RpcServerFrameObj = {
method: string;
channel: string;
id: string;
args: unknown[];
};

type RpcClientFrameObj = {
export type RpcClientFrameObj = {
id: string;
error?: Error;
result?: object;
pubSub: ChildProcess;
};

type RpcMethods = {
export type RpcMethods = {
subscribe(
channel: string,
listener: (data: { channel: string; data: unknown }) => void,
Expand All @@ -23,12 +25,33 @@ type RpcMethods = {
[name: string]: (cb?: CallBack, ...args) => void;
};

type RpcResponse = {
export type RpcResponse = {
id: string;
error?: Error;
result?: object;
};

type Defer = { defer: Promise<unknown>; reject: Reject; resolve: Resolve };
type Reject = (reason?: Error) => void;
type Resolve = (value?: string) => void;
export type Defer = {
defer: Promise<unknown>;
reject: Reject;
resolve: Resolve;
};
export type Reject = (reason?: Error) => void;
export type Resolve = (value?: string) => void;

export type DataBaseStorage = {
[K in (typeof dbCollections)[number]]: DataBaseObject;
};
export type DataBaseObject = { [K in DataBaseMethod]: (...args) => Defer };
export type DataBaseMethod =
| "find"
| "findOne"
| "by"
| "clear"
| "count"
| "ensureIndex"
| "removeWhere"
| "insert"
| "update"
| "bulk"
| "findEx";
55 changes: 55 additions & 0 deletions common/tests/Storage.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { RpcServer } from "@/Rpc/RpcServer";
import * as storage from "@/Storage";
import { Server, createServer } from "net";
import _ from "lodash";
import { DataBase, PubSub, Queue } from "@neo-screeps/storage";
import fs from "fs";

const OLD_ENV = process.env;
let server: Server;
let db: DataBase;
beforeAll(() => {
process.env = { ...OLD_ENV }; // Make a copy
process.env.STORAGE_PORT = "8080";
process.env.STORAGE_HOST = "localhost";
process.env.DB_PATH = "tmp/db.json";
});
beforeEach(() => {
fs.mkdirSync("tmp/", { recursive: true });

server = createServer((socket) => {
const pubSub = new PubSub();
const pubSubConnection = pubSub.create();
db = new DataBase();
db.loadDb();
const queue = new Queue();
new RpcServer(
socket,
_.extend({}, pubSubConnection.methods, db.toObject(), queue.toObject()),
);
});

server.listen(
parseInt(process.env.STORAGE_PORT!),
process.env.STORAGE_HOST || "localhost",
);
});

afterEach(async () => {
db.db!.close();
server.close();
fs.rmSync("tmp/db.json", { recursive: true, force: true });
}, 5000);

afterAll(() => {
process.env = OLD_ENV; // Restore old environment
fs.rmSync("tmp/", { recursive: true, force: true });
});

test("Storage", async () => {
await storage.storageConnect();
await storage.db.rooms.insert({ name: "Loki", age: 1 }).defer;
await storage.db.rooms
.find({ age: { $gt: 0 } })
.defer.then((result) => expect((result as Array<unknown>).length).toBe(1));
});
33 changes: 33 additions & 0 deletions storage/src/Database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,39 @@ export class DataBase {
this.recursReplaceNeNull(val[i]);
}
}

public toObject(): object {
const originalClass = this || {};
const keys = [
"dbRequest",
"dbUpdate",
"dbBulk",
"dbFindEx",
"dbEnvGet",
"dbEnvMGet",
"dbEnvSet",
"dbEnvSetEx",
"dbEnvExpire",
"dbEnvTtl",
"dbEnvDel",
"dbEnvHMGet",
"dbEnvHMGet",
"dbEnvHGet",
"dbEnvHSet",
"dbEnvSAdd",
"dbEnvSMembers",
];
return keys.reduce(
(classAsObj: { [name: string]: () => void }, key: string) => {
classAsObj[key] = (
originalClass as unknown as { [name: string]: () => void }
)[key].bind(originalClass);
return classAsObj;
},
{},
);
}

public dbRequest(
collectionName: string,
method: Method,
Expand Down
Loading

0 comments on commit 3554e2b

Please sign in to comment.