Skip to content

Commit

Permalink
修改分布式服务器间文件同步丢包的bug
Browse files Browse the repository at this point in the history
修改分布式服务器间文件同步丢包的bug
  • Loading branch information
qieangel2013 committed Aug 30, 2016
1 parent 667393a commit be41346
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 32 deletions.
18 changes: 5 additions & 13 deletions distributed/server/DistributedClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ public function __construct()
public function addServerClient($address)
{
$client = new swoole_client(SWOOLE_TCP, SWOOLE_SOCK_ASYNC);
$client->set(array(
'socket_buffer_size' => 1024 * 1024 * 2,
'open_eof_check' => true,
'package_eof' => "\r\n\r\n"
));
$client->on('Connect', array(
&$this,
'onConnect'
Expand Down Expand Up @@ -76,14 +71,11 @@ public function onConnect($serv)

public function onReceive($client, $data)
{
/*$remote_info=json_decode($data, true);
if($remote_info['code']==10002){
$this->c_client_pool[ip2long($remote_info['fd'])]= array('fd' =>$fd,'client'=>$client);
}*/
//$remote_info=json_decode($data, true)
// start a task
//$serv->task(json_encode($param));
$remote_info = json_decode($data, true);
if ($remote_info['type'] == 'filesizemes') {
if ($client->sendfile(MYPATH . $remote_info['data']['path'])) {
}
}
}
public function onTask($serv, $task_id, $from_id, $data)
{
Expand Down
69 changes: 50 additions & 19 deletions distributed/server/DistributedServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class DistributedServer
private $localip;
private $connectioninfo;
private $curpath;
private $filesizes;
private $tmpdata;
private $oldpath;
public function __construct()
{
$this->table = new swoole_table(1024);
Expand All @@ -38,25 +41,15 @@ public function __construct()
$server->set(array(
'worker_num' => 4,
'task_worker_num' => 4,
'dispatch_mode' => 4, //1: 轮循, 3: 争抢
//'open_length_check' => true, //打开包长检测
//'package_max_length' => 8192000, //最大的请求包长度,8M
//'package_length_type' => 'N', //长度的类型,参见PHP的pack函数
//'package_length_offset' => 0, //第N个字节是包长度的值
//'package_body_offset' => 0, //从第几个字节计算长度
'dispatch_mode' => 4,
'daemonize' => true,
'log_file' => $distributed_config['logfile']
));
} else {
$server->set(array(
'worker_num' => 4,
'task_worker_num' => 4,
'dispatch_mode' => 4, //1: 轮循, 3: 争抢
//'open_length_check' => true, //打开包长检测
//'package_max_length' => 8192000, //最大的请求包长度,8M
//'package_length_type' => 'N', //长度的类型,参见PHP的pack函数
// 'package_length_offset' => 0, //第N个字节是包长度的值
//'package_body_offset' => 0, //从第几个字节计算长度
'dispatch_mode' => 4,
'daemonize' => true
));
}
Expand Down Expand Up @@ -147,11 +140,42 @@ public function onReceive($serv, $fd, $from_id, $data)
$remote_info = json_decode($data, true);
//判断是否为二进制图片流
if (!is_array($remote_info)) {
if (is_dir(MYPATH . dirname($this->curpath['path'])) && is_readable(MYPATH . dirname($this->curpath['path']))) {
} else {
mkdir(MYPATH . dirname($this->curpath['path']), 0777, true);
if (isset($this->curpath['path'])) {
if (is_dir(dirname(MYPATH . $this->curpath['path'])) && is_readable(dirname(MYPATH . $this->curpath['path']))) {
} else {
mkdir(dirname(MYPATH . $this->curpath['path']), 0777, true);
}
if ($this->oldpath != $this->curpath['path']) {
$this->tmpdata .= $data;
}
if (strlen($this->tmpdata) == $this->filesizes) {
$infofile = pathinfo(MYPATH . $this->curpath['path']);
if (in_array($infofile['extension'], array(
'txt',
'log'
))) {
if (file_put_contents(MYPATH . $this->curpath['path'], $this->tmpdata)) {
$this->tmpdata = '';
$this->oldpath = $this->curpath['path'];
}
} else {
if (in_array($infofile['extension'], array(
'jpg',
'png',
'jpeg',
'JPG',
'JPEG',
'PNG',
'bmp'
))) {
if (file_put_contents(MYPATH . $this->curpath['path'], $this->tmpdata)) {
$this->tmpdata = '';
$this->oldpath = $this->curpath['path'];
} //写入图片流
}
}
}
}
file_put_contents(MYPATH . $this->curpath['path'], $data, FILE_APPEND); //写入图片流
} else {
if ($remote_info['type'] == 'system' && $remote_info['data']['code'] == 10001) {
if ($this->client_a != $remote_info['data']['fd']) {
Expand Down Expand Up @@ -186,21 +210,28 @@ public function onReceive($serv, $fd, $from_id, $data)
}
$serv->send($fd, $serv->taskwait($remote_info['data']));
} else {
print_r($remote_info);
$serv->task($remote_info['data']);
}
break;
case 'file':
if ($this->localip == $this->connectioninfo['remote_ip']) {
foreach ($this->b_server_pool as $k => $v) {
$v['client']->send($data);
$v['client']->sendfile(MYPATH . $remote_info['data']['path']);
}
$serv->send($fd, $serv->taskwait($remote_info['data']));
} else {
if (isset($remote_info['data']['path'])) {
$this->curpath = $remote_info['data'];
$this->curpath = $remote_info['data'];
$this->filesizes = $remote_info['data']['size'];
$data_s = array(
'type' => 'filesizemes',
'data' => array(
'path' => $remote_info['data']['path']
)
);
$serv->send($fd, json_encode($data_s, true));
}

$serv->task($remote_info['data']);
}
break;
Expand Down
Binary file added public/images/weixin.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit be41346

Please sign in to comment.