Skip to content

Commit

Permalink
Fix client has already been bound to another coroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
louislivi committed Dec 21, 2018
1 parent be063b2 commit 3879dd8
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 55 deletions.
30 changes: 30 additions & 0 deletions src/Base.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,36 @@ public function parseDbConfig(array $_config)
return $config['databases'];
}

/**
* 协程pop
*
* @param $chan
* @param int $timeout
*
* @return bool
*/
protected static function coPop($chan, $timeout = 0)
{
if (version_compare(swoole_version(), '4.0.3', '>=')) {
return $chan->pop($timeout);
} else {
if (0 == $timeout) {
return $chan->pop();
} else {
$writes = [];
$reads = [$chan];
$result = $chan->select($reads, $writes, $timeout);
if (false === $result || empty($reads)) {
return false;
}

$readChannel = $reads[0];

return $readChannel->pop();
}
}
}

protected static function writeErrMessage(int $id, string $msg, int $errno = 0, $sqlState = 'HY000')
{
$err = new ErrorPacket();
Expand Down
53 changes: 14 additions & 39 deletions src/MysqlPool/MySQLPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,12 @@ public static function fetch(string $connName, \swoole_server $server, int $fd)
$client = self::coPop(self::$yieldChannel[$connName], self::$connsConfig[$connName]['serverInfo']['timeout']);
if (false === $client) {
--self::$pendingFetchCount[$connName];
throw new MySQLException('Reach max connections! Cann\'t pending fetch!');
$message = 'SMProxy@Reach max connections! Cann\'t pending fetch!';
$errMessage = self::writeErrMessage(1, $message, ErrorCode::ER_HAS_GONE_AWAY);
if ($server->exist($fd)) {
$server->send($fd, getString($errMessage));
}
throw new MySQLException($message);
}
--self::$resumeFetchCount[$connName];
if (!empty($connsPool)) {
Expand Down Expand Up @@ -193,11 +198,7 @@ public static function initConn(\swoole_server $server, int $fd, string $connNam

$conn->account = $serverInfo['account'];
$conn->charset = self::$connsConfig[$connName]['charset'];
if (false == $conn->connect(
$serverInfo['host'],
$serverInfo['port'],
$serverInfo['timeout'] ?? 0.1
)) {
if (false == $conn->connect($serverInfo['host'], $serverInfo['port'], $serverInfo['timeout'] ?? 0.1)) {
--self::$initConnCount[$connName];
$message = 'SMProxy@MySQL server has gone away';
$errMessage = self::writeErrMessage(1, $message, ErrorCode::ER_HAS_GONE_AWAY);
Expand All @@ -206,15 +207,19 @@ public static function initConn(\swoole_server $server, int $fd, string $connNam
}
throw new MySQLException($message);
}
$timeout_message = 'Connection ' . $serverInfo['host'] . ':' . $serverInfo['port'] .
' waiting timeout, timeout=' . $serverInfo['timeout'];
$client = self::coPop($chan, $serverInfo['timeout'] * 3);
if ($client === false) {
--self::$initConnCount[$connName];
if ($tryStep < 3) {
return self::initConn($server, $fd, $connName, ++$tryStep);
} else {
throw new MySQLException($timeout_message);
$message = 'SMProxy@Connection ' . $serverInfo['host'] . ':' . $serverInfo['port'] .
' waiting timeout, timeout=' . $serverInfo['timeout'];
$errMessage = self::writeErrMessage(1, $message, ErrorCode::ER_HAS_GONE_AWAY);
if ($server->exist($fd)) {
$server->send($fd, getString($errMessage));
}
throw new MySQLException($message);
}
}
$id = spl_object_hash($client);
Expand Down Expand Up @@ -252,36 +257,6 @@ public static function destruct(Client $cli, string $connName)
});
}

/**
* 协程pop
*
* @param $chan
* @param int $timeout
*
* @return bool
*/
private static function coPop($chan, $timeout = 0)
{
if (version_compare(swoole_version(), '4.0.3', '>=')) {
return $chan->pop($timeout);
} else {
if (0 == $timeout) {
return $chan->pop();
} else {
$writes = [];
$reads = [$chan];
$result = $chan->select($reads, $writes, $timeout);
if (false === $result || empty($reads)) {
return false;
}

$readChannel = $reads[0];

return $readChannel->pop();
}
}
}

/**
* 断重链.
*
Expand Down
42 changes: 26 additions & 16 deletions src/SMProxyServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,20 @@ public function onReceive(\swoole_server $server, int $fd, int $reactor_id, stri
$model = 'write';
}
$key = $this ->compareModel($model, $server, $fd);
if (isset($this->mysqlClient[$fd][$model])) {
$client = $this->mysqlClient[$fd][$model];
if (isset($this->mysqlClient[$fd][$key])) {
$client = self::coPop($this->mysqlClient[$fd][$key], $this->dbConfig[$key]['serverInfo']['timeout']);
if ($data && $client->client->isConnected()) {
$client->client->send($data);
}
$this->mysqlClient[$fd][$key]->push($client);
} else {
$client = MySQLPool::fetch($key, $server, $fd);
$this->mysqlClient[$fd][$model] = $client;
if ($data && $client->client->isConnected()) {
$client->client->send($data);
$result = $client->client->send($data);
if ($result) {
$this->mysqlClient[$fd][$key] = new Coroutine\Channel(1);
$this->mysqlClient[$fd][$key]->push($client);
}
}
}
}
Expand Down Expand Up @@ -124,19 +128,25 @@ public function onClose(\swoole_server $server, int $fd)
unset($this->connectHasAutoCommit[$fd]);
}
if (isset($this->mysqlClient[$fd])) {
if (isset($this->mysqlClient[$fd]['write'] ->client) && $this->mysqlClient[$fd]['write'] ->client && $this->mysqlClient[$fd]['write'] ->client->isConnected()) {
if ($connectHasTransaction) {
$this->mysqlClient[$fd]['write']->client->send(getString([9, 0, 0, 0, 3, 82, 79, 76, 76, 66, 65, 67, 75]));
}
if ($connectHasAutoCommit) {
$this->mysqlClient[$fd]['write']->client->send(getString([
17, 0, 0, 0, 3, 115, 101, 116, 32, 97, 117, 116, 111, 99, 111, 109, 109, 105, 116, 61, 49,
]));
foreach ($this->mysqlClient[$fd] as $key => $mysqlClient) {
$model = explode(DB_DELIMITER, $key)[0];
$conn = self::coPop($mysqlClient, $this->dbConfig[$key]['serverInfo']['timeout']);
if ($conn) {
if ($model == 'write') {
if (isset($conn ->client) && $conn ->client && $conn ->client->isConnected()) {
if ($connectHasTransaction) {
$conn->client->send(getString([9, 0, 0, 0, 3, 82, 79, 76, 76, 66, 65, 67, 75]));
}
if ($connectHasAutoCommit) {
$conn->client->send(getString([
17, 0, 0, 0, 3, 115, 101, 116, 32, 97, 117, 116, 111, 99, 111, 109, 109, 105, 116, 61, 49,
]));
}
}
}
MySQLPool::recycle($conn);
}
}
foreach ($this->mysqlClient[$fd] as $mysqlClient) {
MySQLPool::recycle($mysqlClient);
}
unset($this->mysqlClient[$fd]);
}
if (isset($this->connectReadState[$fd])) {
Expand Down Expand Up @@ -194,7 +204,7 @@ private function setStartConns()
$test_client = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP);
if (!$test_client->connect($value['serverInfo']['host'], $value['serverInfo']['port'], $value['serverInfo']['timeout'])) {
throw new MySQLException('connect ' . explode(DB_DELIMITER, $key)[0] .
' ' . explode(DB_DELIMITER, $key)[1] . ' failed, ErrorCode: ' . $test_client->errCode . "\n");
' ' . explode(DB_DELIMITER, $key)[1] . ' failed, ErrorCode: ' . $test_client->errCode . "\n");
}
$test_client->close();
//初始化连接
Expand Down

0 comments on commit 3879dd8

Please sign in to comment.