Skip to content

Commit

Permalink
feat&fix(Client.php): fixed waitRpc method expectations from the resu…
Browse files Browse the repository at this point in the history
…lt and add reconnect and basic recover functionals in the catch block
  • Loading branch information
sobirjonovs committed Nov 21, 2023
1 parent c3f8a7b commit e7ce4c6
Showing 2 changed files with 15 additions and 12 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
"description": "Easy tool for working with RabbitMQ",
"type": "library",
"require": {
"php": "^7.3|^8.0",
"php": "^7.4|^8.0",
"php-amqplib/php-amqplib": "^3.1",
"illuminate/support": ">8"
},
25 changes: 14 additions & 11 deletions src/Rabbit/Client.php
Original file line number Diff line number Diff line change
@@ -2,21 +2,22 @@

namespace App\Rabbitmq\Rabbit;

use App\Rabbitmq\Exceptions\DeadLetterHandler;
use App\Rabbitmq\Exceptions\InvalidLetterHandler;
use Closure;
use Illuminate\Validation\ValidationException;
use Rabbitmq;
use Exception;
use Throwable;
use ErrorException;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;
use Illuminate\Support\Facades\DB;
use PhpAmqpLib\Channel\AMQPChannel;
use Illuminate\Database\Eloquent\Model;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Channel\AbstractChannel;
use Illuminate\Database\Eloquent\Model;
use App\Rabbitmq\Contracts\RabbitContract;
use App\Rabbitmq\Exceptions\DeadLetterHandler;
use Illuminate\Validation\ValidationException;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Rabbitmq\Exceptions\InvalidLetterHandler;

class Client implements RabbitContract
{
@@ -178,9 +179,7 @@ public function request(string $queue = null): Client
$message->ack(true);
});

$client = $this->waitRpc();

return $client;
return $this->waitRpc();
}

/**
@@ -269,12 +268,17 @@ public function waitRpc(): Client
try {
$channel = $this->getRpcChannel();

while (!$this->result) {
while (! is_array($this->result) && blank($this->result)) {
$channel->wait(null, false, config('amqp.channel_rpc_timeout'));
}

return $this->stopRpc()->disableRpc();
} catch (Throwable $exception) {
DB::reconnect();

$this->getChannel()->basic_recover(true);
$this->getConnection()->reconnect();

$this->stopRpc()->disableRpc();

throw new Exception('Service is not responding');
@@ -349,7 +353,6 @@ public function dispatchEvents(AMQPMessage $message): Client
->setMessage($result);

if ($this->isMultiQueue()) {

$client = $client->disableMultiQueue()
->publish($message->get('reply_to'))
->enableMultiQueue();
@@ -835,4 +838,4 @@ protected function extract(string $key, AMQPMessage $message)
{
return data_get($message->get('application_headers')->getNativeData(), $key);
}
}
}

0 comments on commit e7ce4c6

Please sign in to comment.