Skip to content

Commit

Permalink
feat: 增加分布式服务注册支持
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Aug 2, 2024
1 parent 6a5a8bf commit 5f2414c
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 13 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@
┌─────────────┐ 2 | 3
┌───> | Push-server | ─── ─ · ─
| └─────────────┘ 1 | 4 ··· n
| Hash |
| Hash | register
| | PUB | SUB
┌────────────────────┐ ──┘ ┌──────────────┐ <────┘
| webman-push-server | ──────> | Redis-server |
└────────────────────┘ ──┐ └──────────────┘ <────┐
| | PUB | SUB
| Hash |
| Hash | register
| ┌────────────┐ 2 | 3
└────> | API-server | ─── ─ · ─
└────────────┘ 1 | 4 ··· n
Expand All @@ -77,6 +77,7 @@
|-- process.php # 启动进程
|-- redis.php # redis配置
|-- route.php # APIs路由信息
|-- registrar.php # 分布式服务注册器配置
```

### 频道说明
Expand Down Expand Up @@ -150,8 +151,6 @@ composer require workbunny/webman-push-server
| POST | /apps/[app_id]/users/[user_id]/terminate_connections | [对应的pusher文档地址](https://pusher.com/docs/channels/library_auth_reference/rest-api/#post-terminate-user-connections) |
| GET | /apps/[app_id]/channels/[channel_name]/users | [对应的pusher文档地址](https://pusher.com/docs/channels/library_auth_reference/rest-api/#get-users) |



## 客户端

### javascript客户端
Expand Down
22 changes: 15 additions & 7 deletions src/Registrars/RedisRegistrar.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class RedisRegistrar implements RegistrarInterface
/** @inheritDoc */
public function get(string $name, string $ip, int $port): ?array
{
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
try {
$result = $client->hGetAll($this->_registrarKey($name, $ip, $port));
foreach ($result as &$value) {
Expand All @@ -36,7 +36,7 @@ public function get(string $name, string $ip, int $port): ?array
/** @inheritDoc */
public function query(?string $name): ?array
{
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
$hash = [];
try {
while(
Expand Down Expand Up @@ -68,8 +68,12 @@ public function register(string $name, string $ip, int $port, string|null $worke
{
$workerId = $workerId ?: '';
try {
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
$res = $client->hSet($key = $this->_registrarKey($name, $ip, $port), $workerId, json_encode($metadata, JSON_UNESCAPED_UNICODE));
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
$res = $client->hSet(
$key = $this->_registrarKey($name, $ip, $port),
$workerId,
$metadata ? json_encode($metadata, JSON_UNESCAPED_UNICODE) : '{}'
);
// 如果存在定时间隔,则存在定时上报,则开启键值过期
if ($interval = config('plugin.workbunny.webman-push-server.registrar.interval')) {
$client->expire($key, $interval * 1.5);
Expand All @@ -92,8 +96,12 @@ public function report(string $name, string $ip, int $port, string|null $workerI
{
$workerId = $workerId ?: '';
try {
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
$res = $client->hSet($key = $this->_registrarKey($name, $ip, $port), $workerId, json_encode($metadata, JSON_UNESCAPED_UNICODE));
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
$res = $client->hSet(
$key = $this->_registrarKey($name, $ip, $port),
$workerId,
$metadata ? json_encode($metadata, JSON_UNESCAPED_UNICODE) : '{}'
);
// 如果存在定时间隔,则存在定时上报,则开启键值过期
if ($interval = config('plugin.workbunny.webman-push-server.registrar.interval')) {
$client->expire($key, $interval * 1.5);
Expand All @@ -116,7 +124,7 @@ public function unregister(string $name, string $ip, int $port, string|null $wor
{
$workerId = $workerId ?: '';
try {
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
return boolval(
$client->hDel($this->_registrarKey($name, $ip, $port), $workerId)
);
Expand Down
4 changes: 2 additions & 2 deletions src/Traits/RegistrarMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public function registrarStart(Worker $worker): void
$port = $this->registrarGetHostPort()
) {
// 注册
$registrar->register($serverName, $ip, $port, $id = (string)$worker->id);
$registrar->register($serverName, $ip, $port, $id = ($worker->id === 0 ? 'master' : strval($worker->id)));
// 定时上报
if ($interval = config('plugin.workbunny.webman-push-server.registrar.interval')) {
$this->registrarTimerId = Timer::add($interval, function () use ($registrar, $serverName, $ip, $port, $id) {
Expand All @@ -70,7 +70,7 @@ public function registrarStop(Worker $worker): void
$ip = $this->registrarGetHostIp() and
$port = $this->registrarGetHostPort()
) {
$registrar->unregister($serverName, $ip, $port, (string)$worker->id);
$registrar->unregister($serverName, $ip, $port, ($worker->id === 0 ? 'master' : strval($worker->id)));
}
}

Expand Down
21 changes: 21 additions & 0 deletions tests/ApiServerBaseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

namespace Tests;

use support\Redis;
use Tests\MockClass\MockTcpConnection;
use Webman\Http\Request;
use Webman\Http\Response;
use Workbunny\WebmanPushServer\ApiRoute;
use Workerman\Worker;

/**
* @runTestsInSeparateProcesses
Expand Down Expand Up @@ -131,4 +133,23 @@ public function testApiServerOnMessageWithArrayData()
$this->assertEquals(400, $mockConnection->getSendBuffer()->getStatusCode());
$this->assertEquals('application/json', $mockConnection->getSendBuffer()->getHeader('Content-Type'));
}

public function testApiServerRegistrar()
{
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();

$this->assertEquals([], $client->keys("registrar:*"));

$this->getPushServer()->registrarStart($worker = new Worker());

$this->assertNotEquals([], $keys = $client->keys("registrar:*"));
$this->assertEquals([
'master' => '{}'
], $client->hGetAll($key = $keys[0]));

$this->getPushServer()->registrarStop($worker);

$this->assertEquals(false, $client->exists($key));
$this->assertEquals([], $client->keys("registrar:*"));
}
}
21 changes: 21 additions & 0 deletions tests/PushServerBaseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@

namespace Tests;

use support\Redis;
use Tests\MockClass\MockTcpConnection;
use Workbunny\WebmanPushServer\Events\Ping;
use Workbunny\WebmanPushServer\Events\Subscribe;
use Workbunny\WebmanPushServer\PublishTypes\AbstractPublishType;
use Workbunny\WebmanPushServer\PushServer;
use Workerman\Worker;
use const Workbunny\WebmanPushServer\EVENT_CONNECTION_ESTABLISHED;
use const Workbunny\WebmanPushServer\EVENT_ERROR;
use const Workbunny\WebmanPushServer\EVENT_PONG;
Expand Down Expand Up @@ -485,4 +487,23 @@ public function testPushServerSubscribeResponse()
$wsConnection->setSendBuffer(null);
}

public function testPushServerRegistrar()
{
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();

$this->assertEquals([], $client->keys("registrar:*"));

$this->getPushServer()->registrarStart($worker = new Worker());

$this->assertNotEquals([], $keys = $client->keys("registrar:*"));
$this->assertEquals([
'master' => '{}'
], $client->hGetAll($key = $keys[0]));

$this->getPushServer()->registrarStop($worker);

$this->assertEquals(false, $client->exists($key));
$this->assertEquals([], $client->keys("registrar:*"));
}

}

0 comments on commit 5f2414c

Please sign in to comment.