From 7e73a497fa065ce3223f7bd66b5e5059d483024b Mon Sep 17 00:00:00 2001 From: Adolfo Segura Hall Date: Fri, 23 Aug 2019 01:43:14 +0200 Subject: [PATCH] mongo store servers users, and logs --- laravel-echo-server.json | 2 +- src/channels/baseAuthChannel.ts | 3 +- src/channels/channel.ts | 10 +- src/channels/commandChannel.ts | 3 +- src/channels/presence-channel.ts | 9 +- src/channels/private-channel.ts | 3 +- src/channels/rootChannel.ts | 3 +- src/database/database-driver.ts | 21 +++++ src/database/database.ts | 17 +++- src/database/mongo.ts | 152 ++++++++++++++++++++++++------- src/default-options.ts | 2 +- src/echo-server.ts | 36 ++++++-- src/log/log_interface.ts | 1 - src/log/logger.ts | 24 +++++ src/server.ts | 4 +- src/utils/ioUtils.ts | 20 +++- 16 files changed, 246 insertions(+), 64 deletions(-) create mode 100644 src/log/logger.ts diff --git a/laravel-echo-server.json b/laravel-echo-server.json index 81b22ac4..c9f3ab21 100644 --- a/laravel-echo-server.json +++ b/laravel-echo-server.json @@ -20,7 +20,7 @@ "mongo": { "host": "127.0.0.1", "port": "27017", - "dbName": "presence", + "dbName": "echo", "user": null, "password": null } diff --git a/src/channels/baseAuthChannel.ts b/src/channels/baseAuthChannel.ts index bf8b708d..f28a2945 100644 --- a/src/channels/baseAuthChannel.ts +++ b/src/channels/baseAuthChannel.ts @@ -1,4 +1,5 @@ import {Log} from "../log"; +import {Logger} from "../log/logger"; let url = require('url'); const request = require('request'); @@ -22,7 +23,7 @@ export class BaseAuthChannel { * @param options * @param log */ - constructor(protected options: any, protected log: any) { + constructor(protected options: any, protected log: Logger) { this.request = request; } diff --git a/src/channels/channel.ts b/src/channels/channel.ts index 8afcd5ec..4fd0b59f 100644 --- a/src/channels/channel.ts +++ b/src/channels/channel.ts @@ -2,6 +2,8 @@ import {PresenceChannel} from './presence-channel'; import {PrivateChannel} from './private-channel'; import {Log} from './../log'; import {RootChannel} from "./rootChannel"; +import {Database} from "../database"; +import {Logger} from "../log/logger"; export class Channel { /** @@ -32,11 +34,11 @@ export class Channel { /** * Create a new channel instance. */ - constructor(private io, private options, protected log) { + constructor(private io: any, private options: any, protected log: Logger, protected db: Database) { - this.private = new PrivateChannel(options, this.log); - this.rootChannel = new RootChannel(options, this.log); - this.presence = new PresenceChannel(io, options, this.log); + this.private = new PrivateChannel(this.options, this.log); + this.rootChannel = new RootChannel(this.options, this.log); + this.presence = new PresenceChannel(this.io, this.options, this.log, this.db); Log.success('Channels are ready.'); diff --git a/src/channels/commandChannel.ts b/src/channels/commandChannel.ts index d767f4bb..b6953ba1 100644 --- a/src/channels/commandChannel.ts +++ b/src/channels/commandChannel.ts @@ -1,9 +1,10 @@ import {IoUtils} from "../utils/ioUtils"; import {Log} from "../log"; +import {Logger} from "../log/logger"; export class CommandChannel { - constructor(private options: any, protected io: any, protected log: any){ + constructor(private options: any, protected io: any, protected log: Logger){ } diff --git a/src/channels/presence-channel.ts b/src/channels/presence-channel.ts index c9e9d1d6..d859634e 100644 --- a/src/channels/presence-channel.ts +++ b/src/channels/presence-channel.ts @@ -1,19 +1,16 @@ +import {Logger} from "../log/logger"; + var _ = require('lodash'); import { Channel } from './channel'; import { Database } from './../database'; import { Log } from './../log'; export class PresenceChannel { - /** - * Database instance. - */ - db: Database; /** * Create a new Presence channel instance. */ - constructor(private io, private options: any, protected log: any) { - this.db = new Database(this.options); + constructor(private io, private options: any, protected log: Logger, protected db: Database) { } /** diff --git a/src/channels/private-channel.ts b/src/channels/private-channel.ts index 88ada871..da788d04 100644 --- a/src/channels/private-channel.ts +++ b/src/channels/private-channel.ts @@ -1,11 +1,12 @@ import {BaseAuthChannel} from "./baseAuthChannel"; +import {Logger} from "../log/logger"; export class PrivateChannel extends BaseAuthChannel { /** * Create a new private channel instance. */ - constructor(protected options: any, protected log: any) { + constructor(protected options: any, protected log: Logger) { super(options, log) } } diff --git a/src/channels/rootChannel.ts b/src/channels/rootChannel.ts index 9c69ff82..e2a13d7d 100644 --- a/src/channels/rootChannel.ts +++ b/src/channels/rootChannel.ts @@ -1,11 +1,12 @@ import {BaseAuthChannel} from "./baseAuthChannel"; +import {Logger} from "../log/logger"; export class RootChannel extends BaseAuthChannel { /** * Create a new private channel instance. */ - constructor(options: any, log: any) { + constructor(options: any, log: Logger) { super(options, log); } diff --git a/src/database/database-driver.ts b/src/database/database-driver.ts index 3f7febea..de8e86a2 100644 --- a/src/database/database-driver.ts +++ b/src/database/database-driver.ts @@ -3,6 +3,19 @@ */ export interface DatabaseDriver { + /** + * set user on new Root channel Auth connection + */ + setUserInServer(key: string, value: any): void; + + /** + * Delete user from DB base on socket_id + * + * @param collection + * @param socket_id + */ + delUserInServerBySocketId(collection: string, socket_id: any): void; + /** * get all members in channel */ @@ -45,4 +58,12 @@ export interface DatabaseDriver { * @param sockets array, active array socketsId on Io Channel */ removeInactive(channel: string, sockets: any): Promise; + + /** + * Remove inactive sockets from this Io server + * + * @param collection + * @param sockets + */ + removeInactiveSocketsInThisServer(collection: string, sockets: any): Promise; } diff --git a/src/database/database.ts b/src/database/database.ts index 2cbd2ad3..b72cadf2 100644 --- a/src/database/database.ts +++ b/src/database/database.ts @@ -1,5 +1,6 @@ import { DatabaseDriver } from './database-driver'; import {MongoDatabase} from "./mongo"; +import {Logger} from "../log/logger"; /** * Class that controls the key/value data store. @@ -13,8 +14,8 @@ export class Database implements DatabaseDriver { /** * Create a new Mongo database instance. */ - constructor(private options: any) { - this.driver = new MongoDatabase(this.options); + constructor(private options: any, protected log: Logger) { + this.driver = new MongoDatabase(this.options, this.log); } @@ -45,4 +46,16 @@ export class Database implements DatabaseDriver { removeInactive(channel: string, member: any): Promise{ return this.driver.removeInactive(channel, member); } + + removeInactiveSocketsInThisServer(collection: string, sockets: any): Promise{ + return this.driver.removeInactiveSocketsInThisServer(collection, sockets); + } + + setUserInServer(collection: string, user: any): void{ + return this.driver.setUserInServer(collection, user); + }; + + delUserInServerBySocketId(collection: string, socket_id: any): void { + return this.driver.delUserInServerBySocketId(collection, socket_id); + } } diff --git a/src/database/mongo.ts b/src/database/mongo.ts index b4b41e38..12194c7c 100644 --- a/src/database/mongo.ts +++ b/src/database/mongo.ts @@ -13,16 +13,13 @@ export class MongoDatabase implements DatabaseDriver { /** * Create a new cache instance. */ - constructor(private options) { + constructor(private options: any, protected log: any) { - this._mongo = new MongoClient( - this.url(), - { + this._mongo = new MongoClient(this.url(), { useNewUrlParser: true, useUnifiedTopology: true }); - this.connect(this.options.databaseConfig.mongo.dbName); } @@ -35,7 +32,9 @@ export class MongoDatabase implements DatabaseDriver { this._mongo.connect((err) => { if(err) { - Log.error('Connection to Mongo has Failed') + const msg = `Mongo: Connection to Mongo Server:${this.url()} Database:${dbName} has Failed`; + Log.error(msg); + this.log.error(msg); } Log.success("Connected successfully to Mongo server"); @@ -45,12 +44,14 @@ export class MongoDatabase implements DatabaseDriver { } /** - * Set up Url mongo + * Set up Mongo URL + * + * @return string 'mongodb://user:pass@host:port' */ private url(): string{ if(! this.options.databaseConfig.mongo) - throw new Error('No database Mongo config found!'); + throw new Error('Mongo: Url Error: database Mongo config found!'); let url = 'mongodb://'; @@ -73,42 +74,40 @@ export class MongoDatabase implements DatabaseDriver { /** - * Store data to cache. + * Set new Member on presence channel + * + * @param channel + * @param member */ setMember(channel: string, member: any): void { - Log.success(`MONGO SetMember on Channel: ${channel}, Member: ${JSON.stringify(member)}`) - this.db.collection(channel).insertOne(member, (err, res) =>{ - if(err) { - Log.error('Mongo SetMember ' + err.message) - } - }); + Log.success(`MONGO SetMember on Channel: ${channel}, Member: ${JSON.stringify(member)}`); + this.insertOne(channel, member); } /** - * Store data to cache. + * Delete Member from presence channel + * + * @param channel + * @param member */ delMember(channel: string, member: any): void { - Log.success(`MONGO DelMember on Channel: ${channel}, Member: ${JSON.stringify(member)}`) - Log.success(`MONGO DElELETE Member MemberID: ${member[0].user_id} , SOcketId: ${member[0].socketId}`) - this.db.collection(channel).deleteOne({user_id: member[0].user_id}, (err, res) =>{ - if(err) { - Log.error('Mongo DelMember ' + err.message) - } + Log.success(`MONGO Delete Member MemberID: ${member[0].user_id} , SocketId: ${member[0].socketId}`); - Log.success(`MONGO Delete Member on Channel: ${channel}, RESPONSE: ${JSON.stringify(res)}`) - }); + this.deleteOne(channel, {user_id: member[0].user_id}); } /** * Retrieve data from redis. */ isMember(channel: string, member: any): Promise { - Log.success(`MONGO IS_Member on Channel: ${channel}, Member: ${JSON.stringify(member)}`) + Log.success(`MONGO IS_Member on Channel: ${channel}, Member: ${JSON.stringify(member)}`); return new Promise((resolve, reject) => { this.db.collection(channel).find({user_id: member.user_id}).toArray((err, res) =>{ if(err) { - Log.error('Mongo IsMember ' + err.message) + const msg = `Mongo: isMember Error:${err.message}`; + this.log.error(msg); + Log.error(msg); return reject(err) } return resolve(res) @@ -120,11 +119,13 @@ export class MongoDatabase implements DatabaseDriver { * Retrieve data from redis. */ getMember(channel: string, member: any): Promise { - Log.success(`MONGO GETMember on Channel: ${channel}, Member: ${JSON.stringify(member)}`) + Log.success(`MONGO GETMember on Channel: ${channel}, Member: ${JSON.stringify(member)}`); return new Promise((resolve, reject) => { this.db.collection(channel).find({user_id: member.user_id}).toArray((err, res) =>{ if(err) { - Log.error('Mongo IsMember ' + err.message) + const msg = `Mongo: getMember Error:${err.message}`; + Log.error(msg); + this.log.error(msg); return reject(err) } return resolve(res) @@ -140,7 +141,9 @@ export class MongoDatabase implements DatabaseDriver { return new Promise((resolve, reject) => { this.db.collection(channel).find({socketId: socketId}).toArray((err, res) =>{ if(err) { - Log.error('Mongo getMemberBySocketId ' + err.message) + const msg = `Mongo: getMemberBySocketId Error:${err.message}`; + Log.error(msg); + this.log.error(msg); return reject(err) } return resolve(res) @@ -152,21 +155,23 @@ export class MongoDatabase implements DatabaseDriver { * Retrieve data from redis. */ getMembers(channel: string): Promise { - Log.success(`MONGO GETMemberS on Channel: ${channel}`) + Log.success(`MONGO GETMemberS on Channel: ${channel}`); return new Promise((resolve, reject) => { this.db.collection(channel).find({}).toArray((err, res) =>{ if(err) { - Log.error('Mongo getMembers ' + err.message) + const msg = `Mongo: getMembers Error:${err.message}`; + Log.error(msg); + this.log.error(msg); return reject(err) } - Log.success(`MONGO GET MEMBERS on Channel: ${channel}, Members: ${JSON.stringify(res)}`) + Log.success(`MONGO GET MEMBERS on Channel: ${channel}, Members: ${JSON.stringify(res)}`); return resolve(res) }); }); } /** - * removeInactive + * remove Inactive sockets from presence channel * * @param channel * @param sockets array active io sockets on this channel @@ -174,9 +179,88 @@ export class MongoDatabase implements DatabaseDriver { removeInactive(channel: string, sockets: any): Promise{ return new Promise((resolve, reject) => { this.db.collection(channel).deleteMany({socketId: { $nin: sockets }}, (err, res) => { - if(err) return reject(err); + if(err) { + const msg = `Mongo: removeInactive Error:${err.message}`; + Log.error(msg); + this.log.error(msg); + return reject(err); + } + return resolve(); + }) + }); + } + + /** + * remove Inactive sockets from presence channel + * + * @param collection + * @param sockets array active io sockets on this channel + */ + removeInactiveSocketsInThisServer(collection: string, sockets: any): Promise{ + return new Promise((resolve, reject) => { + this.db.collection(collection).deleteMany({socket_id: { $nin: sockets }}, (err, res) => { + if(err) { + const msg = `Mongo: removeInactiveSocketsInThisServer Error:${err.message}`; + Log.error(msg); + this.log.error(msg); + return reject(err); + } return resolve(); }) }); } + + /** + * set user in server + * + * @param collection + * @param user + */ + setUserInServer(collection: string, user: any): void{ + this.insertOne(collection, user); + }; + + /** + * Delete user in server by SocketId + * + * @param collection + * @param socket_id + */ + delUserInServerBySocketId(collection: string, socket_id: any): void { + this.deleteOne(collection, {socket_id: socket_id}) + } + + /** + * Insert One + * + * @param collection + * @param data + */ + private insertOne(collection: string, data: any): void{ + this.db.collection(collection).insertOne(data, (err, res) =>{ + if(err) { + const msg = `Mongo: insertOne Error:${err.message}`; + Log.error(msg); + this.log.error(msg); + throw new Error(msg); + } + }); + } + + /** + * Delete One + * + * @param collection + * @param data {foo: bar} + */ + private deleteOne(collection: string, data: any): void{ + this.db.collection(collection).deleteOne(data, (err, res) =>{ + if(err) { + const msg = `Mongo: deleteOne Error:${err.message}`; + Log.error(msg); + this.log.error(msg); + throw new Error(msg); + } + }); + } } diff --git a/src/default-options.ts b/src/default-options.ts index 3b8a54de..00ee18c9 100644 --- a/src/default-options.ts +++ b/src/default-options.ts @@ -17,7 +17,7 @@ export const options = { "mongo": { "host": "127.0.0.1", "port": "27017", - "dbName": "presence", + "dbName": "echo", "user": null, "password": null }, diff --git a/src/echo-server.ts b/src/echo-server.ts index b50415f1..86d1c83a 100644 --- a/src/echo-server.ts +++ b/src/echo-server.ts @@ -8,6 +8,8 @@ import {IoUtils} from "./utils/ioUtils"; const packageFile = require('../package.json'); import {FsUtils} from "./utils/fsUtils"; import {CommandChannel} from "./channels/commandChannel"; +import {Database} from "./database"; +import {Logger} from "./log/logger"; const defaultOptions = FsUtils.getConfigfile(); @@ -38,7 +40,10 @@ export class EchoServer { private httpApi: HttpApi; /** Log to syslog */ - protected log: any; + protected log: Logger; + + /** Database instance .*/ + protected db: Database; /** Create a new instance. */ constructor() { @@ -55,12 +60,14 @@ export class EchoServer { this.options = Object.assign(this.defaultOptions, options); - this.log = new Bunyan(this.options); + this.log = new Logger(this.options); this.startup(); + this.db = new Database(this.options, this.log); this.server = new Server(this.options, this.log); + this.server.init().then(io => { this.init(io).then(() => { Log.info('\nServer ready!\n'); @@ -87,7 +94,8 @@ export class EchoServer { */ init(io: any): Promise { return new Promise((resolve, reject) => { - this.channel = new Channel(io, this.options, this.log); + + this.channel = new Channel(io, this.options, this.log, this.db); this.commandChannel = new CommandChannel(this.options, io, this.log); this.subscribers = []; @@ -109,9 +117,10 @@ export class EchoServer { */ startup(): void { Log.title(`\nL A R A V E L E C H O S E R V E R C L U S T E R\n`); - Log.info(`version ${packageFile.version}\n`); + Log.info(`version ${packageFile.version} Cluster \n`); Log.info(`Starting server in ${this.options.devMode ? 'DEV' : 'PROD'} mode`, true); + Log.success(`Log Mode is ${this.options.log} mode`, true); this.log.info(`Starting server in ${this.options.devMode ? 'DEV' : 'PROD'} mode`) } @@ -211,12 +220,23 @@ export class EchoServer { //console.log(socket.adapter.nsp.sockets) socket.user_id = auth.channel_data.user_id; + const ip = IoUtils.getIp(socket, this.options); Log.success(`LOG Success on Server: ${this.server.getServerId()}`); - //IoUtils.setActiveUserOnServer(this.server.getServerId(), auth.channel_data.user_id) + //IoUtils.setActiveUserOnServer(this.server.getServerId(), + // {user_id: auth.channel_data.user_id, socket_id: socket.id}) + // collection echo_users, {user_id:1, socket_id:2ff, server_id: foo1 }) + this.db.setUserInServer('echo_users', { + user_id: auth.channel_data.user_id, + socket_id: socket.id, + server_id: this.server.getServerId(), + ip: ip, + date: new Date() + }); if(this.options.multiple_sockets == false) { Log.success(`close_all_user_sockets_except_this_socket ${socket.id}`); + IoUtils.close_all_user_sockets_except_this_socket( auth.channel_data.user_id, socket.id, @@ -225,7 +245,10 @@ export class EchoServer { ); } - const ip = IoUtils.getIp(socket, this.options); + this.db.removeInactiveSocketsInThisServer( + 'echo_users', + IoUtils.getAllActiveSocketsInThisIoServer(this.server.io) + ); Log.success(`AUTH Success ON NSP / User Id:${socket.user_id} SocketID: ${socket.id} with IP:${ip}`); this.log.info(`Auth Success ON NSP / User Id:${socket.user_id} with Socket:${socket.id} with IP:${ip}`); @@ -277,6 +300,7 @@ export class EchoServer { */ onDisconnecting(socket: any): void { socket.on('disconnecting', (reason) => { + this.db.delUserInServerBySocketId('echo_users', socket.id); Object.keys(socket.rooms).forEach(room => { if (room !== socket.id) { this.channel.leave(socket, room, reason); diff --git a/src/log/log_interface.ts b/src/log/log_interface.ts index 1a0bb7c4..b54b8ea7 100644 --- a/src/log/log_interface.ts +++ b/src/log/log_interface.ts @@ -2,5 +2,4 @@ export interface Log_interface { info(data: any): void; error(data: any): void; - } diff --git a/src/log/logger.ts b/src/log/logger.ts new file mode 100644 index 00000000..e561c963 --- /dev/null +++ b/src/log/logger.ts @@ -0,0 +1,24 @@ +import {Log_interface} from "./log_interface"; +import {Bunyan} from "./bunyan"; + +export class Logger implements Log_interface { + + /** log driver */ + private driver: Bunyan; + + /** + * Create a new Mongo database instance. + */ + constructor(private options: any) { + this.driver = new Bunyan(this.options); + } + + error(data: any): void { + this.driver.error(data); + } + + info(data: any): void { + this.driver.info(data); + } + +} diff --git a/src/server.ts b/src/server.ts index 56eea870..078717c5 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,3 +1,5 @@ +import {Logger} from "./log/logger"; + var fs = require('fs'); var http = require('http'); var https = require('https'); @@ -32,7 +34,7 @@ export class Server { /** * Create a new server instance. */ - constructor(private options, protected log) { + constructor(private options: any, protected log: Logger) { this.server_id = this.generateServerId(); } diff --git a/src/utils/ioUtils.ts b/src/utils/ioUtils.ts index 0345e78a..f5186bb5 100644 --- a/src/utils/ioUtils.ts +++ b/src/utils/ioUtils.ts @@ -1,4 +1,5 @@ import {Log} from "../log"; +import {Logger} from "../log/logger"; export class IoUtils { @@ -85,7 +86,7 @@ export class IoUtils { * @param reason * @param logger */ - static disconnect(socket: any, logger: any, reason: string){ + static disconnect(socket: any, logger: Logger, reason: string){ Log.error(`Disconnect socket:${socket.id}, reason:${reason}`); logger.info(`Disconnect socket:${socket.id}, reason:${reason}`); socket.disconnect(true) @@ -100,7 +101,7 @@ export class IoUtils { static getIp(socket: any, options: any){ if(options.behind_proxy) - return socket.handshake.headers['x-forwarded-for'] + return socket.handshake.headers['x-forwarded-for']; return socket.handshake.address; } @@ -112,7 +113,7 @@ export class IoUtils { * @param io * @param log */ - static close_all_user_sockets(user_id: number, io: any, log: any): void{ + static close_all_user_sockets(user_id: number, io: any, log: Logger): void{ let user = this.findUser(user_id, io); @@ -137,7 +138,7 @@ export class IoUtils { * @param io * @param log */ - static close_all_user_sockets_except_this_socket(user_id: number, socket_id: string, io: any, log: any): void{ + static close_all_user_sockets_except_this_socket(user_id: number, socket_id: string, io: any, log: Logger): void{ let user = this.findUser(user_id, io); @@ -154,4 +155,15 @@ export class IoUtils { ); }); } + + /** + * get All Active Sockets in This IoServer + * + * @param io + * @return array sockets_id + */ + static getAllActiveSocketsInThisIoServer(io: any): any{ + const sockets = io.sockets.clients(); + return Object.keys(sockets.connected); + } }